美文网首页
源码阅读笔记:分布式服务框架XXL-RPC(基于1.4.1)to

源码阅读笔记:分布式服务框架XXL-RPC(基于1.4.1)to

作者: 鸡熟了 | 来源:发表于2019-06-17 18:37 被阅读0次

    前言:接上篇,看完了注册中心,该看看RPC框架了——《分布式服务框架XXL-RPC》

    老样子,想看看它自己怎么吹的

    1.1 概述>

    XXL-RPC 是一个分布式服务框架,提供稳定高性能的RPC远程服务调用功能。拥有"高性能、分布式、注册中心、负载均衡、服务治理"等特性。现已开放源代码,开箱即用。>

    1.2 特性>

    • 1、快速接入:接入步骤非常简洁,两分钟即可上手;
    • 2、服务透明:系统完整的封装了底层通信细节,开发时调用远程服务就像调用本地服务,在提供远程调用能力时不损失本地调用的语义简洁性;
    • 3、多调用方案:支持 SYNC、ONEWAY、FUTURE、CALLBACK 等方案;
    • 4、多通讯方案:支持 TCP 和 HTTP 两种通讯方式进行服务调用;其中 TCP 提供可选方案 NETTY 或 MINA ,HTTP 提供可选方案 NETTY_HTTP 或 Jetty;
    • 5、多序列化方案:支持 HESSIAN、HESSIAN1、PROTOSTUFF、KRYO、JACKSON 等方案;
    • 6、负载均衡/软负载:提供丰富的负载均衡策略,包括:轮询、随机、LRU、LFU、一致性HASH等;
    • 7、注册中心:可选组件,支持服务注册并动态发现;可选择不启用,直接指定服务提供方机器地址通讯;选择启用时,内置可选方案:“XXL-REGISTRY 轻量级注册中心”(推荐)、“ZK注册中心”、“Local注册中心”等;
    • 8、服务治理:提供服务治理中心,可在线管理注册的服务信息,如服务锁定、禁用等;
    • 9、服务监控:可在线监控服务调用统计信息以及服务健康状况等(计划中);
    • 10、容错:服务提供方集群注册时,某个服务节点不可用时将会自动摘除,同时消费方将会移除失效节点将流量分发到其余节点,提高系统容错能力。
    • 11、解决1+1问题:传统分布式通讯一般通过nginx或f5做集群服务的流量负载均衡,每次请求在到达目标服务机器之前都需要经过负载均衡机器,即1+1,这将会把流量放大一倍。而XXL-RPC将会从消费方直达服务提供方,每次请求直达目标机器,从而可以避免上述问题;
    • 12、高兼容性:得益于优良的兼容性与模块化设计,不限制外部框架;除 spring/springboot 环境之外,理论上支持运行在任何Java代码中,甚至main方法直接启动运行;
    • 13、泛化调用:服务调用方不依赖服务方提供的API;

    还是老套路,直接代码下下来,跑个demo看看(ps:注册中心它自己推荐的XXL-REGISTRY)
    先看看provider的代码。provider提供了一个简单的sayHi方法,代码如下。

    @XxlRpcService
    @Service
    public class DemoServiceImpl implements DemoService {
        private static Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
    
        @Override
        public UserDTO sayHi(String name) {
    
            String word = MessageFormat.format("Hi {0}, from {1} as {2}",
                    name, DemoServiceImpl.class.getName(), String.valueOf(System.currentTimeMillis()));
    
            if ("error".equalsIgnoreCase(name)) throw new RuntimeException("test exception.");
    
            UserDTO userDTO = new UserDTO(name, word);
            logger.info(userDTO.toString());
    
            return userDTO;
        }
    
    }
    

    结着按照说明稍微配置一下注册中心,provider走你!
    成功启动后可以在注册中心看到刚刚启动的provider


    注册中心

    点开编辑看看


    编辑

    很好,看起来非常美好,注册key就是接口,注册信息就是provider的url,看起来非常ok。
    那么我们再把consumer跑起来看看,配置同一个注册中心,走你!
    控制台表示成功运行起来了。那么我们看看怎么测试一下远程调用。
    翻一翻consumer代码,看到一个controller,然后在controller里面会调用远程方法。

    @Controller
    public class IndexController {
        
        @XxlRpcReference
        private DemoService demoService;
    
    
        @RequestMapping("")
        @ResponseBody
        public UserDTO http(String name) {
    
            try {
                return demoService.sayHi(name);
            } catch (Exception e) {
                e.printStackTrace();
                return new UserDTO(null, e.getMessage());
            }
        }
    
    }
    

    ok,让我们打开浏览器试一试。和预期的一样,得到了provider的response


    浏览器直接调用

    这样一个最简单的远程调用就完成了。
    那么接下来,就该看看这些功能是怎么实现的了。先帖个框架自己对rpc的描述

    4.4 RPC工作原理剖析

    rpc原理

    概念
    1、serialization:序列化,通讯数据需要经过序列化,从而支持在网络中传输;
    2、deserialization:反序列化,服务接受到序列化的请求数据,需要序列化为底层原始数据;
    3、stub:体现在XXL-RPC为服务的api接口;
    4、skeleton:体现在XXL-RPC为服务的实现api接口的具体服务;
    5、proxy:根据远程服务的stub生成的代理服务,对开发人员透明;
    6、provider:远程服务的提供方;
    7、consumer:远程服务的消费方;

    RPC通讯,可大致划分为四个步骤,可参考上图进行理解:(XXL-RPC提供了多种调用方案,此处以 “SYNC” 方案为例讲解;)>
    1、consumer发起请求:consumer会根据远程服务的stub实例化远程服务的代理服务,在发起请求时,代理服务会封装本次请求相关底层数据,如服务iface、methos、params等等,然后将数据经过serialization之后发送给provider;
    2、provider接收请求:provider接收到请求数据,首先会deserialization获取原始请求数据,然后根据stub匹配目标服务并调用;
    3、provider响应请求:provider在调用目标服务后,封装服务返回数据并进行serialization,然后把数据传输给consumer;
    4、consumer接收响应:consumer接受到相应数据后,首先会deserialization获取原始数据,然后根据stub生成调用返回结果,返回给请求调用处。结束。

    其实已经讲得比较清楚,在官方提供的demo里,

    1. consumer调用sayHi方法
    2. 通过注册中心找到provider
    3. 代理类封装请求并序列化后发送给provider
    4. provider反序列化数据,发现调用的是sayHi方法
    5. 把调用结果序列化返回给consumer
    6. consumer反序列化返回结果

    接下来回到代码本身,看看这一系列过程是怎么实现的。
    先从provider的demo入手吧。
    先看看配置,只配置了一个XxlRpcSpringProviderFactorybean
    从配置代码来看,配置了provider的端口,注册中心的类型(xxl-registry or zookeeper or local,这里是xxl-registry) ,已经注册中心的一些参数(这里是对应注册中心xxl-registry需要的配置:注册中心地址,环境,token)

    @Configuration
    public class XxlRpcProviderConfig {
        private Logger logger = LoggerFactory.getLogger(XxlRpcProviderConfig.class);
    
        @Value("${xxl-rpc.remoting.port}")
        private int port;
    
        @Value("${xxl-rpc.registry.xxlregistry.address}")
        private String address;
    
        @Value("${xxl-rpc.registry.xxlregistry.env}")
        private String env;
    
        @Value("${xxl-rpc.registry.xxlregistry.token}")
        private String token;
    
        @Bean
        public XxlRpcSpringProviderFactory xxlRpcSpringProviderFactory() {
    
            XxlRpcSpringProviderFactory providerFactory = new XxlRpcSpringProviderFactory();
            providerFactory.setPort(port);
            providerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
            providerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
                put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
                put(XxlRegistryServiceRegistry.ENV, env);
                put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
            }});
    
            logger.info(">>>>>>>>>>> xxl-rpc provider config init finish.");
            return providerFactory;
        }
    
    }
    

    ok,那我们在看看XxlRpcSpringProviderFactory有什么花头。
    实现了3个接口
    implements ApplicationContextAware, InitializingBean,DisposableBean
    ,并继承自XxlRpcProviderFactory

    ps:这几个接口都是一些spring bean的一些扩展,详细的可以自行搜索, 下面给出一些简单的描述

    InitialingBean是一个接口,提供了一个唯一的方法afterPropertiesSet()
    DisposableBean也是一个接口,提供了一个唯一的方法destory()
    前者顾名思义在Bean属性都设置完毕后调用afterPropertiesSet()方法做一些初始化的工作,后者在Bean生命周期结束前调用destory()方法做一些收尾工作

    实现ApplicationContextAware接口的Bean,在Bean加载的过程中可以获取到Spring的ApplicationContext,这个尤其重要,ApplicationContext是Spring应用上下文,从ApplicationContext中可以获取包括任意的Bean在内的大量Spring容器内容和信息

    public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {
    

    这里比较好理解,因为XxlRpcSpringProviderFactory是针对spring的客户端,所需需要额外实现几个接口,主要的逻辑是在它的父类XxlRpcProviderFactory里面。
    先看看第一段

    // ---------------------- config ----------------------
    

    里面的内容
    netType定义了网络通信协议(NETTY,NETTY_HTTP,MINA,JETTY)
    Serializer定义了序列化方式
    ip,port,accessToken还不知道干嘛,留着
    serviceRegistryClassserviceRegistryParam定义注册中心类和参数

        private NetEnum netType;
        private Serializer serializer;
    
        private String ip;                  // for registry
        private int port;                   // default port
        private String accessToken;
    
        private Class<? extends ServiceRegistry> serviceRegistryClass;
        private Map<String, String> serviceRegistryParam;
    

    再往下看,这些属性的设置全是在initConfig方法中设值的。
    通过这段代码,就可以知道,ip其实是给consumer使用的本地ip(会注册到注册中心的ip),port其实就是用来和consumer通信的端口号(比如刚刚demo里面的7080)

    public void initConfig(NetEnum netType,
                              Serializer serializer,
                              String ip,
                              int port,
                              String accessToken,
                               Class<? extends ServiceRegistry> serviceRegistryClass,
                              Map<String, String> serviceRegistryParam) {
    
            // init
            this.netType = netType;
            this.serializer = serializer;
            this.ip = ip;
            this.port = port;
            this.accessToken = accessToken;
            this.serviceRegistryClass = serviceRegistryClass;
            this.serviceRegistryParam = serviceRegistryParam;
    
            // valid
            if (this.netType==null) {
                throw new XxlRpcException("xxl-rpc provider netType missing.");
            }
            if (this.serializer==null) {
                throw new XxlRpcException("xxl-rpc provider serializer missing.");
            }
            if (this.ip == null) {
                this.ip = IpUtil.getIp();
            }
            if (this.port <= 0) {
                this.port = 7080;
            }
            if (NetUtil.isPortUsed(this.port)) {
                throw new XxlRpcException("xxl-rpc provider port["+ this.port +"] is used.");
            }
            if (this.serviceRegistryClass != null) {
                if (this.serviceRegistryParam == null) {
                    throw new XxlRpcException("xxl-rpc provider serviceRegistryParam is missing.");
                }
            }
    
        }
    

    这段代码其实就是一些基础参数的config。比如用什么通信协议,开房什么端口,用什么注册中心等等。
    再接着往下看

    // ---------------------- start / stop ----------------------
    

    看看start和stop代码有什么

        private Server server;
        private ServiceRegistry serviceRegistry;
        private String serviceAddress;
    

    先往下看,定义了start和stop方法,仔细一看是针对server的startstop,server是netType的一个instance。
    那么就比较好理解了,server就是负责通信的实例(demo里是netty)。
    首先拿到server的实例,然后设置了setStartedCallbacksetStopedCallback,并调用了start方法。

    public void start() throws Exception {
            // start server
            serviceAddress = IpUtil.getIpPort(this.ip, port);
            server = netType.serverClass.newInstance();
            server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
                @Override
                public void run() throws Exception {
                    // start registry
                    if (serviceRegistryClass != null) {
                        serviceRegistry = serviceRegistryClass.newInstance();
                        serviceRegistry.start(serviceRegistryParam);
                        if (serviceData.size() > 0) {
                            serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                        }
                    }
                }
            });
            server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
                @Override
                public void run() {
                    // stop registry
                    if (serviceRegistry != null) {
                        if (serviceData.size() > 0) {
                            serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                        }
                        serviceRegistry.stop();
                        serviceRegistry = null;
                    }
                }
            });
            server.start(this);
        }
    
        public void  stop() throws Exception {
            // stop server
            server.stop();
        }
    

    那我们再深入看看server这些callbackstart方法都干了什么吧。
    server是一个抽象类(nettyServer会继承这个类),定义了BaseCallback类型的startedCallbackstopedCallback,根据名字猜测是通信server调用start后,会调用startedCallback.run方法,server调用stop之后会调用stopedCallback.run方法。好像比较抽象,毕竟是抽象类。

    public abstract class Server {
        protected static final Logger logger = LoggerFactory.getLogger(Server.class);
    
    
        private BaseCallback startedCallback;
        private BaseCallback stopedCallback;
    
        public void setStartedCallback(BaseCallback startedCallback) {
            this.startedCallback = startedCallback;
        }
    
        public void setStopedCallback(BaseCallback stopedCallback) {
            this.stopedCallback = stopedCallback;
        }
    
    
        /**
         * start server
         *
         * @param xxlRpcProviderFactory
         * @throws Exception
         */
        public abstract void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception;
    
        /**
         * callback when started
         */
        public void onStarted() {
            if (startedCallback != null) {
                try {
                    startedCallback.run();
                } catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-rpc, server startedCallback error.", e);
                }
            }
        }
    
        /**
         * stop server
         *
         * @throws Exception
         */
        public abstract void stop() throws Exception;
    
        /**
         * callback when stoped
         */
        public void onStoped() {
            if (stopedCallback != null) {
                try {
                    stopedCallback.run();
                } catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-rpc, server stopedCallback error.", e);
                }
            }
        }
    
    }
    

    那我们直接进入继承他的NettyServer看看,这样应该就比较清晰了。
    start方法里面直接开了一个守护线程,线程做的事情非常简单:配置并开启netty服务,并调用onStarted方法
    stop方法更简单,直接interrupt,并调用onStoped方法。

    public class NettyServer extends Server {
    
        private Thread thread;
    
        @Override
        public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
    
            thread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    // param
                    final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName());
                    EventLoopGroup bossGroup = new NioEventLoopGroup();
                    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
                    try {
                        // start server
                        ServerBootstrap bootstrap = new ServerBootstrap();
                        bootstrap.group(bossGroup, workerGroup)
                                .channel(NioServerSocketChannel.class)
                                .childHandler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    public void initChannel(SocketChannel channel) throws Exception {
                                        channel.pipeline()
                                                .addLast(new IdleStateHandler(0,0,10, TimeUnit.MINUTES))
                                                .addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializer()))
                                                .addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializer()))
                                                .addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool));
                                    }
                                })
                                .childOption(ChannelOption.TCP_NODELAY, true)
                                .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                        // bind
                        ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();
    
                        logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyServer.class.getName(), xxlRpcProviderFactory.getPort());
                        onStarted();
    
                        // wait util stop
                        future.channel().closeFuture().sync();
    
                    } catch (Exception e) {
                        if (e instanceof InterruptedException) {
                            logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
                        } else {
                            logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
                        }
                    } finally {
    
                        // stop
                        try {
                            serverHandlerPool.shutdown();    // shutdownNow
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
                        try {
                            workerGroup.shutdownGracefully();
                            bossGroup.shutdownGracefully();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
    
                    }
                }
            });
            thread.setDaemon(true);
            thread.start();
    
        }
    
        @Override
        public void stop() throws Exception {
    
            // destroy server thread
            if (thread != null && thread.isAlive()) {
                thread.interrupt();
            }
    
            // on stop
            onStoped();
            logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
        }
    
    }
    

    让我们再回到刚才的代码,startedCallback方法就是在netty服务启动完成之后,把provider的信息注册到注册中心

    ServiceRegistry是个注册中心的抽象,demo里面用的是XxlRegistryServiceRegistry,其实就是对注册中心操作的一些封装,代码非常简单,不懂可以参考上一篇源码阅读:分布式服务注册中心XXL-REGISTRY(基于1.0.2)

    server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
                @Override
                public void run() throws Exception {
                    // start registry
                    if (serviceRegistryClass != null) {
                        serviceRegistry = serviceRegistryClass.newInstance();
                        serviceRegistry.start(serviceRegistryParam);
                        if (serviceData.size() > 0) {
                            serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                        }
                    }
                }
            });
    

    stopedCallback也比较好理解了,就是在netty服务关闭之后,从注册中心移除自己。

    server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
                @Override
                public void run() {
                    // stop registry
                    if (serviceRegistry != null) {
                        if (serviceData.size() > 0) {
                            serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                        }
                        serviceRegistry.stop();
                        serviceRegistry = null;
                    }
                }
            });
    

    最后再看看server invoke里面有什么吧
    看起来像是用来记rpc service的,先不管他,接着往下看

    // ---------------------- server invoke ----------------------
    /**
         * init local rpc service map
         */
        private Map<String, Object> serviceData = new HashMap<String, Object>();
        public Map<String, Object> getServiceData() {
            return serviceData;
        }
    

    好像就是字符串的拼接,不知道干嘛的,先往下看

        /**
         * make service key
         *
         * @param iface
         * @param version
         * @return
         */
        public static String makeServiceKey(String iface, String version){
            String serviceKey = iface;
            if (version!=null && version.trim().length()>0) {
                serviceKey += "#".concat(version);
            }
            return serviceKey;
        }
    

    addService用到了makeServiceKey,看来这个是用来做唯一主键的。
    根据名字推测,应该是往前面的serviceData里面把serviceBean放进去。

    /**
         * add service
         *
         * @param iface
         * @param version
         * @param serviceBean
         */
        public void addService(String iface, String version, Object serviceBean){
            String serviceKey = makeServiceKey(iface, version);
            serviceData.put(serviceKey, serviceBean);
    
            logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass());
        }
    

    通过IDE看看哪里用了addService方法
    发现在刚才的XxlRpcSpringProviderFactory就有用到!
    看下代码,其实很简单:

    • 从spring上下文找到加了XxlRpcService注解的bean
    • 接口名+版本号作为唯一主键把bean放入serviceData里面
    @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
            if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    // valid
                    if (serviceBean.getClass().getInterfaces().length ==0) {
                        throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                    }
                    // add service
                    XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
    
                    String iface = serviceBean.getClass().getInterfaces()[0].getName();
                    String version = xxlRpcService.version();
    
                    super.addService(iface, version, serviceBean);
                }
            }
    
            // TODO,addServices by api + prop
    
        }
    

    最后一个方法,从方法名就能看出来,调用service,接受一个xxlRpcRequest参数
    serviceData里面取出request里面要调用的bean
    通过反射调用方法并返回response

    /**
         * invoke service
         *
         * @param xxlRpcRequest
         * @return
         */
        public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
    
            //  make response
            XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
            xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
    
            // match service bean
            String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
            Object serviceBean = serviceData.get(serviceKey);
    
            // valid
            if (serviceBean == null) {
                xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
                return xxlRpcResponse;
            }
    
            if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
                xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
                return xxlRpcResponse;
            }
            if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
                xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
                return xxlRpcResponse;
            }
    
            try {
                // invoke
                Class<?> serviceClass = serviceBean.getClass();
                String methodName = xxlRpcRequest.getMethodName();
                Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
                Object[] parameters = xxlRpcRequest.getParameters();
    
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                method.setAccessible(true);
                Object result = method.invoke(serviceBean, parameters);
    
                /*FastClass serviceFastClass = FastClass.create(serviceClass);
                FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
                Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
    
                xxlRpcResponse.setResult(result);
            } catch (Throwable t) {
                // catch error
                logger.error("xxl-rpc provider invokeService error.", t);
                xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
            }
    
            return xxlRpcResponse;
        }
    

    ok,到这里为止,XxlRpcProviderFactory的代码看完了,来总结一下它究竟能干什么事情

    • 配置通信协议,序列化方式,注册中心
    • 开启通信server
    • serviceData里所有的provider服务注册到注册中心
    • 通过反射机制,提供调用服务的(invokeService)方法
      看完了XxlRpcProviderFactory,我们再回到XxlRpcSpringProviderFactory
      与父类不同的是,提供了netType默认使用netty,序列化默认使用hessian
        // ---------------------- config ----------------------
    
        private String netType = NetEnum.NETTY.name();
        private String serialize = Serializer.SerializeEnum.HESSIAN.name();
    

    再看看必须实现的几个接口
    这个方法前面已经看到过,这里是把所有带有XxlRpcService注解的bean放到serverData这个map里面

    @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
            if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    // valid
                    if (serviceBean.getClass().getInterfaces().length ==0) {
                        throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                    }
                    // add service
                    XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
    
                    String iface = serviceBean.getClass().getInterfaces()[0].getName();
                    String version = xxlRpcService.version();
    
                    super.addService(iface, version, serviceBean);
                }
            }
    
            // TODO,addServices by api + prop
    
        }
    

    最后两个方法,很简单,就是配置一下基本参数,以及调用父类的方法。

        @Override
        public void afterPropertiesSet() throws Exception {
            this.prepareConfig();
            super.start();
        }
    
        @Override
        public void destroy() throws Exception {
            super.stop();
        }
    

    这样一来,XxlRpcSpringProviderFactory就全部阅读完了。我们重新梳理一遍流程XxlRpcInvokerConfig干的事情

    1. 配置通信框架(netty),序列化框架(hessian),注册中心(xxl-registry)
    2. 把使用了XxlRpcService注解的bean全部put到Map<String,Object> serviceData里面,key为bean继承的接口名+版本号,Object为service的bean本身
    3. 启动通信框架(netty),启动成功后把serviceData里面的bean注册到注册中心
      这样,provider就已经完全启动完成了,一切准备就绪,就等客户端调用了!
      那么,我们再看看客户端的代码!
      老套路,从配置开始看起,和provider的配置差不多,就不在赘述
    @Configuration
    public class XxlRpcInvokerConfig {
        private Logger logger = LoggerFactory.getLogger(XxlRpcInvokerConfig.class);
    
    
        @Value("${xxl-rpc.registry.xxlregistry.address}")
        private String address;
    
        @Value("${xxl-rpc.registry.xxlregistry.env}")
        private String env;
    
        @Value("${xxl-rpc.registry.xxlregistry.token}")
        private String token;
    
    
        @Bean
        public XxlRpcSpringInvokerFactory xxlJobExecutor() {
    
            XxlRpcSpringInvokerFactory invokerFactory = new XxlRpcSpringInvokerFactory();
            invokerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
            invokerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
                put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
                put(XxlRegistryServiceRegistry.ENV, env);
                put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
            }});
    
            logger.info(">>>>>>>>>>> xxl-rpc invoker config init finish.");
            return invokerFactory;
        }
    
    }
    

    直接去XxlRpcSpringInvokerFactory里面看看吧,InitializingBean,DisposableBean不再赘述

    实现BeanFactoryAware接口的Bean,在Bean加载的过程中可以获取到加载该Bean的BeanFactory

    InstantiationAwareBeanPostProcessor作用的是Bean实例化前后,即:
    1、Bean构造出来之前调用postProcessBeforeInstantiation()方法
    2、Bean构造出来之后调用postProcessAfterInstantiation()方法

    public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {
    

    ok,看看具体类里面的代码,先看第一段config相关
    这段代码似曾相识,在ProviderFactory里面也有:注册中心配置

    // ---------------------- config ----------------------
    
        private Class<? extends ServiceRegistry> serviceRegistryClass;          // class.forname
        private Map<String, String> serviceRegistryParam;
    
    
        public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
            this.serviceRegistryClass = serviceRegistryClass;
        }
    
        public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
            this.serviceRegistryParam = serviceRegistryParam;
        }
    

    接着往下看,定义了一个 XxlRpcInvokerFactory

    // ---------------------- util ----------------------
    
        private XxlRpcInvokerFactory xxlRpcInvokerFactory;
    

    进到XxlRpcInvokerFactory 里面看看吧
    首先很明显,这是一个单例模式
    然后也配置了注册中心

    public class XxlRpcInvokerFactory {
        private static Logger logger = LoggerFactory.getLogger(XxlRpcInvokerFactory.class);
    
        // ---------------------- default instance ----------------------
    
        private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(LocalServiceRegistry.class, null);
        public static XxlRpcInvokerFactory getInstance() {
            return instance;
        }
    
    
        // ---------------------- config ----------------------
    
        private Class<? extends ServiceRegistry> serviceRegistryClass;          // class.forname
        private Map<String, String> serviceRegistryParam;
    
    
        public XxlRpcInvokerFactory() {
        }
        public XxlRpcInvokerFactory(Class<? extends ServiceRegistry> serviceRegistryClass, Map<String, String> serviceRegistryParam) {
            this.serviceRegistryClass = serviceRegistryClass;
            this.serviceRegistryParam = serviceRegistryParam;
        }
        // 略
    }
    

    再往下看,似曾相识的代码
    start方法是开始想注册中心注册
    stop方法先从注册中心移除,然后在吧stopCallbackList里面还没执行的方法执行了
    那什么时候调用addStopCallBackstopCallBack加进list呢?用IDE搜一搜,
    发现在JettyClientConnectClient的时候用到了,好像暂时和我们demo的代码没什么关系,先放一放
    最后把responseCallbackThreadPool 线程池shutDown了。

    // ---------------------- start / stop ----------------------
    
        public void start() throws Exception {
            // start registry
            if (serviceRegistryClass != null) {
                serviceRegistry = serviceRegistryClass.newInstance();
                serviceRegistry.start(serviceRegistryParam);
            }
        }
    
        public void  stop() throws Exception {
            // stop registry
            if (serviceRegistry != null) {
                serviceRegistry.stop();
            }
    
            // stop callback
            if (stopCallbackList.size() > 0) {
                for (BaseCallback callback: stopCallbackList) {
                    try {
                        callback.run();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
    
            // stop CallbackThreadPool
            stopCallbackThreadPool();
        }
    
    // ---------------------- service registry ----------------------
    
        private ServiceRegistry serviceRegistry;
        public ServiceRegistry getServiceRegistry() {
            return serviceRegistry;
        }
    
    
        // ---------------------- service registry ----------------------
    
        private List<BaseCallback> stopCallbackList = new ArrayList<BaseCallback>();
    
        public void addStopCallBack(BaseCallback callback){
            stopCallbackList.add(callback);
        }
    

    responseCallbackThreadPool线程池是用来干什么的?用IDE搜一搜,就在stopCallbackThreadPool上面
    executeResponseCallback接受一个Runnable对象,并初始化线程池,并放入线程池
    那么executeResponseCallback什么时候会被用到?
    // ---------------------- response callback ThreadPool ----------------------
    
        private ThreadPoolExecutor responseCallbackThreadPool = null;
        public void executeResponseCallback(Runnable runnable){
    
            if (responseCallbackThreadPool == null) {
                synchronized (this) {
                    if (responseCallbackThreadPool == null) {
                        responseCallbackThreadPool = new ThreadPoolExecutor(
                                10,
                                100,
                                60L,
                                TimeUnit.SECONDS,
                                new LinkedBlockingQueue<Runnable>(1000),
                                new ThreadFactory() {
                                    @Override
                                    public Thread newThread(Runnable r) {
                                        return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode());
                                    }
                                },
                                new RejectedExecutionHandler() {
                                    @Override
                                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                        throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");
                                    }
                                });     // default maxThreads 300, minThreads 60
                    }
                }
            }
            responseCallbackThreadPool.execute(runnable);
        }
        public void stopCallbackThreadPool() {
            if (responseCallbackThreadPool != null) {
                responseCallbackThreadPool.shutdown();
            }
        }
    

    其实就在上面
    定义了一个concurrentMap,参数是一个string和一个XxlRpcFutureResponse
    根据future这个名字,可以猜一下应该是个future(多线程)操作相关的
    set和remove方法看起来就是对map进行一些关于request的操作
    notifyInvokerFuture方法,从futureResponsePool根据requestId取出一个XxlRpcFutureResponse对象
    然后判断Response的状态,并做一些设置,然后从futureResponsePool移出这个requestId
    到这里,还比较蒙逼,大概只能看出,利用了future,对response做一些处理。
    那么问题来了,response是什么时候生产的?为什么会有一堆callback方法?这样做的目的是什么?
    因为没有看到调用远程服务的代码,看不懂很正常!

     // ---------------------- future-response pool ----------------------
        // XxlRpcFutureResponseFactory
    
        private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();
        public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
            futureResponsePool.put(requestId, futureResponse);
        }
        public void removeInvokerFuture(String requestId){
            futureResponsePool.remove(requestId);
        }
        public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
    
            // get
            final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
            if (futureResponse == null) {
                return;
            }
    
            // notify
            if (futureResponse.getInvokeCallback()!=null) {
    
                // callback type
                try {
                    executeResponseCallback(new Runnable() {
                        @Override
                        public void run() {
                            if (xxlRpcResponse.getErrorMsg() != null) {
                                futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                            } else {
                                futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                            }
                        }
                    });
                }catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else {
    
                // other nomal type
                futureResponse.setResponse(xxlRpcResponse);
            }
    
            // do remove
            futureResponsePool.remove(requestId);
    
        }
    

    让我们回到XxlRpcSpringInvokerFactory,还剩最后一个方法postProcessAfterInstantiation
    看看都干了些什么吧
    取出加了XxlRpcReference注解的字段(field)
    组装成XxlRpcReferenceBean,并根据名字猜测,通过这个bean得到一个service的代理对象!

    @Override
        public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
    
            // collection
            final Set<String> serviceKeyList = new HashSet<>();
    
            // parse XxlRpcReferenceBean
            ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
                @Override
                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                    if (field.isAnnotationPresent(XxlRpcReference.class)) {
                        // valid
                        Class iface = field.getType();
                        if (!iface.isInterface()) {
                            throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
                        }
    
                        XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);
    
                        // init reference bean
                        XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
                                rpcReference.netType(),
                                rpcReference.serializer().getSerializer(),
                                rpcReference.callType(),
                                rpcReference.loadBalance(),
                                iface,
                                rpcReference.version(),
                                rpcReference.timeout(),
                                rpcReference.address(),
                                rpcReference.accessToken(),
                                null,
                                xxlRpcInvokerFactory
                        );
    
                        Object serviceProxy = referenceBean.getObject();
    
                        // set bean
                        field.setAccessible(true);
                        field.set(bean, serviceProxy);
    
                        logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
                                XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());
    
                        // collection
                        String serviceKey = XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version());
                        serviceKeyList.add(serviceKey);
    
                    }
                }
            });
    
            // mult discovery
            if (xxlRpcInvokerFactory.getServiceRegistry() != null) {
                try {
                    xxlRpcInvokerFactory.getServiceRegistry().discovery(serviceKeyList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
            return super.postProcessAfterInstantiation(bean, beanName);
        }
    

    看看getObject做了什么吧

    • 配置一个动态代理(猜测就是调用远程服务用的)
    • 根据XxlRpcInvokerFactory配置的注册中心,查找provider地址
    • 通过通信框架,把数据发送到provider机器
    • NettyClientHandler获得响应的时候, 会调用futureResponse.setResponse(xxlRpcResponse);把拿到的response放进futureResponse里面
    • 再通过XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);拿到response(同步调用)
    // ---------------------- util ----------------------
    
        public Object getObject() {
            return Proxy.newProxyInstance(Thread.currentThread()
                    .getContextClassLoader(), new Class[] { iface },
                    new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
                            // method param
                            String className = method.getDeclaringClass().getName();    // iface.getName()
                            String varsion_ = version;
                            String methodName = method.getName();
                            Class<?>[] parameterTypes = method.getParameterTypes();
                            Object[] parameters = args;
    
                            // filter for generic
                            if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {
    
                                Class<?>[] paramTypes = null;
                                if (args[3]!=null) {
                                    String[] paramTypes_str = (String[]) args[3];
                                    if (paramTypes_str.length > 0) {
                                        paramTypes = new Class[paramTypes_str.length];
                                        for (int i = 0; i < paramTypes_str.length; i++) {
                                            paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
                                        }
                                    }
                                }
    
                                className = (String) args[0];
                                varsion_ = (String) args[1];
                                methodName = (String) args[2];
                                parameterTypes = paramTypes;
                                parameters = (Object[]) args[4];
                            }
    
                            // filter method like "Object.toString()"
                            if (className.equals(Object.class.getName())) {
                                logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
                                throw new XxlRpcException("xxl-rpc proxy class-method not support");
                            }
    
                            // address
                            String finalAddress = address;
                            if (finalAddress==null || finalAddress.trim().length()==0) {
                                if (invokerFactory!=null && invokerFactory.getServiceRegistry()!=null) {
                                    // discovery
                                    String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
                                    TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);
                                    // load balance
                                    if (addressSet==null || addressSet.size()==0) {
                                        // pass
                                    } else if (addressSet.size()==1) {
                                        finalAddress = addressSet.first();
                                    } else {
                                        finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
                                    }
    
                                }
                            }
                            if (finalAddress==null || finalAddress.trim().length()==0) {
                                throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                            }
    
                            // request
                            XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                            xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                            xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                            xxlRpcRequest.setAccessToken(accessToken);
                            xxlRpcRequest.setClassName(className);
                            xxlRpcRequest.setMethodName(methodName);
                            xxlRpcRequest.setParameterTypes(parameterTypes);
                            xxlRpcRequest.setParameters(parameters);
                            
                            // send
                            if (CallType.SYNC == callType) {
                                // future-response set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                                try {
                                    // do invoke
                                    client.asyncSend(finalAddress, xxlRpcRequest);
    
                                    // future get
                                    XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                                    if (xxlRpcResponse.getErrorMsg() != null) {
                                        throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                    }
                                    return xxlRpcResponse.getResult();
                                } catch (Exception e) {
                                    logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
    
                                    throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                                } finally{
                                    // future-response remove
                                    futureResponse.removeInvokerFuture();
                                }
                            } else if (CallType.FUTURE == callType) {
                                // future-response set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                                try {
                                    // invoke future set
                                    XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
                                    XxlRpcInvokeFuture.setFuture(invokeFuture);
    
                                    // do invoke
                                    client.asyncSend(finalAddress, xxlRpcRequest);
    
                                    return null;
                                } catch (Exception e) {
                                    logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
    
                                    // future-response remove
                                    futureResponse.removeInvokerFuture();
    
                                    throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                                }
    
                            } else if (CallType.CALLBACK == callType) {
    
                                // get callback
                                XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
                                XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                                if (threadInvokeCallback != null) {
                                    finalInvokeCallback = threadInvokeCallback;
                                }
                                if (finalInvokeCallback == null) {
                                    throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
                                }
    
                                // future-response set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
                                try {
                                    client.asyncSend(finalAddress, xxlRpcRequest);
                                } catch (Exception e) {
                                    logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
    
                                    // future-response remove
                                    futureResponse.removeInvokerFuture();
    
                                    throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                                }
    
                                return null;
                            } else if (CallType.ONEWAY == callType) {
                                client.asyncSend(finalAddress, xxlRpcRequest);
                                return null;
                            } else {
                                throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
                            }
    
                        }
                    });
        }
    

    回到前面的代码
    getObject的代理之后,为field设置了代理,并把serviceKey(接口名+版本号)放在里一个set里面。

    现先总结一下XxlRpcInvokerConfig干的事情吧

    1. 初始化一个XxlRpcSpringInvokerFactory 的bean
    2. 配置注册中心,通信框架,序列化框架
    3. 向注册中心注册
    4. @XxlRpcReference注解的field配置代理

    是不是似曾相识,没错,和provider的config其实几乎一样。
    那我们从consumer角度看看,一次调用是怎么完成的吧!看完了,前面的疑问应该都能解决了,吧
    看看调用代码,用@XxlRpcReference注解了provider提供的接口
    然后直接通过接口调用方法。
    ok,那么所有猫腻都在@XxlRpcReference这个注解里面了

    @Controller
    public class IndexController {
        
        @XxlRpcReference
        private DemoService demoService;
    
    
        @RequestMapping("")
        @ResponseBody
        public UserDTO http(String name) {
    
            try {
                return demoService.sayHi(name);
            } catch (Exception e) {
                e.printStackTrace();
                return new UserDTO(null, e.getMessage());
            }
        }
    
    }
    

    先看看这个注解本身吧
    给了几个默认值:默认使用netty,使用HESSIAN,使用同步调用,负责均衡方式是轮询

    @Target({ElementType.FIELD})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    public @interface XxlRpcReference {
    
        NetEnum netType() default NetEnum.NETTY;
        Serializer.SerializeEnum serializer() default Serializer.SerializeEnum.HESSIAN;
        CallType callType() default CallType.SYNC;
        LoadBalance loadBalance() default LoadBalance.ROUND;
    
        //Class<?> iface;
        String version() default "";
    
        long timeout() default 1000;
    
        String address() default "";
        String accessToken() default "";
    
        //XxlRpcInvokeCallback invokeCallback() ;
    
    }
    

    搜一搜那里对这个注解做了处理吧
    似曾相识,就是前面提到过的XxlRpcSpringInvokerFactorypostProcessAfterInstantiation方法!
    所以整个调用过程应该就是调用代理方法的过程。
    这样整个客户端调用过程就比较清晰了

    • 初始化的时候,配置客户端的通信框架,序列化框架,注册中心
    • 通过扫描@XxlRpcReference注解,初始化provider提供的接口的代理
    • 进行远程调用的时候,实际是代理调用
    • 通过代理通信协议客户端和注册中心,向provider请求数据

    发一次请求,下个断点看看
    果然进到这个代理调用里面了


    看看服务端是怎么接收请求的吧,有了前面的服务端代码阅读和客户端调用代码阅读,其实服务端的就比较简单了
    长话短说:
    在装载XxlRpcSpringProviderFactory的时候会扫描@XxlRpcService注解的类,并把它作为service放进(put)服务map里面

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
            if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    // valid
                    if (serviceBean.getClass().getInterfaces().length ==0) {
                        throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                    }
                    // add service
                    XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
    
                    String iface = serviceBean.getClass().getInterfaces()[0].getName();
                    String version = xxlRpcService.version();
    
                    super.addService(iface, version, serviceBean);
                }
            }
    
            // TODO,addServices by api + prop
    
        }
    

    当发生远程调用的时候,会调用invokeService方法,主要的代码就是这段通过反射来获取真正需要调用的方法

            try {
                // invoke
                Class<?> serviceClass = serviceBean.getClass();
                String methodName = xxlRpcRequest.getMethodName();
                Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
                Object[] parameters = xxlRpcRequest.getParameters();
    
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                method.setAccessible(true);
                Object result = method.invoke(serviceBean, parameters);
    
                /*FastClass serviceFastClass = FastClass.create(serviceClass);
                FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
                Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
    
                xxlRpcResponse.setResult(result);
            } catch (Throwable t) {
                // catch error
                logger.error("xxl-rpc provider invokeService error.", t);
                xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
            }
    

    ok,到这里整个同步调用的流程就比较清楚了。我们在回顾一下它吹嘘的功能。

    • 1、快速接入:接入步骤非常简洁,两分钟即可上手;

    确实还行

    • 2、服务透明:系统完整的封装了底层通信细节,开发时调用远程服务就像调用本地服务,在提供远程调用能力时不损失本地调用的语义简洁性;

    通过扫描注解和反射的方式,做到了本地调用的效果

    • 3、多调用方案:支持 SYNC、ONEWAY、FUTURE、CALLBACK 等方案;

    现在我们只试过同步调用(SYNC),接下来看看其他调用方式吧

    ONEWAY

    用ONEWAY模式拿到的返回值是null。
    provider的代码如下,其实就是发起了一个不需要结果的调用

    else if (CallType.ONEWAY == callType) {
        client.asyncSend(finalAddress, xxlRpcRequest);
        return null;
    }
    

    FUTURE

    直接返回null,拿到结果要从future里面get

    else if (CallType.FUTURE == callType) {
        // future-response set
        XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
        try {
            // invoke future set
            XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
            XxlRpcInvokeFuture.setFuture(invokeFuture);
            // do invoke
            client.asyncSend(finalAddress, xxlRpcRequest);
            return null;
        } catch (Exception e) {
            logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
            // future-response remove
            futureResponse.removeInvokerFuture();
            throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
        }
    }
    
        @RequestMapping("")
        @ResponseBody
        public UserDTO http(String name) {
    
            try {
                // dto is null
                UserDTO dto =  demoService.sayHi(name);
                Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class);
                return future.get();
            } catch (Exception e) {
                e.printStackTrace();
                return new UserDTO(null, e.getMessage());
            }
        }
    

    callback

    调用完成后会调用onSuccessonFailure方法

    else if (CallType.CALLBACK == callType) {
        // get callback
        XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
        XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
        if (threadInvokeCallback != null) {
            finalInvokeCallback = threadInvokeCallback;
        }
        if (finalInvokeCallback == null) {
            throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null.");
        }
        // future-response set
        XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
        try {
            client.asyncSend(finalAddress, xxlRpcRequest);
        } catch (Exception e) {
            logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
            // future-response remove
            futureResponse.removeInvokerFuture();
            throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
        }
        return null;
    }
    
    XxlRpcInvokeCallback.setCallback(new XxlRpcInvokeCallback<UserDTO>() {
        @Override
        public void onSuccess(UserDTO result) {
            System.out.println(result);
        }
        @Override
        public void onFailure(Throwable exception) {
            exception.printStackTrace();
        }
    });
    demoService.sayHi("[CALLBACK]jack");
    

    ps:这部分有很多值得深度阅读的地方,暂时todo

    • 4、多通讯方案:支持 TCP 和 HTTP 两种通讯方式进行服务调用;其中 TCP 提供可选方案 NETTY 或
      MINA ,HTTP 提供可选方案 NETTY_HTTP 或 Jetty;

    通过继承同一接口来完成调用方的细节隐藏

    • 5、多序列化方案:支持 HESSIAN、HESSIAN1、PROTOSTUFF、KRYO、JACKSON 等方案;

    和通信的方式差不多

    • 6、负载均衡/软负载:提供丰富的负载均衡策略,包括:轮询、随机、LRU、LFU、一致性HASH等;

    这部分比较简单,不在赘述

    • 7、注册中心:可选组件,支持服务注册并动态发现;可选择不启用,直接指定服务提供方机器地址通讯;选择启用时,内置可选方案:“XXL-REGISTRY 轻量级注册中心”(推荐)、“ZK注册中心”、“Local注册中心”等;

    前面已经提到

    • 8、服务治理:提供服务治理中心,可在线管理注册的服务信息,如服务锁定、禁用等;

    通过注册中心实现

    • 9、服务监控:可在线监控服务调用统计信息以及服务健康状况等(计划中);

    pass

    • 10、容错:服务提供方集群注册时,某个服务节点不可用时将会自动摘除,同时消费方将会移除失效节点将流量分发到其余节点,提高系统容错能力。

    通过注册中心实现

    • 11、解决1+1问题:传统分布式通讯一般通过nginx或f5做集群服务的流量负载均衡,每次请求在到达目标服务机器之前都需要经过负载均衡机器,即1+1,这将会把流量放大一倍。而XXL-RPC将会从消费方直达
      服务提供方,每次请求直达目标机器,从而可以避免上述问题;

    通过注册中心实现

    • 12、高兼容性:得益于优良的兼容性与模块化设计,不限制外部框架;除 spring/springboot 环境之外,理论上支持运行在任何Java代码中,甚至main方法直接启动运行;

    demo里面有,比较简单,就是不通过反射,手动创建对象。

    • 13、泛化调用:服务调用方不依赖服务方提供的API;

    同上

    ok,就剩下一个todo了,那就是各种调用方案的具体实现
    先todo了...

    相关文章

      网友评论

          本文标题:源码阅读笔记:分布式服务框架XXL-RPC(基于1.4.1)to

          本文链接:https://www.haomeiwen.com/subject/noysfctx.html