美文网首页手写RPC框架
手写RPC框架(4)-重写服务治理,开启1000个线程看看net

手写RPC框架(4)-重写服务治理,开启1000个线程看看net

作者: jwfy | 来源:发表于2019-07-28 15:11 被阅读16次

    本人微信公众号(jwfy)欢迎关注

    手写RPC框架
    1、手写一个RPC框架,看看100个线程同时调用效果如何
    2、手写RPC框架(2)-引入zookeeper做服务治理
    3、手写RPC框架(3)-引入Hessian序列化工具

    原本打算写Netty替换BIO模型的,但是在写Netty中关于Channel长链接部分时,再回顾服务治理这一块的代码实在不太好(当然不意味现在的就很好),决定重写服务治理这一块的逻辑,最后验证1000个线程的执行效果,如下是涉及到改动的点:

    • 添加了logback日志模块,便于日志输出
    • 拆分了之前的服务治理,形成了服务发现和服务注册两个模块
    • 之前服务发现是细化到方法层面,现在修改成接口层面
    • 新增了zk节点监听模式,实现异步动态修改节点信息
    • netty的channel长链接模式
    • 负载均衡由之前的服务提供方ip的,改成netty的channel

    在笔记中也会尽可能的阐述自己的设计思路,思路&实现都可能不那么完美,但这也是手写RPC的目的所在。当这个问题抛给你时,你是如何思考的,落实到具体代码实现,又有多少需要去妥协的

    看各种框架源码也是如此,不能为了看源码而看源码,看源码一方面能帮助我们解决实际的框架使用问题,另一方面是学习大佬的设计思路&好的架构经验,以能为我们所学习和使用

    增加logback模块

    使用SLF4J标准规范,再关联绑定使用了logback,因为在com.101tec#zkclient#0.11版本的zk中包含了log4j,为了避免冲突,所以要么移除,要么采用maven的最近原则,屏蔽zk中的日志模块,本文中则是采用了最近原则的做法,把logback放在pom文件的最顶部,如下图就是启动后的日志输出了(别忘记了往.gitignore中添加logs文件夹)

    image

    服务注册

    服务注册之前是会注册服务提供方和服务调用方两者,以provider和consumer区分,但是这个新的改进中去掉了该部分逻辑,服务提供方只进行服务注册,服务调用方只进行服务发现,所以这也算一个妥协的点吧。

    image

    如上图是改写之后的zk节点信息,最下面的节点路径就是存在的ip数据,是临时节点,接下来看看服务注册这一块是如何实现的。

    public interface ServiceRegister {
    
        /**
         * 服务注册
         * @param config
         */
        void register(BasicConfig config);
    
        /**
         * 优雅关闭
         */
        void close();
    }
    

    保留了之前的注册接口,以便于能做到协议扩展,同时加上了close关闭操作。

    public class ZkServiceRegister implements ServiceRegister {
    
        private static final Logger logger = LoggerFactory.getLogger(ZkServiceRegister.class);
        private CuratorFramework client;
        private RegisterConfig registerConfig;
    
        public ZkServiceRegister(RegisterConfig registerConfig) {
            this.registerConfig = registerConfig;
            RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
    
            this.client = CuratorFrameworkFactory
                    .builder()
                    .connectString(registerConfig.getZkHost())
                    .sessionTimeoutMs(registerConfig.getSessionTimeOut())
                    .retryPolicy(policy)
                    .namespace(registerConfig.getZkNameSpace())
                    .build();
            // 业务的根路径是 /jwfy/simple-rpc ,其他的都会默认挂载在这里
    
            this.client.start();
            logger.info("zk启动正常");
        }
    
        @Override
        public void register(BasicConfig config) {
            String interfacePath = "/" + config.getInterfaceName();
            try {
                if (this.client.checkExists().forPath(interfacePath) == null) {
                    // 创建 服务的永久节点
                    this.client.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.PERSISTENT)
                            .forPath(interfacePath);
                }
                String address = getServiceAddress(config);
                String path = String.format("%s/%s/%s", interfacePath, ServiceType.PROVIDER.getType(), address);
                // 这里强制采用了ServiceType.PROVIDER.getType(),也就是provider
    
                logger.info("注册 zk path: [" + this.registerConfig.getZkNameSpace() + path + "]");
                this.client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(path, "0".getBytes());
                // 创建临时节点,节点内的数据是0
            } catch (Exception e) {
                // 如果节点session未过期,还未被清除,再次创建则会提示节点已存在
                logger.error("注册zk失败, [{}]:{}", interfacePath, e.getMessage());
            }
        }
    
        @Override
        public void close() {
            this.client.close();
            logger.warn("zkClient关闭");
        }
    
        private String getServiceAddress(BasicConfig config) {
            return new StringBuilder().append(config.getHost()).append(":").append(config.getPort()).toString();
        }
    }
    

    相比上一个版本,代码精简了不少,只是针对服务提供方的接口进行了一个zk节点的注册,此外添加了close操作,以便于在服务停止时,主动关闭zk节点的链接。

    服务发现

    public interface ServiceDiscovery {
    
        /**
         * 获取服务的ip信息并添加zk监听
         * @param interfaceName
         * @return
         */
        List<String> get(String interfaceName);
    }
    

    原本这部分功能是在服务注册中的,现在移出来了,单独弄成一个接口ServiceDiscovery,同样是为了协议拓展的功能。

    public class ZkServiceDiscovery implements ServiceDiscovery {
    
        private static final Logger logger = LoggerFactory.getLogger(ZkServiceDiscovery.class);
        private CuratorFramework client;
        private Map<String, List<String>> servicePathMap = new ConcurrentHashMap<>();
        // 存储的是interface->ip信息的映射关系
    
        public ZkServiceDiscovery(RegisterConfig registerConfig) {
            RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
            this.client = CuratorFrameworkFactory
                    .builder()
                    .connectString(registerConfig.getZkHost())
                    .sessionTimeoutMs(registerConfig.getSessionTimeOut())
                    .retryPolicy(policy)
                    .namespace(registerConfig.getZkNameSpace())
                    .build();
    
            this.client.start();
            logger.info("zk启动正常");
        }
    
        @Override
        public List<String> get(String interfaceName){
            String path = String.format("/%s/%s", interfaceName, ServiceType.PROVIDER.getType());
            List<String> ips = null;
            try {
                ips = this.client.getChildren().forPath(path);
                // 先获取子节点的所有信息
                this.addWatcher(interfaceName, path);
                // 添加监听模式
                servicePathMap.put(path, ips);
            } catch (Exception e) {
            }
            return ips;
        }
    
        private void addWatcher(String interfaceName, String path) throws Exception {
            PathChildrenCache cache = new PathChildrenCache(this.client, path, true);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    if (CHILD_ADDED == event.getType() || CHILD_REMOVED == event.getType()) {
                        // 节点添加和节点移除操作
                        String[] childPath = event.getData().getPath().split("/");
                        String ip = childPath[childPath.length-1];
                        PathChildrenCacheEvent.Type type = event.getType();
                        logger.info("path:[{}],ip:[{}], type:{}", path, ip, type.toString());
                        List<String> stringList = servicePathMap.get(path);
                        if (stringList == null) {
                            stringList = new ArrayList<>();
                            servicePathMap.put(path, stringList);
                        }
    
                        if (CHILD_ADDED == type && !stringList.contains(ip)) {
                            stringList.add(ip);
                            ClientConnection.getInstance().connection(interfaceName, ip);
                            // 这里有使用ClientConnection(服务连接器)去完成通知操作
                        } else if (CHILD_REMOVED == type) {
                            stringList.remove(ip);
                            ClientConnection.getInstance().remove(interfaceName, ip);
                        }
                    }
                }
            });
        }
    }
    

    通过zk获取对应服务的服务提供方信息的同时,添加了watcher模式以能实时感知zk节点的变化,然后把其变化信息告诉给服务调用方连接器

    服务连接器

    管理zk节点和netty可用channel的连接器,负载均衡也是在此处发挥作用,客户端可通过此连接器完成有效的Channel获取

    public class ClientConnection  {
    
        private static final Logger logger = LoggerFactory.getLogger(ClientConnection.class);
        private SerializeProtocol serializeProtocol = new HessianSerialize();
        private Map<String, CopyOnWriteArrayList<IpClientHandler>> clientHandlerMap = new ConcurrentHashMap<>();
        private CopyOnWriteArrayList<ClientHandler> clientHandlerList = new CopyOnWriteArrayList<>();
        private volatile boolean flag = true;
        private ReentrantLock reentrantLock = new ReentrantLock();
    
        /**
         * 一个开关以控制对cliendhandler对获取
         */
        private Condition condition = reentrantLock.newCondition();
    
        private static class Single {
            private static final ClientConnection INSTANCE = new ClientConnection();
        }
    
        public static ClientConnection getInstance() {
            // 内部类的单例模式,线程安全
            return Single.INSTANCE;
        }
    
        public void connection(String interfaceName, String ip) {
            InetSocketAddress address = CommonUtils.parseIp(ip);
            EventLoopGroup work = new NioEventLoopGroup();
            // netty的客户端连接操作
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(work).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new LengthFieldPrepender(2,0,false))
                                    .addLast(new RpcEncoder(RpcRequest.class, serializeProtocol))
                                    .addLast(new RpcDecoder(RpcResponse.class, serializeProtocol))
                                    .addLast(new ClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(address);
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        ClientHandler handler = channelFuture.channel().pipeline().get(ClientHandler.class);
                        InetSocketAddress remoteAddress = (InetSocketAddress) handler.getChannel().remoteAddress();
                        logger.info("链接到远程服务器:{}, address:{}",  handler,  remoteAddress);
                        CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);
                        if (ips == null) {
                            ips = new CopyOnWriteArrayList<IpClientHandler>();
                            clientHandlerMap.put(interfaceName, ips);
                        }
                        ips.add(new IpClientHandler(ip, handler));
                        clientHandlerList.add(handler);
                        // 通知其他可能被阻塞的线程
                        notifyHandler();
                    }
                }
            });
        }
    
        public void remove(String interfaceName, String ip) {
            // 移除掉已经废弃的不可用的channel信息
            CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);
            if (ips != null && !ips.isEmpty()) {
                ips.stream().filter(x -> {
                    return x.getIp().equals(ip);
                }).findFirst().ifPresent(x -> {
                    ips.remove(x);
                    clientHandlerList.remove(x.getHandler());
                });
            }
        }
    
        public ClientHandler getHandler(String interfaceName) {
            CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);
            int size = ips.size();
            while (flag && size <=0) {
                // 线程等待
                try {
                    awaitHandler();
                } catch (InterruptedException e) {
                    throw new RuntimeException("无法获取有效的handler");
                }
                ips = clientHandlerMap.get(interfaceName);
                size = ips.size();
            }
            if (!flag) {
                return null;
            }
            if (size == 1) {
                return ips.get(0).getHandler();
            }
            Random random = new Random();
            return ips.get(random.nextInt(size)).getHandler();
        }
    
        private void awaitHandler() throws InterruptedException {
            this.reentrantLock.lock();
            try {
                this.condition.await(2000, TimeUnit.MILLISECONDS);
                // 等待有有效的handler,不过也是有时间的,避免长时间等待
            } finally {
                this.reentrantLock.unlock();
            }
        }
    
        private void notifyHandler() {
            this.reentrantLock.lock();
            try {
                this.condition.signalAll();
            } finally {
                this.reentrantLock.unlock();
            }
        }
    
        public void close() {
            // 关闭,退出可能的条件判断、存储的channel进行主动关闭操作
            this.flag = true;
            this.clientHandlerList.forEach(x -> {
                x.getChannel().close();
            });
            this.clientHandlerList.clear();
            this.clientHandlerMap.clear();
            logger.warn("Netty服务端关闭了");
        }
    }
    

    这短代码稍微比较长,而且也包含了netty客户端连接的逻辑(暂时可以不用关心netty连接的逻辑)。

    那么为什么会在getHandler中添加条件判断呢?是因为netty连接有效的channel处理器是异步完成的,所以是在operationComplete异步回调方法中去通知的,确保可以获取到有效channel,而同时为了避免长时间的阻塞等待,故使用了await 2s的时间。

    负载均衡也是比较简单的,如果确实没有数据则直接返回null,当有效的数据只有1个时,也没必要再做负载均衡,只有当数据超过1条时,需要进行选择操作。

    实践

    设置了两个服务提供方,一个服务调用方,按照先启动一个服务提供方、再启动服务调用方、最后启动另一个服务调用方的执行顺序

    • 服务提供方1启动

      image
    • 服务调用方启动

      image
    • 服务提供方2启动

    image
    • 服务提供方1关闭
    image image

    日志显示的也很明显,watcher监听到了节点的移除事件,然后进行关闭channel长连接的操作

    • 客户端退出
    image
    • 负载均衡开启1000个线程

    负载均衡

    image

    服务调用方

    image

    服务提供方

    image

    负载均衡在两个channel中随机轮询,开启的1000个线程调用没什么问题,不过发现了个关于netty方面的问题,这期就不再介绍,后面介绍netty的时候再聊。

    总结

    本期针对服务治理的重写改善了之前每次获取远程ip都需要通过zookeeper获取的方式,降低对zookeeper带来压力,采用zookeeper的watcher模式感知到节点的变化,同时本地缓存的也不是ip节点数据,而是长连接channel数据,避免了每次获取ip数据后都需要进行连接和关闭操作,可以进一步的提高效率。最后新加了优雅关闭的操作。

    之前BIO模式采用线程池的方式,1000个线程很快就会把线程池打满,导致后续的任务全部被拒绝,如果在拒绝任务没有处理好还会导致服务假死的情况,而采用netty,并没有出现线程池被打满的情况,采用react模式确实能够很好的处理网络连接的处理逻辑,并且封装了NIO部分的操作。限于篇幅的缘故,netty就还是留到后面再介绍。

    如有想需要demo代码的可关注本人微信公众号,给我发私信,代码存在的问题也欢迎提出~

    本人微信公众号(搜索jwfy)欢迎关注

    微信公众号

    相关文章

      网友评论

        本文标题:手写RPC框架(4)-重写服务治理,开启1000个线程看看net

        本文链接:https://www.haomeiwen.com/subject/qexprctx.html