美文网首页即时通讯
CIM-router功能和设计分析

CIM-router功能和设计分析

作者: maskwang520 | 来源:发表于2019-04-08 22:36 被阅读0次

    第一篇已经分析了CIM-client功能和设计。其中也提到了client需要向router注册,获取可用的服务器(负载均衡),线上运维(统计在线人数,模糊查找)。那么这篇注重看看router的设计和实现。
    cim github地址: https://github.com/crossoverJie/cim

    1. 协议
    1.1 请求协议

    请求协议的类图结构如下:

    image.png
    其中基类BaseRequest的实现(在client篇里面也谈到过)如下:
    public class BaseRequest {
      //请求序列号
        private String reqNo;
    // 请求时间戳
        private int timeStamp;
    }
    
    • ChatReqVO增加了userId,msg字段,用来表示抽象聊天的数据请求。
    • LoginReqVO 增加了userId,userName字段,用来表示登陆的数据请求。
    • P2PReqVO 增加了userIdreceiveUserIdmsg字段,用来表示私聊的数据请求。
    1.2 响应协议
    • CIMServerResVO 包含了ip,cimServerPort,httpPort这三个字段,用来表示获取某个服务的ip+prot数据请求。
    • RegisterInfoResVO包含了userId,userName用来表示用户注册的某个服务。
    2. 程序运行流程
    public class RouteApplication implements CommandLineRunner{
    
        private final static Logger LOGGER = LoggerFactory.getLogger(RouteApplication.class);
    
        public static void main(String[] args) {
            SpringApplication.run(RouteApplication.class, args);
            LOGGER.info("启动 route 成功");
        }
    
        @Override
        public void run(String... args) throws Exception {
    
            //监听服务
            Thread thread = new Thread(new ServerListListener());
            thread.setName("zk-listener");
            thread.start() ;
        }
    }
    
    • 标准的Springboot应用启动,并在容器启动后,启动一个线程去向ZK注册监听器。
    public class ServerListListener implements Runnable{
    
        private static Logger logger = LoggerFactory.getLogger(ServerListListener.class);
    
        private ZKit zkUtil;
    
        private AppConfiguration appConfiguration ;
    
    
        public ServerListListener() {
            zkUtil = SpringBeanFactory.getBean(ZKit.class) ;
            appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
        }
    
        @Override
        public void run() {
            //注册监听服务
            zkUtil.subscribeEvent(appConfiguration.getZkRoot());
    
        }
    }
    // 当获取ZK中root节点发生变更(增删改)后更新本地ServerCache
    public void subscribeEvent(String path) {
            zkClient.subscribeChildChanges(path, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    logger.info("清除/更新本地缓存 parentPath=【{}】,currentChilds=【{}】", parentPath,currentChilds.toString());
    
                    //更新所有缓存/先删除 再新增
                    serverCache.updateCache(currentChilds) ;
                }
            });
    
    
        }
    

    ServerCache 保存的是ZK根目录下的所有注册的服务器ip,这样的目的在于每次都缓存了所有的服务节点,而不用每次都向ZK请求,减少网络请求次数。

    3. 对外提供的Http服务

    router就是对外提供的http服务,下面介绍它对外提供服务的具体实现

    3.1 注册服务
    //提供的http接口
    @ApiOperation("注册账号")
        @RequestMapping(value = "registerAccount", method = RequestMethod.POST)
        @ResponseBody()
        public BaseResponse<RegisterInfoResVO> registerAccount(@RequestBody RegisterInfoReqVO registerInfoReqVO) throws Exception {
            BaseResponse<RegisterInfoResVO> res = new BaseResponse();
    
            long userId = System.currentTimeMillis();
            RegisterInfoResVO info = new RegisterInfoResVO(userId, registerInfoReqVO.getUserName());
            info = accountService.register(info);
    
            res.setDataBody(info);
            res.setCode(StatusEnum.SUCCESS.getCode());
            res.setMessage(StatusEnum.SUCCESS.getMessage());
            return res;
        }
    
     public RegisterInfoResVO register(RegisterInfoResVO info) {
            String key = ACCOUNT_PREFIX + info.getUserId();
    
            String name = redisTemplate.opsForValue().get(info.getUserName());
            if (null == name) {
                //为了方便查询,冗余一份
                redisTemplate.opsForValue().set(key, info.getUserName());
                redisTemplate.opsForValue().set(info.getUserName(), key);
            } else {
              //已经存在
                long userId = Long.parseLong(name.split(":")[1]);
                info.setUserId(userId);
                info.setUserName(info.getUserName());
            }
    
            return info;
        }
    
    3.2 获取所有的在线用户
     @ApiOperation("获取所有在线用户")
        @RequestMapping(value = "onlineUser", method = RequestMethod.POST)
        @ResponseBody()
        public BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception {
            BaseResponse<Set<CIMUserInfo>> res = new BaseResponse();
    
            Set<CIMUserInfo> cimUserInfos = userInfoCacheService.onlineUser();
            res.setDataBody(cimUserInfos) ;
            res.setCode(StatusEnum.SUCCESS.getCode());
            res.setMessage(StatusEnum.SUCCESS.getMessage());
            return res;
        }
    
    public Set<CIMUserInfo> onlineUser() {
            Set<CIMUserInfo> set = null ;
            Set<String> members = redisTemplate.opsForSet().members(LOGIN_STATUS_PREFIX);
            for (String member : members) {
                if (set == null){
                    set = new HashSet<>(64) ;
                }
                //通过usrid获取到UserInfo
                CIMUserInfo cimUserInfo = loadUserInfoByUserId(Long.valueOf(member)) ;
                set.add(cimUserInfo) ;
            }
    
            return set;
        }
    

    通过LOGIN_STATUS_PREFIX记录所有的登陆用户,因此获取到这个Set集合就行。

    3.3 登陆并获取到可用的一个服务节点
    @ApiOperation("登录并获取服务器")
        @RequestMapping(value = "login", method = RequestMethod.POST)
        @ResponseBody()
        public BaseResponse<CIMServerResVO> login(@RequestBody LoginReqVO loginReqVO) throws Exception {
            BaseResponse<CIMServerResVO> res = new BaseResponse();
    
            //登录校验,如果登陆成功,则保存登陆状态
            StatusEnum status = accountService.login(loginReqVO);
            if (status == StatusEnum.SUCCESS) {
    
                String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
                String[] serverInfo = server.split(":");
                //下面讲到一致性hash算法
                CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
    
                //保存路由信息,即把(userid,server)对应起来
                accountService.saveRouteInfo(loginReqVO,server);
    
                res.setDataBody(vo);
    
            }
            res.setCode(status.getCode());
            res.setMessage(status.getMessage());
    
            return res;
        }
    
    • 这里选择服务器的形式有3中,如下图:
      image.png
      其中,LoopHandle就是对轮询的形式获取到服务节点,RandomHandle就是随机获取到服务节点,ConsistentHashHandle就是通过一致性hash获取到服务节点。关于一致性hash,有两种实现形式,如下:
    public class TreeMapConsistentHash extends AbstractConsistentHash {
        //通过treemap来实现
        private TreeMap<Long,String> treeMap = new TreeMap<Long, String>() ;
    
        /**
         * 虚拟节点数量
         */
        private static final int VIRTUAL_NODE_SIZE = 2 ;
        //加入虚拟节点
        @Override
        public void add(long key, String value) {
            for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
                Long hash = super.hash("vir" + key + i);
                treeMap.put(hash,value);
            }
            treeMap.put(key, value);
        }
    
        @Override
        public String getFirstNodeValue(String value) {
            long hash = super.hash(value);
            System.out.println("value=" + value + " hash = " + hash);
            //返回大于等于value的视图SortedMap
            SortedMap<Long, String> last = treeMap.tailMap(hash);
            if (!last.isEmpty()) {
                //返回第一个key大于value的对应map里面保存的server
                return last.get(last.firstKey());
            }
            //如果没有,则返回第一个
            return treeMap.firstEntry().getValue();
        }
    }
    
    //用Node数组实现的
    public class SortArrayMapConsistentHash extends AbstractConsistentHash {
    
        private SortArrayMap sortArrayMap = new SortArrayMap();
    
        /**
         * 虚拟节点数量
         */
        private static final int VIRTUAL_NODE_SIZE = 2 ;
    
        @Override
        public void add(long key, String value) {
            for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
                Long hash = super.hash("vir" + key + i);
                sortArrayMap.add(hash,value);
            }
            sortArrayMap.add(key, value);
        }
        //Arrays的sort
        @Override
        public void sort() {
            sortArrayMap.sort();
        }
        //顺时针找第一个比给定key大的server
        @Override
        public String getFirstNodeValue(String value) {
            long hash = super.hash(value);
            System.out.println("value=" + value + " hash = " + hash);
            return sortArrayMap.firstNodeValue(hash);
        }
    }
    
    • 上面是两种一种一致性hash的实现,都是加入虚拟节点,然后把寻找第一个比给定value大的节点,返回该虚拟节点对应的value就行。
    3.4 用户下线
    @ApiOperation("客户端下线")
        @RequestMapping(value = "offLine", method = RequestMethod.POST)
        @ResponseBody()
        public BaseResponse<NULLBody> offLine(@RequestBody ChatReqVO groupReqVO) throws Exception {
            BaseResponse<NULLBody> res = new BaseResponse();
    
            CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
    
            LOGGER.info("下线用户[{}]", cimUserInfo.toString());
            accountService.offLine(groupReqVO.getUserId());
    
            res.setCode(StatusEnum.SUCCESS.getCode());
            res.setMessage(StatusEnum.SUCCESS.getMessage());
            return res;
        }
    
     @Override
        public void offLine(Long userId) throws Exception {
            //删除路由
            redisTemplate.delete(ROUTE_PREFIX + userId) ;
    
            //删除登录状态
            userInfoCacheService.removeLoginStatus(userId);
        }
    
    • 下线就是删除,删除登录状态就行。
    3.5 群聊
     public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) throws Exception {
            BaseResponse<NULLBody> res = new BaseResponse();
            LOGGER.info("msg=[{}]", groupReqVO.toString());
            //获取所有的推送列表
            Map<Long, CIMServerResVO> serverResVOMap = accountService.loadRouteRelated();
            for (Map.Entry<Long, CIMServerResVO> cimServerResVOEntry : serverResVOMap.entrySet()) {
                Long userId = cimServerResVOEntry.getKey();
                CIMServerResVO value = cimServerResVOEntry.getValue();
                if (userId.equals(groupReqVO.getUserId())){
                    //过滤掉自己
                    CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
                    LOGGER.warn("过滤掉了发送者 userId={}",cimUserInfo.toString());
                    continue;
                }
                //推送消息
                String url = "http://" + value.getIp() + ":" + value.getHttpPort() + "/sendMsg" ;
                ChatReqVO chatVO = new ChatReqVO(userId,groupReqVO.getMsg()) ;
    
                accountService.pushMsg(url,groupReqVO.getUserId(),chatVO);
    
            }
            res.setCode(StatusEnum.SUCCESS.getCode());
            res.setMessage(StatusEnum.SUCCESS.getMessage());
            return res;
        }
     
    //扫描以ROUTE_PREFIX开始的字符串,这个字符串对应着每个用户使用的server
     public Map<Long, CIMServerResVO> loadRouteRelated() {
    
            Map<Long, CIMServerResVO> routes = new HashMap<>(64);
    
    
            RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
            ScanOptions options = ScanOptions.scanOptions()
                    .match(ROUTE_PREFIX + "*")
                    .build();
            Cursor<byte[]> scan = connection.scan(options);
    
            while (scan.hasNext()) {
                byte[] next = scan.next();
                String key = new String(next, StandardCharsets.UTF_8);
                LOGGER.info("key={}", key);
                parseServerInfo(routes, key);
    
            }
            try {
                scan.close();
            } catch (IOException e) {
                LOGGER.error("IOException",e);
            }
    
            return routes;
        }
    
    • 上面就是找到找到每个节点的对应的服务节点
     public void pushMsg(String url, long sendUserId, ChatReqVO groupReqVO) throws Exception {
            CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(sendUserId);
    
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("msg", cimUserInfo.getUserName() + ":【" + groupReqVO.getMsg() + "】");
            jsonObject.put("userId", groupReqVO.getUserId());
            RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
    
            Request request = new Request.Builder()
                    .url(url)
                    .post(requestBody)
                    .build();
    
            Response response = okHttpClient.newCall(request).execute();
            try {
                if (!response.isSuccessful()) {
                    throw new IOException("Unexpected code " + response);
                }
            }finally {
                response.body().close();
            }
        }
    
    • 发送到具体的Server,让sever负责广播它收到的群聊消息。
    3.6 私聊
     @ApiOperation("私聊 API")
        @RequestMapping(value = "p2pRoute", method = RequestMethod.POST)
        @ResponseBody()
        public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PReqVO p2pRequest) throws Exception {
            BaseResponse<NULLBody> res = new BaseResponse();
    
            try {
                //获取接收消息用户的路由信息
                CIMServerResVO cimServerResVO = accountService.loadRouteRelatedByUserId(p2pRequest.getReceiveUserId());
                //推送消息
                String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort() + "/sendMsg" ;
    
                //p2pRequest.getReceiveUserId()==>消息接收者的 userID
                ChatReqVO chatVO = new ChatReqVO(p2pRequest.getReceiveUserId(),p2pRequest.getMsg()) ;
                accountService.pushMsg(url,p2pRequest.getUserId(),chatVO);
    
                res.setCode(StatusEnum.SUCCESS.getCode());
                res.setMessage(StatusEnum.SUCCESS.getMessage());
    
            }catch (CIMException e){
                res.setCode(e.getErrorCode());
                res.setMessage(e.getErrorMessage());
            }
            return res;
        }
    
    • 与群聊很相似,只不过这里是一个用户。都是定位具体的sever,然后再根据登陆在server的消息,发送消息到具体的channel就可以。
    总结

    至此,router分析也完成了,router的实现主要是获取到server,同过http调用,从而发送消息。这个过程中,也涉及到负载均衡,在用户注册的时候,尽可能均衡分布。这里用了轮询,随机,一致性hash这三种负载均衡算法。而且后面也方便拓展。

    相关文章

      网友评论

        本文标题:CIM-router功能和设计分析

        本文链接:https://www.haomeiwen.com/subject/dxcpiqtx.html