客户端改造:将spring项目修改成springboot项目,使用自定义注解WzxAutowired对服务端订单接口进行注入,通过spring前置处理器BeanPostProcessor.postProcessBeforeInitialization将所有加了WzxAutowired的注解的bean,注入spring容器,然后针对加了WzxAutowired的注解bean,设置了一个代理,这个代理实现是RemoteInvocationHandler,RemoteInvocationHandler实现远程调用order-service。
服务端改造:使用WzxRemoteService注解标记远程调用的接口,通过该spring后置处理器
BeanPostProcessor.postProcessAfterInitialization将所有加了WzxRemoteService的注解的bean,注入spring容器,并通过Mediator发布到远程公客户端调用
客户端:
@Component
public class AutowiredInvokeProxy implements BeanPostProcessor {
@Autowired
RemoteInvocationHandler invocationHandler;
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
//将所有加了WzxAutowired的注解的bean,注入spring容器
Field[] fields = bean.getClass().getDeclaredFields();
for(Field field : fields){
if(field.isAnnotationPresent(WzxAutowired.class)){
//针对这个加了WzxAutowired注解的字段,设置为一个代理的值
Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class<?>[]{field.getType()}, invocationHandler);
try {
//相当于针对加了WzxAutowired的注解,设置了一个代理,这个代理实现是RemoteInvocationHandler
field.set(bean, proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return null;
}
}
@Component
public class RemoteInvocationHandler implements InvocationHandler {
@Value("${wzx.host}")
private String host;
@Value("${wzx.port}")
private int port;
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//先建立远程连接
RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
//传递数据(调用哪个接口,方法,参数),服务端接收到这些数据可以基于这些数据反射调用
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setArgs(args);
rpcRequest.setMethodName(method.getName());
rpcRequest.setTypes(method.getParameterTypes());
return rpcNetTransport.send(rpcRequest);
}
}
public class RpcNetTransport {
private String host;
private int port;
public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}
public Socket createSocket() throws IOException {
Socket socket = new Socket(host, port);
return socket;
}
public Object send(RpcRequest request) throws IOException, ClassNotFoundException {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
Socket socket = createSocket();
//IO操作
socket.getOutputStream();
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(request);
//情况缓冲区
outputStream.flush();
//读取服务端返回数据
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
}
}
@RestController
public class UserController {
@WzxAutowired
IOrderService orderService;
@GetMapping("/test")
public String test(){
return orderService.queryOrderList();
}
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface WzxAutowired {
}
@SpringBootApplication
@Component("com.wzx.example")
public class UserServiceApplication {
public static void main(String[] args) {
SpringApplication.run(UserServiceApplication.class);
}
}
application.yml
spring:
application:
name: user-service
server:
port: 8080
wzx:
host: localhost
port: 8888
image.png
运行结果:
image.png
服务端:
@Component
public class InitialMediator implements BeanPostProcessor {
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//加了WzxRemoteService的bean进行远程发布
if(bean.getClass().isAnnotationPresent(WzxRemoteService.class)){
Method[] methods = bean.getClass().getDeclaredMethods();
for(Method method : methods){
String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
BeanMethod beanMethod = new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
//完成需要发布的bean的存储
Mediator.map.put(key, beanMethod);
}
}
return bean;
}
}
/**
* 定义一个中间类
*/
public class Mediator {
//用来存储所有发布服务的实例,也就是所有加了WzxRemoteService注解的Bean
public static Map<String, BeanMethod> map = new ConcurrentHashMap<String, BeanMethod>();
private volatile static Mediator instance;
private Mediator() {
}
public static Mediator getInstance(){
if(instance == null){
synchronized (Mediator.class){
if(instance == null){
instance = new Mediator();
}
}
}
return instance;
}
public Object processor(RpcRequest request){
String key = request.getClassName() + "." + request.getMethodName();
//beanMethod已经通过后置处理器加载到了map中,直接去map中获取
BeanMethod beanMethod = map.get(key);
if (beanMethod == null){
return null;
}
Object bean = beanMethod.getBean();
Method method = beanMethod.getMethod();
try {
return method.invoke(bean, request.getArgs());
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
return null;
}
}
public class RpcRequest implements Serializable {
private String className;
private String methodName;
//参数
private Object[] args;
//参数类型
private Class[] types;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
public Class[] getTypes() {
return types;
}
public void setTypes(Class[] types) {
this.types = types;
}
}
public class BeanMethod {
private Object Bean;
private Method method;
public Object getBean() {
return Bean;
}
public void setBean(Object bean) {
Bean = bean;
}
public Method getMethod() {
return method;
}
public void setMethod(Method method) {
this.method = method;
}
}
//spring容器启动完成以后会发布一个ContextRefreshedEvent
@Component
public class SocketServerInitial implements ApplicationListener<ContextRefreshedEvent> {
public final ExecutorService executorService = Executors.newCachedThreadPool();
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
//启动服务
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(8888);
//不断循环去监听客户端请求
for(;;){
Socket socket = serverSocket.accept();
//通过Processorhandler解决IO阻塞
executorService.execute(new ProcessorHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public class ProcessorHandler implements Runnable{
private Socket socket;
public ProcessorHandler(Socket socket) {
this.socket = socket;
}
public Socket getSocket() {
return socket;
}
public void setSocket(Socket socket) {
this.socket = socket;
}
public void run() {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
inputStream = new ObjectInputStream(socket.getInputStream());
RpcRequest request = (RpcRequest)inputStream.readObject();
//路由
Mediator mediator = Mediator.getInstance();
Object rs = mediator.processor(request);
System.out.println("服务端处理的结果:" + rs);
//结果写回去
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(rs);
outputStream.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(outputStream != null) {
outputStream.close();
}
if(inputStream != null) {
inputStream.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface WzxRemoteService {
}
@Configuration
@ComponentScan(value = "com.wzx.example")
public class Bootstrap {
public static void main(String[] args) {
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(Bootstrap.class);
}
}
//通过这个注解自动发布服务
@WzxRemoteService
public class OrderServiceImpl implements IOrderService {
public String queryOrderInfo(String id) {
return "this is queryOrderInfo";
}
public String queryOrderList() {
return "this is queryOrderList";
}
}
public interface IOrderService {
String queryOrderInfo(String id);
String queryOrderList();
}
image.png
网友评论