美文网首页
基于Zookeeper构建分布式RPC框架

基于Zookeeper构建分布式RPC框架

作者: LangK | 来源:发表于2019-02-16 11:20 被阅读0次
    新年伊始,为了让自己快速地进入工作状态,决定自己动手构建一套分布式RPC框架。

    由于是独立的RPC框架,所以采用Zookeeper做注册中心,使用Netty做服务处理。由于Netty是NIO框架,在处理网络请求等待结果返回的时候着实需要一番大改动。

    • 注册中心

      基于Zookeeper做注册中心的实现其实是比较简单的。

      具体的实现逻辑:在服务启动时扫描带有RpcService注解的类,取出该注解的name属性作为服务名,并在Zookeeper上注册为服务节点。再拿到该服务所在机器(或者容器)的IP,将"IP:端口"以临时节点的形式注册到该服务节点下。同时客户端启动时监听Zookeeper节点改变事件,并及时刷新可用的服务列表。

      代码实现

      IRegisterCenterProvider //服务端逻辑

    package one.bugu.zookeeper.rpc.framework.service.zookeeper;
    
    import java.util.List;
    
    /**
     * Created with IntelliJ IDEA.
     * User: LangK
     * Created Date 2019/2/14
     * Time: 14:30
     * Description: Server-side registry contract used to publish RPC services and
     * the addresses that serve them.
     */
    public interface IRegisterCenterProvider {
    
        /**
         * Registers a service together with the addresses that provide it.
         *
         * <p>The method returns nothing; the outcome of the registration is not
         * reported to the caller through this interface.
         *
         * @param serverName name of the service to register
         * @param ips        addresses offering the service (NOTE(review): the
         *                   implementation shipped with this interface passes
         *                   "ip:port" strings — confirm this is the intended format)
         */
        void registerProvider(String serverName, List<String> ips);
    
        /**
         * Refreshes the provider information published by the server side.
         */
        void updateProvider();
    }
    
    
    

    RegisterCenterProviderImpl //服务端逻辑实现

    package one.bugu.zookeeper.rpc.framework.service.zookeeper;
    
    import one.bugu.zookeeper.rpc.framework.annotations.RpcService;
    import one.bugu.zookeeper.rpc.framework.service.RpcServiceConfiguration;
    import one.bugu.zookeeper.rpc.framework.service.socket.ServiceSocket;
    import one.bugu.zookeeper.rpc.framework.util.SpringContextUtil;
    import one.bugu.zookeeper.rpc.framework.zookeeper.ZooKeeperHelper;
    import one.bugu.zookeeper.rpc.framework.zookeeper.ZookeeperConfiguration;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.StringUtils;
    
    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.util.*;
    
    /**
     * Created with IntelliJ IDEA.
     * User: LangK
     * Created Date 2019/2/14
     * Time: 14:31
     * Description: Server-side registry implementation. It looks up every bean
     * annotated with {@link RpcService}, registers a node per service name with one
     * temporary child node per local "ip:port" address, and starts the socket
     * server thread.
     */
    public class RegisterCenterProviderImpl implements IRegisterCenterProvider {
    
        /**
         * ZooKeeper configuration (connection URL and root path).
         */
        private final ZookeeperConfiguration zookeeperConfiguration;
        /**
         * RPC configuration; supplies the port appended to every address and is
         * handed to the socket server.
         */
        private final RpcServiceConfiguration rpcServiceConfiguration;
    
        /**
         * ZooKeeper helper, created lazily by {@link #registerProvider}. Volatile
         * because the watcher callback resets it while other threads may be
         * registering.
         */
        private volatile ZooKeeperHelper zooKeeperHelper;
    
        /**
         * Service name to bean lookup, so an incoming client request can quickly
         * find the bean that handles it. Wrapped in a synchronized map because it
         * is refreshed from the watcher callback.
         */
        public static Map<String, Object> serverBean =
                Collections.synchronizedMap(new HashMap<String, Object>());
    
        private static final Logger logger = LoggerFactory.getLogger(RegisterCenterProviderImpl.class);
    
        /**
         * Thread running the Netty socket server.
         */
        private Thread socketThread;
    
        public RegisterCenterProviderImpl(ZookeeperConfiguration zookeeperConfiguration, RpcServiceConfiguration rpcServiceConfiguration) {
            this.zookeeperConfiguration = zookeeperConfiguration;
            this.rpcServiceConfiguration = rpcServiceConfiguration;
        }
    
        /**
         * Creates the service node (if missing) and one temporary child node per
         * address. Failures are logged and do not throw.
         *
         * @param serverName name of the service; used as the node name below the
         *                   configured root path
         * @param ips        addresses to publish; {@code null} is treated as empty
         */
        @Override
        public void registerProvider(String serverName, List<String> ips) {
            // Work on a local reference: the watcher may null the field while we run.
            ZooKeeperHelper helper = zooKeeperHelper;
            if (helper == null) {
                helper = new ZooKeeperHelper(zookeeperConfiguration.getUrl(), new ServerNodeChangeWatcher());
                zooKeeperHelper = helper;
            }
            String path = zookeeperConfiguration.getPath() + "/" + serverName;
            if (!helper.existsNode(path)) {
                String resultPath = helper.createNode(path, null);
                if (StringUtils.isEmpty(resultPath)) {
                    logger.error("RPC服务注册失败,服务名:{}", serverName);
                    return;
                }
            }
            if (ips == null) {
                return;
            }
            for (String ip : ips) {
                String node = path + "/" + ip;
                if (!helper.existsNode(node)) {
                    String resultPath = helper.createTempNode(node, null);
                    if (StringUtils.isEmpty(resultPath)) {
                        logger.error("RPC服务注册失败,服务名:{},IP地址:{}", serverName, ip);
                    }
                }
            }
        }
    
    
        /**
         * Re-scans the application context for {@link RpcService} beans, refreshes
         * {@link #serverBean}, registers every service, and makes sure the socket
         * server thread is running. Does nothing when no annotated bean exists.
         */
        @Override
        public void updateProvider() {
            Map<String, Object> annotatedBeans = SpringContextUtil.getApplicationContext().getBeansWithAnnotation(RpcService.class);
            if (annotatedBeans == null || annotatedBeans.isEmpty()) {
                return;
            }
            // Socket addresses ("ip:port") published for every service.
            List<String> ips = getIp();
            if (ips.isEmpty()) {
                logger.warn("No site-local address found; services will be registered without providers");
            }
            Set<String> liveNames = new HashSet<String>();
            for (Map.Entry<String, Object> entry : annotatedBeans.entrySet()) {
                // Resolve the annotation through the bean factory: on a proxied bean
                // getClass().getAnnotation(...) returns null and would cause an NPE.
                RpcService annotation = SpringContextUtil.getApplicationContext()
                        .findAnnotationOnBean(entry.getKey(), RpcService.class);
                if (annotation == null) {
                    logger.warn("Bean {} carries no resolvable RpcService annotation, skipped", entry.getKey());
                    continue;
                }
                String serverName = annotation.name();
                serverBean.put(serverName, entry.getValue());
                liveNames.add(serverName);
                registerProvider(serverName, ips);
            }
            // Drop stale entries only after the new ones are in place, so a lookup
            // never sees an empty map during a refresh.
            serverBean.keySet().retainAll(liveNames);
            startSocketServerIfNeeded();
        }
    
        /**
         * Starts the socket server thread unless one is currently alive. A finished
         * thread cannot be started again, so a new one is created in that case.
         */
        private synchronized void startSocketServerIfNeeded() {
            if (socketThread == null || !socketThread.isAlive()) {
                socketThread = new Thread(new ServiceSocket(rpcServiceConfiguration));
                socketThread.start();
            }
        }
    
    
        /**
         * Collects the site-local, non-loopback, non-link-local addresses of this
         * machine.
         *
         * @return list of "ip:port" strings using the configured RPC port; empty
         *         when no address qualifies or the interfaces cannot be read
         */
        public List<String> getIp() {
            List<String> addresses = new ArrayList<String>();
            try {
                Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
                if (interfaces == null) {
                    // Older JDKs return null when the machine has no interfaces.
                    return addresses;
                }
                for (NetworkInterface networkInterface : Collections.list(interfaces)) {
                    for (InetAddress addr : Collections.list(networkInterface.getInetAddresses())) {
                        if (!addr.isLoopbackAddress() && !addr.isLinkLocalAddress() && addr.isSiteLocalAddress()) {
                            addresses.add(addr.getHostAddress() + ":" + rpcServiceConfiguration.getPort());
                        }
                    }
                }
            } catch (SocketException e) {
                logger.error("获取IP地址异常", e);
            }
            return addresses;
        }
    
        /**
         * Watcher passed to the ZooKeeper helper: re-registers all services when the
         * connection is established and drops the helper when it is lost.
         */
        class ServerNodeChangeWatcher implements Watcher {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    logger.info("Watch received SyncConnected event");
                    updateProvider();
                }
                if (event.getState() == Event.KeeperState.Disconnected) {
                    logger.info("Watch received Disconnected event");
                    // NOTE(review): the old helper is only dereferenced here, never
                    // closed by this class. Confirm ZooKeeperHelper releases its
                    // connection, otherwise each disconnect leaks a client.
                    zooKeeperHelper = null;
                }
            }
        }
    }
    
    

    IRegisterCenterInvoker //客户端逻辑

    package one.bugu.zookeeper.rpc.framework.client.zookeeper;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * Created with IntelliJ IDEA.
     * User: LangK
     * Created Date 2019/2/14
     * Time: 14:27
     * Description: Consumer-side view of the service registry.
     */
    public interface IRegisterCenterInvoker {
    
        /**
         * Initializes the consumer's local cache of service-provider information.
         */
        void initProviderMap();
    
        /**
         * Refreshes the consumer's cached service-provider information.
         */
        void updateProviderMap();
    
        /**
         * Returns the service-provider information held by the consumer.
         *
         * @return the provider map (NOTE(review): presumably service name to
         *         provider "ip:port" list — confirm against the implementation)
         */
        Map<String, List<String>> getProviderMap();
    
    }
    
    

    RegisterCenterInvokerImpl //客户端逻辑实现

    package one.bugu.zookeeper.rpc.framework.client.zookeeper;
    
    import one.bugu.zookeeper.rpc.framework.zookeeper.ZooKeeperHelper;
    import one.bugu.zookeeper.rpc.framework.zookeeper.ZookeeperConfiguration;
    import com.google.gson.Gson;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * Created with IntelliJ IDEA.
     * User: LangK
     * Created Date 2019/2/14
     * Time: 15:34
     * Description: Consumer-side registry implementation. It reads the service
     * nodes below the configured root path and caches, per service, the list of
     * child nodes (the providers' IP:PORT entries).
     */
    public class RegisterCenterInvokerImpl implements IRegisterCenterInvoker {
    
        private static final Logger logger = LoggerFactory.getLogger(RegisterCenterInvokerImpl.class);
    
        private final Gson gson = new Gson();
    
        /**
         * ZooKeeper configuration (connection URL and root path).
         */
        private final ZookeeperConfiguration zookeeperConfiguration;
    
        /**
         * Every known service with the IP:PORT entries of its Netty providers.
         * Created eagerly so {@link #getProviderMap()} never returns {@code null}.
         */
        private final Map<String, List<String>> providerMap = new ConcurrentHashMap<>();
    
        /**
         * ZooKeeper helper; assigned by {@link #initProviderMap()}. Volatile because
         * the watcher callback reads it.
         */
        private volatile ZooKeeperHelper zooKeeperHelper;
    
        public RegisterCenterInvokerImpl(ZookeeperConfiguration zookeeperConfiguration) {
            this.zookeeperConfiguration = zookeeperConfiguration;
        }
    
    
        /**
         * Opens the ZooKeeper helper with a change watcher and loads the provider
         * map for the first time.
         */
        @Override
        public void initProviderMap() {
            zooKeeperHelper = new ZooKeeperHelper(zookeeperConfiguration.getUrl(), new ClientNodeChangeWatcher());
            updateProviderMap();
    
        }
    
        /**
         * Re-reads all services and their providers and updates the cached map.
         * Services without providers are left out. The new state is collected
         * first and then merged, so concurrent readers never see the map emptied
         * in the middle of a refresh.
         */
        @Override
        public void updateProviderMap() {
            ZooKeeperHelper helper = zooKeeperHelper;
            if (helper == null) {
                // The watcher can fire before initProviderMap() has stored the helper.
                logger.warn("updateProviderMap called before the ZooKeeper helper is available, skipped");
                return;
            }
            synchronized (this) {
                String rootPath = zookeeperConfiguration.getPath();
                List<String> serverList = helper.getChildren(rootPath);
                if (serverList == null || serverList.isEmpty()) {
                    providerMap.clear();
                    return;
                }
                Map<String, List<String>> fresh = new ConcurrentHashMap<>();
                for (String server : serverList) {
                    List<String> providerList = helper.getChildren(rootPath + "/" + server);
                    if (providerList != null && !providerList.isEmpty()) {
                        fresh.put(server, providerList);
                    }
                }
                // Add and overwrite first, then drop services that disappeared.
                providerMap.putAll(fresh);
                providerMap.keySet().retainAll(fresh.keySet());
                if (logger.isInfoEnabled()) {
                    logger.info("update socket server finish. server list is:{}", gson.toJson(providerMap));
                }
            }
        }
    
        /**
         * @return the live provider map (service name to provider IP:PORT list);
         *         never {@code null}, empty until the first successful refresh
         */
        @Override
        public Map<String, List<String>> getProviderMap() {
            return providerMap;
        }
    
    
        /**
         * Watcher passed to the ZooKeeper helper: refreshes the provider map when
         * a node's children change.
         */
        class ClientNodeChangeWatcher implements Watcher {
    
            // NOTE(review): ZooKeeper watches fire once; presumably
            // ZooKeeperHelper.getChildren re-registers the watch — confirm.
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    logger.info("Watch received NodeChildrenChanged event");
                    updateProviderMap();
                }
            }
        }
    }
    
    • Netty处理

    Netty服务则是在检测到项目中包含@RpcService时启动。在检测到正在执行含有@RpcClient注解的类方法时拦截该方法,如果还未和服务端建立连接,则建立长连接,进行RPC通信(通信完成后连接不断开,考虑到是服务端之间的RPC通信,此处长连接更适合),调用服务端代码:如果服务端返回了正确结果,则用该结果替换客户端本地方法的返回值;出现异常时则继续使用客户端本地方法的返回结果。

    代码实现

    ClientAspect //客户端拦截器

    package one.bugu.zookeeper.rpc.framework.aspect;
    
    import one.bugu.zookeeper.rpc.framework.annotations.RpcClient;
    import one.bugu.zookeeper.rpc.framework.client.socket.ClientRequestPool;
    import com.google.gson.Gson;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    /**
     * Created with IntelliJ IDEA.
     * User: LangK
     * Created Date 2019/2/12
     * Time: 18:28
     * Description: Client-side interceptor. It sends every call on a class
     * annotated with {@link RpcClient} to the remote service and returns the
     * remote result in place of the local one when it can be decoded.
     */
    @Aspect
    @Component
    public class ClientAspect {
    
        private static final Logger logger = LoggerFactory.getLogger(ClientAspect.class);
    
        @Autowired
        private ClientRequestPool clientRequestPool;
    
        private final Gson gson = new Gson();
    
        /**
         * Intercepts all methods of classes annotated with {@link RpcClient}.
         *
         * <p>The request is sent first (to the configured IP when the annotation
         * names one), then the local method is invoked. If the remote call
         * returned a non-null JSON string that can be decoded, the decoded value
         * is returned; otherwise the local method's result is returned.
         *
         * @param pjp the intercepted join point
         * @return the decoded remote result, or the local method's result as a
         *         fallback
         * @throws Throwable whatever the request pool or the local method throws
         */
        @Around("@within(one.bugu.zookeeper.rpc.framework.annotations.RpcClient)")
        public Object doSocket(ProceedingJoinPoint pjp) throws Throwable {
            RpcClient an = (RpcClient) pjp.getSignature().getDeclaringType().getAnnotation(RpcClient.class);
            String serverName = an.serverName();
            String ip = an.serverIp();
            String method = pjp.getSignature().getName();
            String resultObject;
            if (StringUtils.isEmpty(ip)) {
                resultObject = clientRequestPool.send(serverName, method, pjp.getArgs());
            } else {
                resultObject = clientRequestPool.send(serverName, ip, method, pjp.getArgs());
            }
            Object localResult = pjp.proceed();
            if (resultObject == null) {
                return localResult;
            }
            try {
                return decode(resultObject, localResult, pjp);
            } catch (Exception e) {
                // Keep the cause: without it a conversion failure cannot be diagnosed.
                logger.warn("RPC接收结果转换异常:{}", resultObject, e);
                return localResult;
            }
        }
    
        /**
         * Decodes the remote JSON result. The runtime class of the local result is
         * the target type when there is one. When the local method returned
         * {@code null}, the declared (generic) return type is used instead, so
         * the remote result is not lost to a NullPointerException.
         */
        private Object decode(String json, Object localResult, ProceedingJoinPoint pjp) {
            if (localResult != null) {
                return gson.fromJson(json, localResult.getClass());
            }
            if (pjp.getSignature() instanceof MethodSignature) {
                MethodSignature signature = (MethodSignature) pjp.getSignature();
                if (signature.getReturnType() == void.class) {
                    // Nothing to hand back for a void method.
                    return null;
                }
                return gson.fromJson(json, signature.getMethod().getGenericReturnType());
            }
            return null;
        }
    
    }
    
    
    大体上的思路就是这样,具体代码当然少不了封装。这里就不过多的贴代码了,有兴趣的伙伴可以去LangK开源 / RpcFramework下载项目。

    通过该项目,熟悉了使用Zookeeper做注册中心的关键功能。也对Netty框架NIO有了更深刻的理解。算是为2019年有个好的开始。

    当然如果你有好的设计思想,可以一起参与完善该项目。

    相关文章

      网友评论

          本文标题:基于Zookeeper构建分布式RPC框架

          本文链接:https://www.haomeiwen.com/subject/fgrbeqtx.html