美文网首页
使用socket实现RPC远程调用升级版

使用socket实现RPC远程调用升级版

作者: 奋斗的韭菜汪 | 来源:发表于2020-06-22 18:08 被阅读0次

    客户端改造:将spring项目修改成springboot项目,使用自定义注解WzxAutowired对服务端订单接口进行注入。通过spring的BeanPostProcessor.postProcessBeforeInitialization(初始化前置处理)检查每个bean中加了WzxAutowired注解的字段,并为该字段设置一个代理对象;这个代理的InvocationHandler实现是RemoteInvocationHandler,由RemoteInvocationHandler完成对order-service的远程调用。
    服务端改造:使用WzxRemoteService注解标记可被远程调用的服务实现类,通过spring后置处理器
    BeanPostProcessor.postProcessAfterInitialization将所有加了WzxRemoteService注解的bean的方法登记到Mediator中,发布出去供远程客户端调用
    客户端:

    /**
     * Bean post-processor that fills every field annotated with {@code WzxAutowired}
     * with a JDK dynamic proxy whose calls are handled by {@link RemoteInvocationHandler}.
     */
    @Component
    public class AutowiredInvokeProxy implements BeanPostProcessor {
        @Autowired
        RemoteInvocationHandler invocationHandler;

        /**
         * Injects a remote-call proxy into each {@code WzxAutowired} field declared on the bean's class.
         *
         * @param bean     the bean instance being initialised
         * @param beanName the name of the bean
         * @return the same bean instance, so that later post-processors still run
         * @throws IllegalStateException if a proxy cannot be written into an annotated field
         */
        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            for (Field field : bean.getClass().getDeclaredFields()) {
                if (!field.isAnnotationPresent(WzxAutowired.class)) {
                    continue;
                }
                // The field type must be an interface; the proxy implements it and
                // forwards every call to the shared RemoteInvocationHandler.
                Class<?> serviceType = field.getType();
                Object proxy = Proxy.newProxyInstance(
                        serviceType.getClassLoader(), new Class<?>[]{serviceType}, invocationHandler);
                try {
                    // Annotated fields are usually private or package-private, so
                    // reflective write access has to be enabled explicitly.
                    field.setAccessible(true);
                    field.set(bean, proxy);
                } catch (IllegalAccessException e) {
                    // Fail fast: a silently un-injected field would only surface later as an NPE.
                    throw new IllegalStateException("Cannot inject remote proxy into "
                            + bean.getClass().getName() + "." + field.getName(), e);
                }
            }
            // Returning null here would stop all subsequent BeanPostProcessors for this bean.
            return bean;
        }
    }
    
    /**
     * Invocation handler behind every {@code WzxAutowired} proxy: it turns a local
     * interface call into an {@code RpcRequest} and sends it to the configured server.
     */
    @Component
    public class RemoteInvocationHandler implements InvocationHandler {
        @Value("${wzx.host}")
        private String host;
        @Value("${wzx.port}")
        private int port;

        /**
         * Forwards the call to the remote server and returns whatever the server sends back.
         *
         * @param proxy  the proxy instance the method was invoked on
         * @param method the interface method being called
         * @param args   the call arguments, or {@code null} for a no-arg method
         * @return the object read back from the server
         * @throws Throwable any I/O or deserialisation failure raised by the transport
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // equals/hashCode/toString are also dispatched to the handler. Sending them
            // to the server yields null, which breaks the primitive-returning ones,
            // so they are answered locally instead.
            if (method.getDeclaringClass() == Object.class) {
                return invokeObjectMethod(proxy, method, args);
            }
            // Open a fresh connection for this call
            RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
            // Describe the call (interface, method, arguments) so the server can invoke it reflectively
            RpcRequest rpcRequest = new RpcRequest();
            rpcRequest.setClassName(method.getDeclaringClass().getName());
            rpcRequest.setArgs(args);
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setTypes(method.getParameterTypes());
            return rpcNetTransport.send(rpcRequest);
        }

        /** Identity-based implementations of the java.lang.Object methods routed through a proxy. */
        private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
            String name = method.getName();
            if ("equals".equals(name)) {
                return proxy == args[0];
            }
            if ("hashCode".equals(name)) {
                return System.identityHashCode(proxy);
            }
            // Only toString remains
            return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
        }
    }
    
    /**
     * Minimal blocking transport: opens one socket per request, writes the request
     * with Java serialisation and reads a single serialised reply.
     */
    public class RpcNetTransport {
        private final String host;
        private final int port;

        /**
         * @param host server host name or address
         * @param port server TCP port
         */
        public RpcNetTransport(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /**
         * Opens a new connection to the configured host and port.
         *
         * @return a connected socket; the caller is responsible for closing it
         * @throws IOException if the connection cannot be established
         */
        public Socket createSocket() throws IOException {
            return new Socket(host, port);
        }

        /**
         * Sends the request and waits for the server's reply.
         *
         * @param request the call description to transmit
         * @return the object deserialised from the server's response
         * @throws IOException            on any connection or stream failure
         * @throws ClassNotFoundException if the reply's class is not on the classpath
         */
        public Object send(RpcRequest request) throws IOException, ClassNotFoundException {
            // Closing the socket also closes both of its streams, so one resource covers
            // everything and nothing leaks when writing or reading fails.
            try (Socket socket = createSocket()) {
                ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
                outputStream.writeObject(request);
                // Flush so the request actually reaches the server before we block on the reply
                outputStream.flush();
                // Read the data returned by the server
                ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                return inputStream.readObject();
            }
        }
    }
    
    /**
     * REST entry point of the client application; delegates to the order service
     * through the field annotated with {@code WzxAutowired}.
     */
    @RestController
    public class UserController {
        @WzxAutowired
        IOrderService orderService;

        /**
         * Handles GET /test.
         *
         * @return the value returned by {@code orderService.queryOrderList()}
         */
        @GetMapping("/test")
        public String test() {
            String orderList = orderService.queryOrderList();
            return orderList;
        }
    }
    
    /**
     * Marks a field that AutowiredInvokeProxy should fill with a remote-call proxy
     * of the field's (interface) type. Retained at runtime so it can be found reflectively.
     */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    // NOTE(review): this annotation can only be placed on fields, so the @Component
    // meta-annotation below appears to have no effect — confirm before removing.
    @Component
    public @interface WzxAutowired {
    }
    
    /**
     * Spring Boot entry point of the client (user-service) application.
     *
     * <p>Components are scanned from {@code com.wzx.example}. The base package is
     * given through {@code scanBasePackages}; {@code @Component("com.wzx.example")}
     * would merely use that string as this class's bean name.
     */
    @SpringBootApplication(scanBasePackages = "com.wzx.example")
    public class UserServiceApplication {
        /**
         * Starts the application.
         *
         * @param args command-line arguments, forwarded to Spring Boot so that
         *             options such as {@code --server.port=...} take effect
         */
        public static void main(String[] args) {
            SpringApplication.run(UserServiceApplication.class, args);
        }
    }
    

    application.yml

    spring:
      application:
        name: user-service
    server:
      # HTTP port of the client web application
      port: 8080
    
    # Address of the remote RPC server; injected into RemoteInvocationHandler via @Value
    wzx:
      host: localhost
      # must match the port the server opens its ServerSocket on (8888 in SocketServerInitial)
      port: 8888
    
    image.png

    运行结果:


    image.png

    服务端:

    /**
     * Bean post-processor that publishes every bean annotated with
     * {@code WzxRemoteService} by registering its interface methods in {@code Mediator.map}.
     */
    @Component
    public class InitialMediator implements BeanPostProcessor {

        /**
         * Registers the remotely callable methods of an annotated bean.
         *
         * <p>Keys have the form {@code <declaring interface name>.<method name>}, which is
         * what the client derives from the proxied method. Parameter types are not part
         * of the key, so overloaded methods overwrite one another.
         *
         * @param bean     the fully initialised bean
         * @param beanName the name of the bean
         * @return the unchanged bean
         * @throws IllegalStateException if an annotated bean implements no interface
         */
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            Class<?> beanClass = bean.getClass();
            // Only beans annotated with WzxRemoteService are published
            if (!beanClass.isAnnotationPresent(WzxRemoteService.class)) {
                return bean;
            }
            Class<?>[] serviceInterfaces = beanClass.getInterfaces();
            if (serviceInterfaces.length == 0) {
                // Clients address services by interface name; without one there is nothing to publish.
                throw new IllegalStateException("@WzxRemoteService bean '" + beanName + "' ("
                        + beanClass.getName() + ") must implement at least one interface");
            }
            for (Class<?> serviceInterface : serviceInterfaces) {
                // Walk the interface's methods rather than the implementation's declared ones,
                // so implementation-only helpers are not exposed and inherited or default
                // interface methods are not missed.
                for (Method method : serviceInterface.getMethods()) {
                    String key = method.getDeclaringClass().getName() + "." + method.getName();
                    BeanMethod beanMethod = new BeanMethod();
                    beanMethod.setBean(bean);
                    beanMethod.setMethod(method);
                    // Store the published method for later lookup by the Mediator
                    Mediator.map.put(key, beanMethod);
                }
            }
            return bean;
        }
    }
    
    /**
     * Server-side go-between: keeps the table of published service methods and
     * dispatches incoming requests to them.
     */
    public class Mediator {

        // Every published method, i.e. the methods of all beans annotated with WzxRemoteService,
        // keyed by "<class name>.<method name>"
        public static Map<String, BeanMethod> map = new ConcurrentHashMap<String, BeanMethod>();
        private volatile static Mediator instance;

        private Mediator() {
        }

        /**
         * Returns the single Mediator, creating it on first use
         * (double-checked locking on the volatile {@code instance} field).
         */
        public static Mediator getInstance() {
            if (instance != null) {
                return instance;
            }
            synchronized (Mediator.class) {
                if (instance == null) {
                    instance = new Mediator();
                }
            }
            return instance;
        }

        /**
         * Looks up the target of the request and invokes it reflectively.
         *
         * @param request the call description received from a client
         * @return the invocation result; {@code null} when no method is registered under
         *         the request's key or when the reflective call fails with an
         *         IllegalAccessException or InvocationTargetException
         */
        public Object processor(RpcRequest request) {
            // The post-processor has already filled the map; this is a plain lookup
            String lookupKey = request.getClassName() + "." + request.getMethodName();
            BeanMethod target = map.get(lookupKey);
            if (target == null) {
                return null;
            }
            Object targetBean = target.getBean();
            Method targetMethod = target.getMethod();
            try {
                return targetMethod.invoke(targetBean, request.getArgs());
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
                return null;
            }
        }
    }
    
    /**
     * Serializable description of one remote call: which interface and method to
     * invoke, and with which arguments.
     *
     * <p>No explicit {@code serialVersionUID} is declared, so client and server must
     * use the same compiled definition of this class.
     */
    public class RpcRequest implements Serializable {
        /** Fully qualified name of the interface that declares the method. */
        private String className;
        /** Simple name of the method to invoke. */
        private String methodName;
        // Call arguments; null for a no-arg method
        private Object[] args;
        // Declared parameter types of the method
        private Class<?>[] types;

        /** @return the fully qualified interface name */
        public String getClassName() {
            return className;
        }

        /** @param className the fully qualified interface name */
        public void setClassName(String className) {
            this.className = className;
        }

        /** @return the name of the method to invoke */
        public String getMethodName() {
            return methodName;
        }

        /** @param methodName the name of the method to invoke */
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }

        /** @return the call arguments, possibly {@code null} */
        public Object[] getArgs() {
            return args;
        }

        /** @param args the call arguments, possibly {@code null} */
        public void setArgs(Object[] args) {
            this.args = args;
        }

        /** @return the declared parameter types */
        public Class<?>[] getTypes() {
            return types;
        }

        /** @param types the declared parameter types */
        public void setTypes(Class<?>[] types) {
            this.types = types;
        }
    }
    
    /**
     * Pairs a published service bean with one of its methods so the method can be
     * invoked reflectively later on.
     */
    public class BeanMethod {
        /** The service instance the method is invoked on. */
        private Object bean;

        /** The method to invoke. */
        private Method method;

        /** @return the service instance */
        public Object getBean() {
            return this.bean;
        }

        /** @param bean the service instance */
        public void setBean(Object bean) {
            this.bean = bean;
        }

        /** @return the method to invoke */
        public Method getMethod() {
            return this.method;
        }

        /** @param method the method to invoke */
        public void setMethod(Method method) {
            this.method = method;
        }
    }
    
    /**
     * Starts the RPC socket server when Spring publishes a ContextRefreshedEvent,
     * i.e. once the container has finished starting up.
     *
     * <p>The accept loop runs on the executor instead of the event thread, so the
     * context refresh can complete.
     */
    @Component
    public class SocketServerInitial implements ApplicationListener<ContextRefreshedEvent> {
        /** TCP port the RPC server listens on. */
        private static final int PORT = 8888;

        public final ExecutorService executorService = Executors.newCachedThreadPool();

        /**
         * Hands the accept loop to the executor and returns immediately.
         *
         * @param contextRefreshedEvent the refresh event (not inspected)
         */
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            executorService.execute(new Runnable() {
                public void run() {
                    acceptConnections();
                }
            });
        }

        /** Binds the server socket and dispatches each accepted connection to a ProcessorHandler. */
        private void acceptConnections() {
            // try-with-resources closes the server socket on exit and, unlike a manual
            // finally block, does not hit a null reference when binding itself fails.
            try (ServerSocket serverSocket = new ServerSocket(PORT)) {
                // Keep listening for client requests
                for (;;) {
                    Socket socket = serverSocket.accept();
                    // Handle each connection on its own worker so accept() is never blocked by I/O
                    executorService.execute(new ProcessorHandler(socket));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Handles one client connection: reads a single {@code RpcRequest}, lets the
     * {@code Mediator} execute it and writes the result back.
     */
    public class ProcessorHandler implements Runnable {
        private Socket socket;

        /** @param socket the accepted client connection; closed when {@link #run()} finishes */
        public ProcessorHandler(Socket socket) {
            this.socket = socket;
        }

        /** @return the client connection */
        public Socket getSocket() {
            return socket;
        }

        /** @param socket the client connection */
        public void setSocket(Socket socket) {
            this.socket = socket;
        }

        /**
         * Processes the request. Any failure is printed and the connection is closed;
         * nothing is rethrown to the executor.
         */
        @Override
        public void run() {
            // The socket itself is a managed resource, so it is released even when creating
            // or reading the stream fails; closing it also closes both streams.
            try (Socket connection = socket;
                 ObjectInputStream inputStream = new ObjectInputStream(connection.getInputStream())) {
                // SECURITY NOTE: this uses Java native deserialisation on data from the
                // network. Only expose the port to trusted clients, or install an
                // ObjectInputFilter.
                RpcRequest request = (RpcRequest) inputStream.readObject();
                // Route the request to the registered service method
                Mediator mediator = Mediator.getInstance();
                Object rs = mediator.processor(request);
                System.out.println("服务端处理的结果:" + rs);
                // Write the result back; the output stream is created only now, matching
                // the order in which the client builds its streams.
                ObjectOutputStream outputStream = new ObjectOutputStream(connection.getOutputStream());
                outputStream.writeObject(rs);
                outputStream.flush();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Marks a service implementation class whose methods InitialMediator registers
     * for remote invocation. Retained at runtime so it can be detected reflectively.
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    // Meta-annotated with @Component — presumably so that annotated classes are picked up
    // by component scanning without a separate @Component (OrderServiceImpl carries only
    // this annotation).
    @Component
    public @interface WzxRemoteService {
    }
    
    /**
     * Entry point of the server application: builds an annotation-driven Spring
     * context that scans {@code com.wzx.example}.
     */
    @Configuration
    @ComponentScan(value = "com.wzx.example")
    public class Bootstrap {
        /**
         * Starts the Spring context.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            // Register a shutdown hook so the context is closed when the JVM exits.
            new AnnotationConfigApplicationContext(Bootstrap.class).registerShutdownHook();
        }
    }
    
    /**
     * Demo implementation of {@code IOrderService}; the {@code WzxRemoteService}
     * annotation gets it published automatically.
     */
    @WzxRemoteService
    public class OrderServiceImpl implements IOrderService {
        private static final String ORDER_INFO_REPLY = "this is queryOrderInfo";
        private static final String ORDER_LIST_REPLY = "this is queryOrderList";

        /**
         * @param id the order id (ignored by this demo implementation)
         * @return a fixed demo string
         */
        public String queryOrderInfo(String id) {
            return ORDER_INFO_REPLY;
        }

        /** @return a fixed demo string */
        public String queryOrderList() {
            return ORDER_LIST_REPLY;
        }
    }
    
    /**
     * Order service contract shared by client and server; the client calls it
     * through a proxy, the server provides the implementation.
     */
    public interface IOrderService {
        /**
         * Queries a single order.
         *
         * @param id the order id
         * @return a textual description of the order
         */
        String queryOrderInfo(String id);

        /**
         * Queries the order list.
         *
         * @return a textual description of the orders
         */
        String queryOrderList();
    }
    
    image.png

    相关文章

      网友评论

          本文标题:使用socket实现RPC远程调用升级版

          本文链接:https://www.haomeiwen.com/subject/gslffktx.html