美文网首页
thrift+zk实现服务的注册,发现,调用

thrift+zk实现服务的注册,发现,调用

作者: 海洋小企鹅 | 来源:发表于2018-10-09 16:24 被阅读0次

    本文基于实际生产环境中的Thrift+zookeeper实现的rpc调用总结,大致有以下几个部分:
    1: 服务端将服务注册在zk中
    1.1 解析服务端的网卡IP;
    1.2 获取zookeeper客户端对象;
    1.3 实现服务接口的注册;
    2: 基于zookeeper实现服务接口的自动发现
    3: 实现客户端连接池和客户端通过代理调用服务

    一 服务端将服务注册在zk中

    调用图: 服务注册流程图.png
    代码展示

    1.1解析thrift-server端IP地址,用于注册服务

    接口
    public interface ThriftServerIpResolve {
        // 获取服务所在机器的Ip
        String getServerIp() throws Exception;
    
        String getServerIp(boolean publicIpOnly) throws Exception;
        
        void reset();
        
        //当IP变更时,将会调用reset方法
        static interface IpRestCalllBack{
            public void rest(String newIp);
        }
    }
    
    实现
    /**
     * Resolves the server IP by scanning the local network interfaces.
     *
     * <p>IPv6 and loopback addresses are skipped. With {@code publicIpOnly == true} only
     * non-site-local addresses qualify; otherwise only site-local addresses qualify. The result
     * is cached in {@link #serverIp} until {@link #reset()} is called.
     */
    public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {

        private Logger logger = LoggerFactory.getLogger(getClass());

        // Cached result; when non-null it is returned as-is and no scan is performed.
        private String serverIp;

        /** Presets the cached IP, which bypasses interface scanning in {@link #getServerIp(boolean)}. */
        public void setServerIp(String serverIp) {
            this.serverIp = serverIp;
        }

        /** Equivalent to {@code getServerIp(false)}. */
        @Override
        public String getServerIp() {
            return getServerIp(false);
        }

        /**
         * Returns the cached IP, or scans the network interfaces for a matching IPv4 address.
         *
         * @param publicIpOnly {@code true} to accept only non-site-local addresses,
         *                     {@code false} to accept only site-local addresses
         * @return the resolved address, or {@code null} when nothing matched or the scan failed
         */
        @Override
        public String getServerIp(boolean publicIpOnly) {
            if (serverIp != null) {
                return serverIp;
            }
            // A host can have several network interfaces.
            try {
                Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
                if (netInterfaces == null) {
                    // Defensive: nothing to scan, so there is nothing to resolve.
                    return serverIp;
                }
                while (netInterfaces.hasMoreElements()) {
                    NetworkInterface netInterface = netInterfaces.nextElement();
                    // Each interface can carry several addresses (loopback, site-local, IPv4/IPv6, ...).
                    Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                    while (addresses.hasMoreElements()) {
                        InetAddress address = addresses.nextElement();
                        if (address instanceof Inet6Address || address.isLoopbackAddress()) {
                            continue;
                        }
                        // Public mode wants non-site-local, private mode wants site-local.
                        if (publicIpOnly != address.isSiteLocalAddress()) {
                            // NOTE(review): the scan does not stop here, so the LAST matching
                            // address wins. Kept as in the original; confirm this is intended.
                            serverIp = address.getHostAddress();
                            logger.info("resolve server ip :" + serverIp);
                        }
                    }
                }
            } catch (SocketException e) {
                // Report through the logger instead of printStackTrace(); the caller still gets null.
                logger.error("resolve server ip failed", e);
            }
            return serverIp;
        }

        /** Drops the cached IP so the next lookup scans the interfaces again. */
        @Override
        public void reset() {
            serverIp = null;
        }
    }
    
    

    1.2 获取zookeeper客户端链接对象

    /**
     * {@code FactoryBean} that builds Curator {@link CuratorFramework} ZooKeeper clients.
     *
     * <p>In singleton mode one client is created lazily, started, and shared. Otherwise every
     * {@link #getObject()} call returns a new client that this factory does NOT start.
     */
    public class ZookeeperFactory implements FactoryBean<CuratorFramework> ,Closeable{

        private String zkHosts;
        // Session timeout in milliseconds
        private int sessionTimeout = 30000;
        // Connection timeout in milliseconds
        private int connectionTimeout = 30000;

        // Share a single zk connection
        private boolean singleton = true;

        // Global path prefix, typically used to separate applications
        private String namespace;

        private final static String ROOT = "rpc";

        private CuratorFramework zkClient;

        public void setZkHosts(String zkHosts) {
            this.zkHosts = zkHosts;
        }

        public void setSessionTimeout(int sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }

        public void setConnectionTimeout(int connectionTimeout) {
            this.connectionTimeout = connectionTimeout;
        }

        public void setSingleton(boolean singleton) {
            this.singleton = singleton;
        }

        public void setNamespace(String namespace) {
            this.namespace = namespace;
        }

        public void setZkClient(CuratorFramework zkClient) {
            this.zkClient = zkClient;
        }

        /**
         * Returns the shared client (created and started on first use) in singleton mode,
         * or a freshly built, not-yet-started client otherwise.
         */
        @Override
        public CuratorFramework getObject() throws Exception {
            if (singleton) {
                if (zkClient == null) {
                    zkClient = create();
                    zkClient.start();
                }
                return zkClient;
            }
            return create();
        }

        @Override
        public Class<?> getObjectType() {
            return CuratorFramework.class;
        }

        @Override
        public boolean isSingleton() {
            return singleton;
        }

        /**
         * Builds a client from the configured properties. The effective namespace is
         * {@code "rpc"} when no namespace is configured, otherwise {@code "rpc/<namespace>"}.
         */
        public CuratorFramework create() throws Exception {
            // Compute the effective namespace locally. Writing it back to the field made every
            // further call prepend ROOT again ("rpc/rpc/app", ...) in non-singleton mode.
            String effectiveNamespace = StringUtils.isEmpty(namespace) ? ROOT : ROOT + "/" + namespace;
            return create(zkHosts, sessionTimeout, connectionTimeout, effectiveNamespace);
        }

        /**
         * Creates (but does not start) a zk client with CuratorFramework.
         *
         * @param connectString     zk ensemble address list, including IPs and ports
         * @param sessionTimeout    session timeout in milliseconds
         * @param connectionTimeout connection timeout in milliseconds
         * @param namespace         namespace that lets different applications stay separated
         * @return the built client; the retry policy is
         *         {@code ExponentialBackoffRetry(1000, Integer.MAX_VALUE)}, i.e. a 1000 ms base
         *         sleep that grows with the retry count (see the Curator docs for the caps it applies)
         */
        public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
            // Honour the caller's connectionTimeout; it used to be ignored in favour of a literal 30000.
            return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                    .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                    .defaultData(null).build();
        }

        /** Closes the shared client, if one was created or injected. */
        @Override
        public void close() {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
    
    

    1.3 将服务接口注册到zookeeper中
    1.3.1 服务端注册工厂调用注册服务的方法,异步启动服务

    /**
     * 服务端注册服务工厂
     */
    public class ThriftServiceServerFactory implements InitializingBean ,Closeable{
        // 服务注册本机端口
        private Integer port = 8299;
        // 优先级
        private Integer weight = 1;// default
        // 服务实现类
        private Object service;// serice实现类
        //服务版本号
        private String version;
        // 是否只取公网ip,如果是true,zk中只注册公网ip;如果是false,zk中只注册私网ip
        private boolean publicIpOnly = false;
        // 解析本机IP
        private ThriftServerIpResolve thriftServerIpResolve;
        //服务注册
        private ThriftServerAddressRegister thriftServerAddressRegister;
    
        private ServerThread serverThread;
    
        private boolean zkUse = true;
        
        public void setPort(Integer port) {
            this.port = port;
        }
    
        public void setWeight(Integer weight) {
            this.weight = weight;
        }
    
        public void setService(Object service) {
            this.service = service;
        }
    
        public void setVersion(String version) {
            this.version = version;
        }
    
        public void setZkUse(boolean zkUse) {
            this.zkUse = zkUse;
        }
    
        public void setPublicIpOnly(boolean publicIpOnly) {
            this.publicIpOnly = publicIpOnly;
        }
    
        public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
            this.thriftServerIpResolve = thriftServerIpResolve;
        }
    
        public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
            this.thriftServerAddressRegister = thriftServerAddressRegister;
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            if (thriftServerIpResolve == null) {
                thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
            }
            String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
            if (StringUtils.isEmpty(serverIP)) {
                throw new ThriftException("cant find server ip...");
            }
    
            String hostname = serverIP + ":" + port + ":" + weight;
            Class<?> serviceClass = service.getClass();
            // 获取实现类接口
            Class<?>[] interfaces = serviceClass.getInterfaces();
            if (interfaces.length == 0) {
                throw new IllegalClassFormatException("service-class should implements Iface");
            }
            // reflect,load "Processor";
            TProcessor processor = null;
            String serviceName = null;
    
            for (Class<?> clazz : interfaces) {
                String cname = clazz.getSimpleName();
                if (!cname.equals("Iface")) {
                    continue;
                }
                serviceName = clazz.getEnclosingClass().getName();
                String pname = serviceName + "$Processor";
                try {
                    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                    Class<?> pclass = classLoader.loadClass(pname);
                    if (!TProcessor.class.isAssignableFrom(pclass)) {
                        continue;
                    }
                    Constructor<?> constructor = pclass.getConstructor(clazz);
                    processor = (TProcessor) constructor.newInstance(service);
                    break;
                } catch (Exception e) {
                    //
                }
            }
            if (processor == null) {
                throw new IllegalClassFormatException("service-class should implements Iface");
            }
            //需要单独的线程,因为serve方法是阻塞的.
            serverThread = new ServerThread(processor, port);
            serverThread.start();
            // 注册服务
            if (zkUse && thriftServerAddressRegister != null) {
                thriftServerAddressRegister.register(serviceName, version, hostname);
            }
    
        }
        class ServerThread extends Thread {
            private TServer server;
            ServerThread(TProcessor processor, int port) throws Exception {
    --------------------- 
            /** TThreadedSelectorServer模式是thrift-server最高级的工作模 
           式:主要有以下几个不分组成
          TThreadedSelectorServer模式是目前Thrift提供的最高级的模式, 
          它内部有如果几个部分构成:
    
         (1)  一个AcceptThread线程对象,专门用于处理监听socket上的新连接;
    
         (2)  若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是有这些线程来完成;
    
         (3)  一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
    
         (4)  一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求读取之后,交个ExecutorService线程池中的线程完成此次调用的具体执行;
       --------------------- 
      **/
                TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
                TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);  
                TProcessorFactory processorFactory = new TProcessorFactory(processor);
                tArgs.processorFactory(processorFactory);
                tArgs.transportFactory(new TFramedTransport.Factory());  
                tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
                tArgs.maxReadBufferBytes = 1024 * 1024L; // 防止direct memory oom
                tArgs.selectorThreads(4); // 设置selector线程数,默认是2
                tArgs.workerThreads(32); // 设置工作线程数,默认是5,在数据库负载高时有可能会堵塞
                server = new TThreadedSelectorServer(tArgs);
            }
    
            @Override
            public void run(){
                try{
                    //启动服务
                    server.serve();
                }catch(Exception e){
                    //
                }
            }
            
            public void stopServer(){
                server.stop();
            }
        }
        
        public void close() {
            serverThread.stopServer();
        }
    }
    
    

    1.3.2 真正的注册实现
    第一步:监听zk连接变化;
    第二步:将服务暴露的ip和port以临时节点(EPHEMERAL)的形式创建在zk的接口服务路径下,切换到指定的组;
    第三步:注册成功后使用NodeCache监听节点内容变化,如果原本节点不存在,那么Cache就会在节点被创建时触发监听事件,如果该节点被删除,就无法再触发监听事件。任意节点的内容变化都会重新注册。

    // Starts the Thrift server and registers the service address.
    @Override
        public void afterPropertiesSet() throws Exception {
            if (thriftServerIpResolve == null) {
                thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
            }
            String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
            if (StringUtils.isEmpty(serverIP)) {
                throw new ThriftException("cant find server ip...");
            }

            // Published address has the form ip:port:weight.
            String hostname = serverIP + ":" + port + ":" + weight;
            Class<?> serviceClass = service.getClass();
            // Interfaces implemented by the service class
            Class<?>[] interfaces = serviceClass.getInterfaces();
            if (interfaces.length == 0) {
                throw new IllegalClassFormatException("service-class should implements Iface");
            }
            // reflect,load "Processor";
            TProcessor processor = null;
            String serviceName = null;
            // Remember why a candidate failed so the final error carries a cause.
            Exception lastFailure = null;

            for (Class<?> clazz : interfaces) {
                String cname = clazz.getSimpleName();
                if (!cname.equals("Iface")) {
                    continue;
                }
                Class<?> enclosing = clazz.getEnclosingClass();
                if (enclosing == null) {
                    // A top-level interface named Iface has no enclosing service class to use.
                    continue;
                }
                serviceName = enclosing.getName();
                String pname = serviceName + "$Processor";
                try {
                    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                    Class<?> pclass = classLoader.loadClass(pname);
                    if (!TProcessor.class.isAssignableFrom(pclass)) {
                        continue;
                    }
                    Constructor<?> constructor = pclass.getConstructor(clazz);
                    processor = (TProcessor) constructor.newInstance(service);
                    break;
                } catch (Exception e) {
                    // Keep trying the remaining interfaces, but do not lose the reason.
                    lastFailure = e;
                }
            }
            if (processor == null) {
                IllegalClassFormatException failure =
                        new IllegalClassFormatException("service-class should implements Iface");
                if (lastFailure != null) {
                    failure.initCause(lastFailure);
                }
                throw failure;
            }
            // A dedicated thread is required because serve() blocks.
            serverThread = new ServerThread(processor, port);
            serverThread.start();
            // Register the service
            if (zkUse && thriftServerAddressRegister != null) {
                thriftServerAddressRegister.register(serviceName, version, hostname);
            }

        }
        /**
         * Thread that owns the blocking Thrift server loop; built around a
         * {@code TThreadedSelectorServer} with framed transport and binary protocol.
         */
        class ServerThread extends Thread {
            private TServer server;

            /**
             * Configures (but does not start) the server.
             *
             * @param processor processor that handles incoming calls
             * @param port      local port to listen on
             * @throws Exception if the server socket or server cannot be created
             */
            ServerThread(TProcessor processor, int port) throws Exception {
                TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
                TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
                TProcessorFactory processorFactory = new TProcessorFactory(processor);
                tArgs.processorFactory(processorFactory);
                tArgs.transportFactory(new TFramedTransport.Factory());
                tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
                tArgs.maxReadBufferBytes = 1024 * 1024L; // guards against direct memory OOM
                tArgs.selectorThreads(4); // number of selector threads (original note: default is 2)
                tArgs.workerThreads(32); // number of worker threads (original note: default is 5, may block under high DB load)
                server = new TThreadedSelectorServer(tArgs);
            }

            @Override
            public void run(){
                // Start serving. Failures are no longer swallowed: they reach this thread's
                // uncaught-exception handler, so a dead server is visible.
                server.serve();
            }

            /** Asks the underlying server to stop serving. */
            public void stopServer(){
                server.stop();
            }
        }
    
    
    // Registration entry point and node listeners
    /**
         * Validates the published address, then installs the connection listener, performs the
         * registration and installs the group listener, in that order.
         *
         * @param service service interface name; must be unique within a product
         * @param version service interface version (original note: defaults to 1.0.0; not enforced here)
         * @param address published address, expected as {@code ip:port:weight}
         * @throws ThriftException if the address has fewer than three {@code :}-separated parts
         */
        @Override
        public void register(String service, String version, String address) throws InterruptedException {
            final String[] segments = address.split(":");

            // Reject anything that cannot be split into ip, port and weight.
            if (!(segments.length >= 3)) {
                final String message = "ThriftZookeeper Register Error: address invalid '" + address + "'";
                logger.error(message);
                throw new ThriftException(message);
            }

            final String ip = segments[0];
            final String port = segments[1];
            final String weight = segments[2];

            // 1) react to connection-state changes
            setConnectionListener(service, version, ip, port, weight);
            // 2) register now
            generalRegister(service, version, ip, port, weight);
            // 3) re-register when the group node changes
            setGroupListener(service, version, ip, port, weight);
        }
    
        /**
         * Installs a connection-state listener that re-registers the service after the
         * connection is reported as {@code LOST}.
         *
         * <p>On {@code LOST} the listener blocks until the client reports it is connected again
         * (retrying when the wait times out) and then calls {@code generalRegister}. Any
         * exception ends the retry loop.
         */
        public void setConnectionListener(final String service, final String version, final String ip, final String port, final String weight)  {
            // Start the zk client if it has not been started yet.
            if (zkClient.getState() == CuratorFrameworkState.LATENT) {
                zkClient.start();
            }

            logger.info("设置ZK连接状态监听的Listener");
            zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    logger.info("ZkClient状态变化:" + newState.name());
                    if (newState != ConnectionState.LOST) {
                        return;
                    }
                    // Session expired
                    logger.info("ZkClient连接断开,session过期");

                    int attempt = 0;
                    while (true) {
                        logger.info("尝试重新连接到zk..." + (attempt++));
                        try {
                            if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
                                logger.info("尝试重新注册zk的临时节点...");

                                // Register again
                                generalRegister(service, version, ip, port, weight);

                                break;
                            }
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so the owning thread can observe it.
                            Thread.currentThread().interrupt();
                            logger.error("尝试重新注册zk的临时节点异常(InterruptedException)", e);
                            break;
                        } catch (Exception e) {
                            // A failed re-registration is an error, not an informational event.
                            logger.error("尝试重新注册zk的临时节点异常(Exception)", e);
                            break;
                        }
                    }
                }
            });
        }
    
        /**
         * Watches this instance's group node and re-registers the service whenever it changes.
         *
         * @param service service name
         * @param version version
         * @param ip      service IP, also used to locate the group node
         * @param port    service port
         * @param weight  service weight
         * @throws ThriftException if the group node cannot be created or the cache cannot be started
         * @throws InterruptedException declared by the original signature
         */
        public void setGroupListener(final String service, final String version, final String ip, final String port, final String weight) throws InterruptedException {
            final String ipServicePath = getInstanceGroupPath(localInstance, ip);
            try {
                createGroupNodeIfNotExists(localInstance, ip);
                final NodeCache nodeCache = new NodeCache(zkClient, ipServicePath, false);
                // Attach the listener BEFORE starting the cache so a change that arrives right
                // after start() cannot slip through unobserved.
                nodeCache.getListenable().addListener(
                        new NodeCacheListener() {
                            public void nodeChanged() throws Exception {
                                synchronized (lock) {
                                    logger.info("服务端监听到节点: " + ipServicePath + " 变化, for service:" + service);
                                    generalRegister(service, version, ip, port, weight);
                                }
                            }
                        }
                );
                nodeCache.start(true);
            } catch (Exception e) {
                logger.error("nodeCache start exception:",e);
                throw new ThriftException("nodeCache start exception:", e);
            }
        }
    
    
        /**
         * General registration routine.
         *
         * <p>Reads the group configured for this instance (falling back to {@code defaultGroup}
         * when it cannot be read or parsed), then writes {@code {weight, group}} as JSON to the
         * node {@code <serviceLocatePath>/<ip>:<port>}: existing nodes get their data replaced,
         * missing nodes are created as EPHEMERAL.
         *
         * @param service service name
         * @param version version (not used by this method)
         * @param ip      service IP
         * @param port    service port
         * @param weight  weight; per the original note an integer 1-10 in string form, e.g. "5"
         * @throws ThriftException if writing the registration node fails
         */
        public void generalRegister(String service, String version, String ip, String port, String weight) {
            logger.info("开始注册ServiceLocate, 服务: " + service);
            // Start the zk client if it has not been started yet.
            if (zkClient.getState() == CuratorFrameworkState.LATENT) {
                zkClient.start();
            }
            String ipPort = ip + ":" + port;

            // Determine the group
            String ipInstancePath = getInstanceGroupPath(localInstance, ip);
            String group = "";
            try {
                createGroupNodeIfNotExists(localInstance, ip);
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                String groupJson = new String(zkClient.getData().forPath(ipInstancePath), "utf-8");
                GroupConfig groupConfig = JSON.parseObject(groupJson, GroupConfig.class);
                if (groupConfig == null || groupConfig.getGroup() == null) {
                    throw new ThriftException("获取到错误的分组配置,分组注册不会进行,请检查配置内容是否正确");
                }
                group = groupConfig.getGroup();
            } catch (Exception e) {
                logger.error("获取组别失败, 按默认组别执行", e);
                group = defaultGroup;
            }
            logger.info("注册到group: " + group);

            // Register
            RegisterConfig registerConfig = new RegisterConfig();
            registerConfig.setWeight(weight);
            registerConfig.setGroup(group);
            String groupWeightString = JSON.toJSONString(registerConfig);
            try {
                String nodePath = getServiceLocatePath(service) + "/" + ipPort;
                // Encode as UTF-8 so the bytes match what readers decode with "utf-8".
                byte[] payload = groupWeightString.getBytes("utf-8");

                // Layout of the service-locate nodes:
                // serviceLocatePath
                //    |-- address1 (service-locate node, ephemeral)
                //    |-- address2 (service-locate node, ephemeral)
                if (zkClient.checkExists().forPath(nodePath) != null) {
                    // Already present: replace its data
                    zkClient.setData().forPath(nodePath, payload);
                } else {
                    // Missing: create it; creatingParentsIfNeeded builds the parent path
                    zkClient.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .forPath(nodePath, payload);
                }

                // Remember the group this instance is now registered under
                currentRegisterGroup = group;
                logger.info("服务: " + service + " 注册到group: " + group + " 成功");
            } catch (Exception e) {
                logger.error("zk分组注册异常:", e);
                throw new ThriftException("zk分组注册异常:", e);
            }
        }
    
        /**
         * Creates the group-config node for this instance when it does not exist yet, as a
         * PERSISTENT node holding {@code defaultGroup}/{@code defaultWeight} as JSON.
         *
         * @throws Exception if the existence check or the creation fails
         */
        private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
            if (zkClient.getState() == CuratorFrameworkState.LATENT) {
                zkClient.start();
            }

            String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
            if (zkClient.checkExists().forPath(serviceGroupPath) != null) {
                return;
            }

            // Only build the default config when it is actually going to be written.
            GroupConfig groupConfig = new GroupConfig();
            groupConfig.setGroup(defaultGroup);
            groupConfig.setWeight(defaultWeight);

            logger.info("创建group_config,localInstance:" + localInstance + ",ip:" + ip);
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    // Encode as UTF-8 explicitly; readers of this node decode it as "utf-8".
                    .forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes("utf-8"));
        }
    

    二 基于zk的服务自动发现


    服务自动发现.png

    1.使用NodeCache监听分组配置节点(group config)的变化;
    2.使用PathChildrenCache监听服务路径下的子节点,也就是ip:port的变化;

    public void afterPropertiesSet() throws Exception {
            logger.info("Provider初始化开始");

            // Resolve the local IP, defaulting to the network-interface based resolver.
            if (null == thriftServerIpResolve) {
                thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
            }
            final String localIp = thriftServerIpResolve.getServerIp(publicIpOnly);
            if (StringUtils.isEmpty(localIp)) {
                throw new ThriftException("can not find server ip...");
            }

            // Service-address discovery: install the listener, start the children cache and
            // wait on the latch before continuing.
            logger.info("Provider创建ServiceLocate监听器. targetService:" + targetService);
            buildServiceLocateListener();
            cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
            countDownLatch.await();

            // Group-config discovery for this instance.
            logger.info("Provider创建GroupConfig监听器,localInstance:" + localInstance);
            buildInstanceGroupListener(localIp);

            logger.info("Provider初始化完成");
        }
    
        /**
         * Ensures the group-config node exists, watches it with a {@code NodeCache}, and loads
         * the current group once the cache has been started.
         *
         * @param ip local IP used to locate this instance's group-config node
         * @throws ThriftException if the node cannot be created or the cache cannot be started
         */
        private void buildInstanceGroupListener(final String ip) {
            // Start the zk client if it has not been started yet.
            if (CuratorFrameworkState.LATENT == zkClient.getState()) {
                zkClient.start();
            }

            final String groupConfigPath = getInstanceGroupPath(localInstance, ip);

            // On every change: refresh the cache, recreate the node if it vanished, re-read the group.
            final NodeCacheListener onGroupChanged = new NodeCacheListener() {
                public void nodeChanged() throws Exception {
                    synchronized (lock) {
                        groupNodeCache.rebuild();
                        createGroupNodeIfNotExists(localInstance, ip);
                        rebuildGroup();
                    }
                }
            };

            try {
                // Create the node first when it is missing.
                createGroupNodeIfNotExists(localInstance, ip);
                groupNodeCache = new NodeCache(zkClient, groupConfigPath, false);
                groupNodeCache.getListenable().addListener(onGroupChanged);
                groupNodeCache.start(true);
                rebuildGroup();
            } catch (Exception e) {
                logger.error("nodeCache start exception:",e);
                throw new ThriftException("nodeCache start exception:", e);
            }
        }
    
        /**
         * Creates the group-config node for this instance when it does not exist yet, as a
         * PERSISTENT node holding {@code defaultGroup}/{@code defaultWeight} as JSON.
         *
         * @throws Exception if the existence check or the creation fails
         */
        private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
            if (zkClient.getState() == CuratorFrameworkState.LATENT) {
                zkClient.start();
            }

            String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
            if (zkClient.checkExists().forPath(serviceGroupPath) != null) {
                return;
            }

            // Only build the default config when it is actually going to be written.
            GroupConfig groupConfig = new GroupConfig();
            groupConfig.setGroup(defaultGroup);
            groupConfig.setWeight(defaultWeight);

            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    // Encode as UTF-8 explicitly, matching the "utf-8" decoding in rebuildGroup().
                    .forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes("utf-8"));
        }
    
        /**
         * Re-reads the group configuration from the node cache and updates {@code currentGroup}.
         * The current group is left untouched when the cache holds no data or the data does not
         * parse to a config with a group.
         *
         * @throws UnsupportedEncodingException declared because the payload is decoded by charset name
         */
        private void rebuildGroup() throws UnsupportedEncodingException {
            // Snapshot once: the cache may be empty (e.g. the node is gone), which used to
            // surface as a NullPointerException here.
            ChildData current = groupNodeCache.getCurrentData();
            if (current == null || current.getData() == null) {
                logger.error("group config node has no data, current group is kept, localInstance:" + localInstance
                        + ", targetService:" + targetService);
                return;
            }
            String data = new String(current.getData(), "utf-8");
            logger.info("客户端监听到分组变化,当前内容 " + data + " 进行同步");
            GroupConfig groupConfig = JSON.parseObject(data, GroupConfig.class);
            if (groupConfig == null || groupConfig.getGroup() == null) {
                logger.error("分组数据错误,缓存区域不会变更,请查看分组配置区数据,localInstance:" + localInstance
                        + "targetService:" + targetService + ",data: " + data);
                return;
            }
            currentGroup = groupConfig.getGroup();
        }
    
        /**
         * Creates the children cache for the target service's locate path and attaches a
         * listener that rebuilds the local address cache on every event except
         * {@code CONNECTION_LOST}. The cache is created here but not started.
         *
         * @throws Exception declared by the original signature
         */
        private void buildServiceLocateListener() throws Exception {
            // Start the zk client if it has not been started yet.
            if (zkClient.getState() == CuratorFrameworkState.LATENT) {
                zkClient.start();
            }

            // Watch the service addresses under the target service's locate path.
            final String serviceLocatePath = getServiceLocatePath(targetService);
            cachedPath = new PathChildrenCache(zkClient, serviceLocatePath, true);
            PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    PathChildrenCacheEvent.Type eventType = event.getType();
                    switch (eventType) {
                        case CONNECTION_RECONNECTED:
                            logger.info("Connection is reconection.");
                            break;
                        case CONNECTION_SUSPENDED:
                            logger.info("Connection is suspended.");
                            break;
                        case CONNECTION_LOST:
                            logger.warn("Connection error,waiting...");
                            // Deliberate early return: when ZK is down the local cache must stay untouched.
                            return;
                        case INITIALIZED:
                            //  countDownLatch.countDown();
                            logger.warn("Connection init ...");
                            break;
                        default:
                    }
                    try {
                        // Any node change triggers a full rebuild; a deliberately "simple" approach.
                        cachedPath.rebuild();
                        synchronized (lock) {
                            rebuild();
                        }
                    } finally {
                        // Always release the startup latch: if the rebuild throws, the caller
                        // blocked in countDownLatch.await() would otherwise never return.
                        countDownLatch.countDown();
                    }
                }
            };

            cachedPath.getListenable().addListener(childrenCacheListener);
        }
    
        /**
         * Rebuilds the local address caches ({@code trace}, {@code container},
         * {@code ipPortQueue}) from the children currently held by {@code cachedPath}.
         *
         * <p>Each child's last path segment is taken as {@code ip:port}; its data is parsed as a
         * {@code RegisterConfig} carrying weight and group. Addresses are grouped, shuffled per
         * group, and then published into the three caches. When there are no children, all three
         * caches are cleared.
         *
         * @throws ThriftException if any child cannot be parsed or converted
         */
        protected void rebuild() {
            logger.info("即将更新本地地址缓存, localService: " + localInstance + ", targetService: " + targetService);
            List<ChildData> children = cachedPath.getCurrentData();
            if (children == null || children.isEmpty()) {
                // All thrift servers may have lost their zookeeper connection while the network
                // between thrift client and thrift server is still fine, so whether the
                // containers should be cleared here is a trade-off (kept as in the original).
                container.clear();
                trace.clear();
                ipPortQueue.clear();
                logger.error("在注册服务区无法找到子节点");
                return;
            }

            Map<String, List<InetSocketAddress>> currentMap = new HashMap<String, List<InetSocketAddress>>();
            try {
                for (ChildData data : children) {
                    // The path is already a String; re-encoding it with the platform charset and
                    // decoding as UTF-8 could only corrupt it, so it is used directly.
                    String[] parts = data.getPath().split("/");
                    String ipPort = parts[parts.length-1];
                    String jsonString = new String(data.getData(), "utf-8");
                    RegisterConfig registerConfig = JSON.parseObject(jsonString, RegisterConfig.class);
                    if (registerConfig == null || registerConfig.getWeight() == null) {
                        throw new ThriftException("获取权重失败");
                    }
                    String weight = registerConfig.getWeight();
                    String group = registerConfig.getGroup();

                    // Addresses derived from this child's weight and ip:port
                    List<InetSocketAddress> addressList = transfer(weight, ipPort);

                    // Collect per group
                    List<InetSocketAddress> groupList = currentMap.get(group);
                    if (groupList == null) {
                        groupList = new ArrayList<InetSocketAddress>();
                        currentMap.put(group, groupList);
                    }
                    groupList.addAll(addressList);
                }

                trace.clear();
                container.clear();
                ipPortQueue.clear();
                for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
                    String group = entry.getKey();
                    List<InetSocketAddress> current = entry.getValue();
                    Collections.shuffle(current);

                    // The caches were just cleared and group keys are unique, so each group
                    // gets fresh collections: backup set, list container and queue.
                    trace.put(group, new HashSet<InetSocketAddress>(current));
                    container.put(group, new ArrayList<InetSocketAddress>(current));
                    ipPortQueue.put(group, new LinkedList<InetSocketAddress>(current));
                }


                logger.info("分组缓存重建完毕");
                for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
                    logger.info("group:" + entry.getKey() + ", target:" + entry.getValue());
                }
            } catch (Exception e) {
                // Log with the exception itself so the stack trace is not lost.
                logger.error("重建缓存失败", e);
                throw new ThriftException("重建缓存失败", e);
            }
        }
    
    
    
        /**
         * Expands one {@code ip:port} entry into a list that repeats the same socket address
         * {@code weight} times.
         *
         * @param weight weight as a decimal string, e.g. "5"
         * @param ipPort address in the form {@code ip:port}, e.g. 10.0.0.1:9050
         * @return a list containing the address {@code weight} times; empty when the host part
         *         cannot be resolved
         * @throws NumberFormatException if the port or the weight is not a valid integer
         */
        private List<InetSocketAddress> transfer(String weight, String ipPort) {
            List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
            String[] hostAndPort = ipPort.split(":");

            InetAddress ipAddress;
            try {
                ipAddress = InetAddress.getByName(hostAndPort[0]);
            } catch (UnknownHostException e) {
                // Do not continue with a null address: InetSocketAddress would silently turn
                // it into the wildcard address and the caller would cache a bogus target.
                logger.error("获取IP地址失败:" + e.getMessage());
                return result;
            }

            int port = Integer.parseInt(hostAndPort[1]);
            InetSocketAddress inetSocketAddress = new InetSocketAddress(ipAddress, port);
            int copies = Integer.parseInt(weight);
            for (int i = 0; i < copies; i++) {
                result.add(inetSocketAddress);
            }
            return result;
        }
    
    
        /**
         * Takes the next address for {@code currentGroup} from that group's queue.
         *
         * <p>When the queue is empty it is refilled from the group's container list, or, if
         * that is missing or empty too, from a shuffled copy of the group's trace set.
         *
         * @return the address polled from the head of the queue
         * @throws ThriftException if the group has no queue or no address can be found
         */
        @Override
        public synchronized InetSocketAddress selector() {
            Queue<InetSocketAddress> currentQueue = ipPortQueue.get(currentGroup);

            // Only an existing queue can be refilled; a missing queue falls through to the
            // "no service available" error below instead of failing with a NullPointerException.
            if (currentQueue != null && currentQueue.isEmpty()) {
                List<InetSocketAddress> currentContainer = container.get(currentGroup);
                Set<InetSocketAddress> currentTrace = trace.get(currentGroup);
                if (currentContainer != null && !currentContainer.isEmpty()) {
                    currentQueue.addAll(currentContainer);
                } else if (currentTrace != null && !currentTrace.isEmpty()) {
                    synchronized (lock) {
                        List<InetSocketAddress> restored = new ArrayList<InetSocketAddress>(currentTrace);
                        Collections.shuffle(restored);
                        // The container may be absent for this group; restore it only when present.
                        if (currentContainer != null) {
                            currentContainer.addAll(restored);
                        }
                        currentQueue.addAll(restored);
                    }
                }
            }

            if (currentQueue == null || currentQueue.isEmpty()) {
                String message = "找不到可用服务,localInstance:" + localInstance + "目标服务:" + targetService;
                logger.error(message);
                throw new ThriftException(message);
            }
            return currentQueue.poll();
        }
    
        /**
         * Tells whether {@code address} is present in the trace set of {@code currentGroup}.
         *
         * <p>NOTE(review): the {@code group} argument is not consulted; the lookup always uses
         * {@code currentGroup}. Confirm with callers whether that is intended.
         *
         * @param group   group name supplied by the caller (currently unused)
         * @param address address to look up
         * @return {@code true} if the trace set exists and contains the address
         */
        @Override
        public boolean validateGroup(String group, InetSocketAddress address) {
            Set<InetSocketAddress> knownAddresses = trace.get(currentGroup);
            if (knownAddresses != null) {
                return knownAddresses.contains(address);
            }
            logger.error("找不到可用服务,Trace为空");
            return false;
        }
    
        /** Returns the value of {@code currentGroup}. */
        @Override
        public String getGroup() {
            return this.currentGroup;
        }
    
        /**
         * Returns a read-only view of the address list cached for {@code currentGroup}.
         *
         * @return an unmodifiable view of the group's list, or an empty list when no list is
         *         cached for the group
         */
        @Override
        public List<InetSocketAddress> findServerAddressList() {
            List<InetSocketAddress> currentContainer = container.get(currentGroup);
            if (currentContainer == null) {
                // Collections.unmodifiableList(null) would throw a NullPointerException.
                return Collections.<InetSocketAddress>emptyList();
            }
            return Collections.unmodifiableList(currentContainer);
        }
    
        /** Returns the value of {@code targetService}. */
        @Override
        public String getService() {
            return this.targetService;
        }
    
        /** Always returns the empty string. */
        @Override
        public String getServiceUrl() {
            return "";
        }
    
        /**
         * Closes {@code cachedPath}, {@code groupNodeCache} and finally {@code zkClient}.
         *
         * <p>A failure while closing one of the caches is logged and does not prevent the
         * remaining resources from being closed.
         */
        public void close(){
            try {
                try {
                    cachedPath.close();
                } finally {
                    // Runs even if closing cachedPath failed.
                    groupNodeCache.close();
                }
            } catch (IOException e) {
                logger.warn("关闭zookeeper缓存失败:" + e.getMessage(), e);
            } finally {
                zkClient.close();
            }
        }
    
    

    三 客户端通过JDK动态代理调用远程服务,Thrift客户端对象交给GenericObjectPool负责创建和销毁。

    /**
     * Loads the thrift-generated {@code $Iface} interface and {@code $Client$Factory} class of
     * the service named by {@code serverAddressProvider.getService()}, creates the client
     * object pool, and builds a JDK dynamic proxy ({@code proxyClient}) for the interface.
     *
     * <p>Each proxied call borrows a client from the pool, invokes the method on it and then
     * returns the client to the pool on success or invalidates it on failure. A failed call is
     * attempted again (at most 3 attempts in total) only when the failure is an
     * {@code InvocationTargetException} caused by a {@code TTransportException} whose cause is
     * a {@code SocketTimeoutException} or {@code ConnectTimeoutException}, and the method name
     * starts with "get".
     *
     * @throws Exception if class loading or factory instantiation fails
     */
    @Override
        public void afterPropertiesSet() throws Exception {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            // Load the service's Iface interface
            objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
            // Load the service's Client.Factory class
            Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
            TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
            // Set up the client connection pool
            ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
            GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
            poolConfig.maxActive = maxActive;
            poolConfig.maxIdle = 1;
            poolConfig.minIdle = 0;
            poolConfig.minEvictableIdleTimeMillis = idleTime;
            poolConfig.timeBetweenEvictionRunsMillis = idleTime * 2L;
            poolConfig.testOnBorrow=true;
            poolConfig.testOnReturn=false;
            poolConfig.testWhileIdle=false;
            pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
            // Create the client proxy that performs the remote call and handles
            // exceptions and timeout retries
            proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // Record the call start time t1
                    long t1 = System.currentTimeMillis();
    
                    // Invoke the remote target method, at most 3 attempts
                    Object reObject = null;
                    Exception reException = null;
                    TServiceClient client = null;
                    boolean success = false;
                    int MAX_RETRY = 3, retry = 0;
                    while (++retry <= MAX_RETRY) {
                        client = pool.borrowObject();
                        // flag stays true unless the invocation throws an Exception
                        boolean flag = true;
                        try {
                            reObject = method.invoke(client, args);
                        } catch (Exception e) {
                            flag = false;
                            reException = e;
                            logger.info("retry:"+e.getMessage(),e);
                        } finally {
                            logger.info("retry time:"+retry);
                            // A client that failed is invalidated rather than returned to the pool
                            if (flag) {
                                pool.returnObject(client);
                            } else {
                                pool.invalidateObject(client);
                            }
                        }
    
                        // Call succeeded: leave the loop
                        if (flag) {
                            success = true;
                            break;
                        }
    
                        // Only timeout-type exceptions are retried, and only for methods
                        // whose name starts with "get"
                        boolean needRetry = false;
                        if (reException != null && reException instanceof InvocationTargetException) {
                            Throwable cause1 = reException.getCause();
                            if (cause1 != null && cause1 instanceof TTransportException) {
                                Throwable cause2 = cause1.getCause();
                                if (cause2 != null && (cause2 instanceof SocketTimeoutException || cause2 instanceof ConnectTimeoutException)) {
                                    if (method.getName() != null && method.getName().startsWith("get")) {
                                        logger.info("timeout needRetry set true");
                                        needRetry = true;
                                    }
                                }
                            }
                        }
    
                        if (!needRetry)
                            break;
                    }
    
                    // Record the call end time t2
                    long t2 = System.currentTimeMillis();
                    // retry is MAX_RETRY + 1 when the loop ran out of attempts, so cap it
                    printLog(client, method, (t2 - t1), success, retry > MAX_RETRY ? MAX_RETRY : retry);
    
                    if (!success) {
                        // Unwrap the reflection wrapper so the caller sees the target's exception
                        if (reException instanceof InvocationTargetException)
                            throw ((InvocationTargetException) reException).getTargetException();
                        else
                            throw reException;
                    } else {
                        return reObject;
                    }
                }
            });
        }
    

    3.1 客户端连接池的实现

    // Client destruction
    /**
     * Destroys a pooled client: notifies the optional callback, removes the client from the
     * group and address tracking maps, and closes its input and output transports.
     *
     * @param client the client being removed from the pool
     * @throws Exception if closing a transport fails
     */
    @Override
        public void destroyObject(TServiceClient client) throws Exception {
            if (callback != null) {
                try {
                    callback.destroy(client);
                } catch (Exception e) {
                    // A "{}" placeholder is not filled when the only argument is the
                    // Throwable itself, so the message is built explicitly.
                    logger.warn("destroyObject:" + e.getMessage(), e);
                }
            }
            clientGroupMap.remove(client);
            clientAddressMap.remove(client);
            logger.info("destroyObject:{}", client);
            try {
                TTransport pin = client.getInputProtocol().getTransport();
                pin.close();
            } finally {
                // Close the output side even if closing the input side failed.
                TTransport pout = client.getOutputProtocol().getTransport();
                pout.close();
            }
        }
    // Client creation
    /**
     * Creates a new pooled client connected to an address chosen by
     * {@code serverAddressProvider.selector()}.
     *
     * <p>When the provider's service URL is empty, a framed socket transport is used;
     * otherwise an HTTP transport is built from the address and the service URL. The client
     * is recorded in the group and address tracking maps only after its transport has been
     * opened, so a failed connect leaves no stale entries behind.
     *
     * @return the connected client
     * @throws ThriftException if the provider returns no address
     * @throws Exception if the transport cannot be created or opened
     */
    @Override
        public TServiceClient makeObject() throws Exception {
            InetSocketAddress address = serverAddressProvider.selector();
            String group = serverAddressProvider.getGroup();
            if (address == null) {
                // The exception used to be constructed but never thrown.
                throw new ThriftException("No provider available for remote service");
            }

            TTransport transport;
            if (StringUtils.isEmpty(serverAddressProvider.getServiceUrl())) {
                // Empty serviceUrl: use TFramedTransport, suitable for Java thrift services.
                // Socket timeout 10s, connect timeout 5s.
                TSocket tsocket = new TSocket(address.getHostName(), address.getPort(), 10000, 5000);
                transport = new TFramedTransport(tsocket);
            } else {
                // Non-empty serviceUrl: use the THttpClient transport, suitable for PHP
                // thrift services.
                String url = "http://" + address.getHostName() + ":" + address.getPort()
                        + serverAddressProvider.getServiceUrl();
                transport = new THttpClient(url);
            }
            TProtocol protocol = new TBinaryProtocol(transport);

            TServiceClient client = this.clientFactory.getClient(protocol);

            // Open before registering the client; release the transport if opening fails.
            boolean opened = false;
            try {
                transport.open();
                opened = true;
            } finally {
                if (!opened) {
                    transport.close();
                }
            }
            clientGroupMap.put(client, group);
            clientAddressMap.put(client, address);

            if (callback != null) {
                try {
                    callback.make(client);
                } catch (Exception e) {
                    // A "{}" placeholder is not filled when the only argument is the
                    // Throwable itself, so the message is built explicitly.
                    logger.warn("makeObject:" + e.getMessage(), e);
                }
            }
            return client;
        }

    相关文章

      网友评论

          本文标题:thrift+zk实现服务的注册,发现,调用

          本文链接:https://www.haomeiwen.com/subject/hvcyaftx.html