美文网首页
cim源码分析

cim源码分析

作者: with_dream | 来源:发表于2020-12-23 22:34 被阅读0次

cim是基于netty的即时通信框架 只实现了基本的登录、发送功能 比较简陋
https://gitee.com/farsunset/cim

主要分为:
cim-boot-server 基于springboot的业务逻辑
cim-server-sdk 封装netty

1、初始化

springboot启动完成后再去初始化netty 通过实现ApplicationListener接口

初始化netty通过CIMNioSocketAcceptor的bind方法实现
还有将处理业务类(实现CIMRequestHandler接口)放入appHandlerMap 用于根据指令处理具体的业务 使用map存储 方便后续的扩展

public class CIMConfig implements CIMRequestHandler, ApplicationListener<ApplicationStartedEvent> {

private final HashMap<String,CIMRequestHandler> appHandlerMap

@Override
    public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
        appHandlerMap.put("client_bind",applicationContext.getBean(BindHandler.class));
        appHandlerMap.put("client_closed",applicationContext.getBean(SessionClosedHandler.class));
        applicationContext.getBean(CIMNioSocketAcceptor.class).bind();
    }
}
2、主要逻辑

1、CIMNioSocketAcceptor调用bind方法初始化netty
2、然后注册编解码Handler
3、当有客户端连接 会回调channelActive方法 由服务端保存channel 后续使用该channel推送消息到客户端
4、然后由FinalChannelEventHandler的channelRead0做具体的业务逻辑
如果根据指令获取到CIMRequestHandler的实例 则直接执行
否则将消息回调到业务层做处理

public class CIMNioSocketAcceptor {
  //用于存放指令和相应业务逻辑的映射
  private HashMap<String, CIMRequestHandler> innerHandlerMap;
  //存储channel 用于推送消息
  private ConcurrentHashMap<String,Channel> channelGroup;
  //具体的业务逻辑操作
  ChannelHandler channelEventHandler = new FinalChannelEventHandler();
  //绑定操作
    private void bindAppPort(){
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch){
                ch.pipeline().addLast(new AppMessageDecoder());
                ch.pipeline().addLast(new AppMessageEncoder());
                //心跳超时操作 对应userEventTriggered方法
                ch.pipeline().addLast(new IdleStateHandler(READ_IDLE_TIME, WRITE_IDLE_TIME, 0));
                //注册FinalChannelEventHandler 处理具体的业务逻辑
                ch.pipeline().addLast(channelEventHandler);
            }
        });

        ChannelFuture channelFuture = bootstrap.bind(appPort).syncUninterruptibly();
        channelFuture.channel().closeFuture().addListener(future -> this.destroy(appBossGroup,appWorkerGroup));
    }

  private class FinalChannelEventHandler extends  SimpleChannelInboundHandler<Object>{
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object data) {
            SentBody body = (SentBody) data;
            CIMSession session = new CIMSession(ctx.channel());
            //根据指令 获取相应的逻辑实现
            CIMRequestHandler handler = innerHandlerMap.get(body.getKey());
            if (handler != null) {
                handler.process(session, body);
                return;
            }

            //否则回调到上层的业务接口
            outerRequestHandler.process(session, body);
        }
      
      @Override
        public void channelActive(ChannelHandlerContext ctx){
          //保存channel  
          channelGroup.put(ctx.channel().id().asShortText(),ctx.channel());
        }
        //心跳处理逻辑
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt){}
    }
}
3、推送消息

通过目标id 找到关联的channel写消息即可

public class DefaultMessagePusher implements CIMMessagePusher {
    @Override
    public void push(Message message) {
        //根据id 找到channel
        CIMSession session = cimSessionService.get(message.getReceiver());
        //Channel.writeAndFlush
        if (session.isConnected() && Objects.equals(host, session.getHost())) {
            session.write(message);
        }
    }
}

1、最关键的地方是登录时保存channel 并将channel和用户id做映射
推送时 通过用户id找到channel 通过channel推送消息即可
2、处理消息 通过指令和CIMRequestHandler绑定 易于扩展功能
总结:
框架很简陋 实现原理可以参考

相关文章

网友评论

      本文标题:cim源码分析

      本文链接:https://www.haomeiwen.com/subject/gphunktx.html