美文网首页
RPC之美团pigeon源码分析(三)调用方服务监听和调用

RPC之美团pigeon源码分析(三)调用方服务监听和调用

作者: 李亚林1990 | 来源:发表于2019-01-30 20:58 被阅读37次

    在此之前我们理清了pigeon服务方的初始化、注册和消息处理逻辑,本篇我们来看看pigeon调用方的实现。

    第一部分我们先看看服务调用的实现。
    服务调用示例:

    @RestController
    @RequestMapping("/common")
    public class CommonController {
    
        @Autowired
        private CommonService commonService;
    
        @RequestMapping(value = "/hello")
        @ResponseBody
        public String hello(@RequestParam("name") String name) {
            System.out.println("enter hello");
            return commonService.hello(name);
        }
    }
    

    CommonService 就是服务方发布的服务接口,可以看到在调用方只需要引入相应服务的api jar包,就可以像调用本地方法一样调用对应的服务接口,这也是大部分RPC框架的实现效果。
    CommonService 通过@Autowired注解在spring容器中找到对应的bean,我们来看看相应的bean配置

        <bean id="commonService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init">
            <!-- 服务全局唯一的标识url,默认是服务接口类名,必须设置 -->
            <property name="url" value="http://service.dianping.com/rpcserver/commonService_1.0.0" />
            <!-- 接口名称,必须设置 -->
            <property name="interfaceName" value="com.study.rpcserver.api.CommonService" />
            <!-- 超时时间,毫秒,默认5000,建议自己设置 -->
            <property name="timeout" value="2000" />
            <!-- 序列化,hessian/fst/protostuff,默认hessian,可不设置-->
            <property name="serialize" value="hessian" />
            <!-- 调用方式,sync/future/callback/oneway,默认sync,可不设置 -->
            <property name="callType" value="sync" />
            <!-- 失败策略,快速失败failfast/失败转移failover/失败忽略failsafe/并发取最快返回forking,默认failfast,可不设置 -->
            <property name="cluster" value="failfast" />
            <!-- 是否超时重试,默认false,可不设置 -->
            <property name="timeoutRetry" value="false" />
            <!-- 重试次数,默认1,可不设置 -->
            <property name="retries" value="1" />
        </bean>
    

    ReferenceBean继承了spring的FactoryBean接口,来处理复杂bean的生成,通过getObject()方法来返回对应bean实例。接下来我们就以ReferenceBean为入口来切入pigeon调用方的实现思路。

        public void init() throws Exception {
            if (StringUtils.isBlank(interfaceName)) {
                throw new IllegalArgumentException("invalid interface:" + interfaceName);
            }
            this.objType = ClassUtils.loadClass(this.classLoader, this.interfaceName.trim());
            //服务调用相关的配置信息,就是我们对每一个接口服务在xml文件中的配置
            InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,
                    this.serialize, this.callback, this.group, this.writeBufferLimit, this.loadBalance, this.cluster,
                    this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);
            invokerConfig.setClassLoader(classLoader);
            invokerConfig.setSecret(secret);
            invokerConfig.setRegionPolicy(regionPolicy);
    
            if (!CollectionUtils.isEmpty(methods)) {
                Map<String, InvokerMethodConfig> methodMap = new HashMap<String, InvokerMethodConfig>();
                invokerConfig.setMethods(methodMap);
                for (InvokerMethodConfig method : methods) {
                    methodMap.put(method.getName(), method);
                }
            }
    
            checkMock(); // 降级配置检查
            invokerConfig.setMock(mock);
            checkRemoteAppkey();
            invokerConfig.setRemoteAppKey(remoteAppKey);
            //生成接口的代理对象
            this.obj = ServiceFactory.getService(invokerConfig);
            configLoadBalance(invokerConfig);
        }
        //FactoryBean返回的bean实例
        public Object getObject() {
            return this.obj;
        }
    

    ServiceFactory.getService(invokerConfig);根据配置的interfaceName生成一个java代理对象

        private static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();
    
        public static <T> T getService(InvokerConfig<T> invokerConfig) throws RpcException {
            return serviceProxy.getProxy(invokerConfig);
        }
    

    跟踪代码,进入AbstractServiceProxy.getProxy方法,核心代码如下:

        protected final static Map<InvokerConfig<?>, Object> services = new ConcurrentHashMap<InvokerConfig<?>, Object>();
        @Override
        public <T> T getProxy(InvokerConfig<T> invokerConfig) {
            //InvokerConfig实现了自定义equals和hashCode方法
            service = services.get(invokerConfig);
            if (service == null) {
                synchronized (interner.intern(invokerConfig)) {
                    service = services.get(invokerConfig);
                    if (service == null) {
                        //此处执行调用方的一些初始化逻辑,包括InvokerProcessHandlerFactory.init();初始化调用方Filter责任链等
                        InvokerBootStrap.startup();
                        //生成代理对象
                        service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);
                        try {
                            //获取服务信息,创建Client实例
                            ClientManager.getInstance().registerClients(invokerConfig);
                        } catch (Throwable t) {
                            logger.warn("error while trying to setup service client:" + invokerConfig, t);
                        }
                        services.put(invokerConfig, service);
                    }
            }
            return (T) service;
        }
    

    AbstractSerializer.proxyRequest使用我们熟悉的JDK动态代理来生成服务接口的代理对象

        @Override
        public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
            return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),
                    new Class[] { invokerConfig.getServiceInterface() }, new ServiceInvocationProxy(invokerConfig,
                            InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
        }
            //InvokerProcessHandlerFactory.selectInvocationHandler获取调用方请求责任链
            public static void init() {
            if (!isInitialized) {
                if (Constants.MONITOR_ENABLE) {
                    registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
                }
                registerBizProcessFilter(new TraceFilter());
                registerBizProcessFilter(new DegradationFilter());
                            //关于ClusterInvokeFilter后文详细介绍
                registerBizProcessFilter(new ClusterInvokeFilter());
                registerBizProcessFilter(new GatewayInvokeFilter());
                registerBizProcessFilter(new ContextPrepareInvokeFilter());
                registerBizProcessFilter(new SecurityFilter());
                            //远程调用
                registerBizProcessFilter(new RemoteCallInvokeFilter());
                bizInvocationHandler = createInvocationHandler(bizProcessFilters);
                isInitialized = true;
            }
        }
    
        public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
            return bizInvocationHandler;
        }
    

    ServiceInvocationProxy继承了java.lang.reflect.InvocationHandler接口,invoke实现逻辑如下:

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
                    //代理对象的非服务方法调用走原有逻辑
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(handler, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return handler.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return handler.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return handler.equals(args[0]);
            }
                    //服务接口执行责任链处理逻辑
            return extractResult(handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)),
                    method.getReturnType());
        }
    

    同服务端责任链的分析一样,我们首先重点看下RemoteCallInvokeFilter的处理逻辑,核心代码如下:

        @Override
        public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
                throws Throwable {
            Client client = invocationContext.getClient();
            InvocationRequest request = invocationContext.getRequest();
            InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();    
            。。。
            //以同步调用场景分析下远程调用逻辑
            CallbackFuture future = new CallbackFuture();
            response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
            invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
            if (response == null) {
                response = future.getResponse(request.getTimeout());
            }        
            return response;
        }    
    
        public static InvocationResponse sendRequest(Client client, InvocationRequest request, Callback callback) {
            InvocationResponse response = response = client.write(request);
            return response;
        }
    

    client.write(request);最终调用NettyClient或HttpInvokerClient的doWrite方法发送请求消息体。
    至此我们理清了服务调用的逻辑,简单来说就是通过JDK动态代理来生成服务方接口对应的实例对象,在方法执行逻辑中调用远程服务。

    但对于每一个服务接口,调用方是如何知道远程服务的访问地址的呢?以及新注册或者下线的服务地址,调用方如何得到即时通知?
    接下来进入本篇第二部分,远程调用Client的初始化和调用方对服务信息的心跳监听。
    以请求责任链的ClusterInvokeFilter为入口:

        public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
                throws Throwable {
            InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
                    //失败策略cluster可配,默认为快速失败failfast
            Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
            if (cluster == null) {
                throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
            }
            return cluster.invoke(handler, invocationContext);
        }
    

    跟踪代码进入FailfastCluster.invoke方法,核心代码如下:

        private ClientManager clientManager = ClientManager.getInstance();
        @Override
        public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
                throws Throwable {
            InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
            //构造请求消息对象
            InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
            //是否超时重试
            boolean timeoutRetry = invokerConfig.isTimeoutRetry();
            //重试次数
            int retry = invokerConfig.getRetries(invocationContext.getMethodName());
            //关于重试和重试次数的逻辑在此不做过多说明,只摘取主干代码
            //获取远程客户端
            Client remoteClient = clientManager.getClient(invokerConfig, request, null);
            //就是在这里设置的RemoteCallInvokeFilter中用到的客户端Client
            invocationContext.setClient(remoteClient);   
            try {
                //向后执行责任链
                return handler.handle(invocationContext);
            } catch (NetworkException e) {
                remoteClient = clientManager.getClient(invokerConfig, request, null);
                invocationContext.setClient(remoteClient);
                return handler.handle(invocationContext);
            }     
        }
    

    ClientManager 为单例模式,我们看看内部实现

            //私有构造函数 
        private ClientManager() {
            this.providerAvailableListener = new ProviderAvailableListener();
            this.clusterListener = new DefaultClusterListener(providerAvailableListener);
            this.clusterListenerManager.addListener(this.clusterListener);
            providerAvailableThreadPool.execute(this.providerAvailableListener);
            RegistryEventListener.addListener(providerChangeListener);
            RegistryEventListener.addListener(registryConnectionListener);
            RegistryEventListener.addListener(groupChangeListener);
            registerThreadPool.getExecutor().allowCoreThreadTimeOut(true);
        }
    
        private RouteManager routerManager = DefaultRouteManager.INSTANCE;
    
        public Client getClient(InvokerConfig<?> invokerConfig, InvocationRequest request, List<Client> excludeClients) {
                    //根据全局唯一标识url获取Client集合
            List<Client> clientList = clusterListener.getClientList(invokerConfig);
            List<Client> clientsToRoute = new ArrayList<Client>(clientList);
            if (excludeClients != null) {
                clientsToRoute.removeAll(excludeClients);
            }
                    //根据负载均衡策略选取有效的Client
                    //此处细节比较多,感兴趣的朋友可以自行细致浏览下源码,限于篇幅不一一讲解了
            return routerManager.route(clientsToRoute, invokerConfig, request);
        }
    

    距离目标越来越近了,我们继续跟踪代码DefaultClusterListener的实现

        private ConcurrentHashMap<String, List<Client>> serviceClients = new ConcurrentHashMap<String, List<Client>>();
    
        public List<Client> getClientList(InvokerConfig<?> invokerConfig) {
            //根据url获取对应的Client集合
            List<Client> clientList = this.serviceClients.get(invokerConfig.getUrl());
            return clientList;
        }
    

    问题来了,serviceClients是在什么时候创建的Client实例呢?
    我们回顾下AbstractServiceProxy.getProxy中的一段逻辑:

                        try {
                            ClientManager.getInstance().registerClients(invokerConfig);
                        } catch (Throwable t) {
                            logger.warn("error while trying to setup service client:" + invokerConfig, t);
                        }
    

    从异常信息我们可以清晰的看到,这里就是创建service client的入口,最终调用到DefaultClusterListener.addConnect添加Client映射关系到serviceClients。调用链路比较长,在此简单贴一下线程调用栈:


    image.png

    至此我们理清了Client的创建,接下来我们看看调用方的心跳监听。
    我们直接连接注册中心zookeeper的相关类CuratorClient,用的是curator-framework-2.7.1.jar,这个ZK客户端功能很强大,可以非常方便的对具体的zk节点添加listener回调。

        private boolean newCuratorClient() throws InterruptedException {
                    //根据zk地址创建zkClient
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(address)
                    .sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                    .retryPolicy(new MyRetryPolicy(retries, retryInterval)).build();
                    //监听连接状态,掉线重连
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    logger.info("zookeeper state changed to " + newState);
                    if (newState == ConnectionState.RECONNECTED) {
                        RegistryEventListener.connectionReconnected();
                    }
                    monitor.logEvent(EVENT_NAME, "zookeeper:" + newState.name().toLowerCase(), "");
                }
            });
                    //监听change事件!!!
            client.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventListenerThreadPool);
            client.start();
            boolean isConnected = client.getZookeeperClient().blockUntilConnectedOrTimedOut();
            CuratorFramework oldClient = this.client;
            this.client = client;
            close(oldClient);
            return isConnected;
        }
    

    CuratorEventListener继承org.apache.curator.framework.api.CuratorListener,看下事件处理逻辑

        @Override
        public void eventReceived(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
            WatchedEvent event = (curatorEvent == null ? null : curatorEvent.getWatchedEvent());
                    //过滤不敢兴趣的EventType
            if (event == null
                    || (event.getType() != EventType.NodeCreated && event.getType() != EventType.NodeDataChanged
                            && event.getType() != EventType.NodeDeleted && event.getType() != EventType.NodeChildrenChanged)) {
                return;
            }
            try {
                            //解析节点路径并分类
                PathInfo pathInfo = parsePath(event.getPath());
                            
                if (pathInfo.type == ADDRESS) {//服务地址  
                    addressChanged(pathInfo);
                } else if (pathInfo.type == WEIGHT) {//权重
                    weightChanged(pathInfo);
                } else if (pathInfo.type == APP) {
                    appChanged(pathInfo);
                } else if (pathInfo.type == VERSION) {
                    versionChanged(pathInfo);
                } else if (pathInfo.type == PROTOCOL) {
                    protocolChanged(pathInfo);
                } else if (pathInfo.type == HOST_CONFIG) {
                    registryConfigChanged(pathInfo);
                }
            } catch (Throwable e) {
                logger.error("Error in ZookeeperWatcher.process()", e);
                return;
            }
        }
        /*
         * 1. Get newest value from ZK and watch again 2. Determine if changed
         * against cache 3. notify if changed 4. pay attention to group fallback
         * notification
         */
        private void addressChanged(PathInfo pathInfo) throws Exception {
            if (shouldNotify(pathInfo)) {
                String hosts = client.get(pathInfo.path);
                logger.info("Service address changed, path " + pathInfo.path + " value " + hosts);
                List<String[]> hostDetail = Utils.getServiceIpPortList(hosts);
                serviceChangeListener.onServiceHostChange(pathInfo.serviceName, hostDetail);
            }
            // Watch again
            client.watch(pathInfo.path);
        }
    

    addressChanged难得加了注释,判断是否需要回调,回调。

    本篇到此结束,内容较多,希望能对大家有所助益。

    相关文章

      网友评论

          本文标题:RPC之美团pigeon源码分析(三)调用方服务监听和调用

          本文链接:https://www.haomeiwen.com/subject/cfbxsqtx.html