美文网首页
Zookeeper应用-rpc改造

Zookeeper应用-rpc改造

作者: 剑道_7ffc | 来源:发表于2020-05-07 08:05 被阅读0次

    整体思路

    服务端:将key:服务名称,value:ip和端口号注册给zookeeper
    客户端:根据服务名称,从zk中获取ip和端口号集合,并做负载均衡

    具体代码

    服务端

    public class ZkConfig {
    
        public static String CONNECTION_STR="192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";
    
    }
    public interface IRegistryCenter {
    
        /**
         * 服务注册名称和服务注册地址实现服务的管理
         * @param serviceName
         * @param serviceAddress
         */
        void registry(String serviceName,String serviceAddress);
    }
    public class RegistryCenterWithZk implements IRegistryCenter{
    
        CuratorFramework curatorFramework =null;
    
        {
            //初始化zookeeper的连接, 会话超时时间是5s,衰减重试
            curatorFramework = CuratorFrameworkFactory.builder().
                    connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
                    retryPolicy(new ExponentialBackoffRetry(1000, 3)).
                    namespace("registry")
                    .build();
            curatorFramework.start();
        }
    
        @Override
        public void registry(String serviceName, String serviceAddress) {
            String servicePath="/"+serviceName;
            try {
                //判断节点是否存在
                if(curatorFramework.checkExists().forPath(servicePath)==null){
                    curatorFramework.create().creatingParentsIfNeeded().
                            withMode(CreateMode.PERSISTENT).forPath(servicePath);
                }
                //serviceAddress: ip:port
                String addressPath=servicePath+"/"+serviceAddress;
                curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(addressPath);
                System.out.println("服务注册成功");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    客户端

    public class ZkConfig {
    
        public static String CONNECTION_STR="192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";
    }
    public interface IServiceDiscovery {
    
        //根据服务名称返回服务地址
        String  discovery(String serviceName);
    }
    public class ServiceDiscoveryWithZk implements IServiceDiscovery{
    
        CuratorFramework curatorFramework =null;
    
        List<String> serviceRepos=new ArrayList<>(); //服务地址的本地缓存
        {
            //初始化zookeeper的连接, 会话超时时间是5s,衰减重试
            curatorFramework = CuratorFrameworkFactory.builder().
                    connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
                    retryPolicy(new ExponentialBackoffRetry(1000, 3)).
                    namespace("registry")
                    .build();
            curatorFramework.start();
        }
    
        /**
         * 服务的查找
         * 设置监听
         *
         * @param serviceName
         * @return
         */
        @Override
        public String discovery(String serviceName) {
            //完成了服务地址的查找(服务地址被删除)
            String path="/"+serviceName; //registry/com.gupaoedu.demo.HelloService
            if(serviceRepos.isEmpty()) {
                try {
                    serviceRepos = curatorFramework.getChildren().forPath(path);
                    registryWatch(path);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //针对已有的地址做负载均衡
            LoadBalanceStrategy loadBalanceStrategy=new RandomLoadBalance();
            return loadBalanceStrategy.selectHost(serviceRepos);
        }
    
        private void registryWatch(final String path) throws Exception {
            PathChildrenCache nodeCache=new PathChildrenCache(curatorFramework,path,true);
            PathChildrenCacheListener nodeCacheListener= (curatorFramework1, pathChildrenCacheEvent) -> {
                System.out.println("客户端收到节点变更的事件");
                serviceRepos=curatorFramework1.getChildren().forPath(path);// 再次更新本地的缓存地址
            };
            nodeCache.getListenable().addListener(nodeCacheListener);
            nodeCache.start();
    
        }
    }
    public interface LoadBalanceStrategy {
    
        String selectHost(List<String> repos);
    
    }
    public abstract class AbstractLoadBalance implements LoadBalanceStrategy{
        @Override
        public String selectHost(List<String> repos) {
            //repos可能为空, 可能只有一个。
            if(repos==null||repos.size()==0){
                return null;
            }
            if(repos.size()==1){
                return repos.get(0);
            }
            return doSelect(repos);
        }
    
        protected abstract String doSelect(List<String> repos);
    
    }
    public class RandomLoadBalance extends AbstractLoadBalance{
    
        @Override
        protected String doSelect(List<String> repos) {
            int length=repos.size();
            Random random=new Random(); //从repos的集合内容随机获得一个地址
            return repos.get(random.nextInt(length));
        }
    }
    

    相关文章

      网友评论

          本文标题:Zookeeper应用-rpc改造

          本文链接:https://www.haomeiwen.com/subject/cnznghtx.html