cim是基于netty的即时通信框架 只实现了基本的登录、发送功能 比较简陋
https://gitee.com/farsunset/cim
主要分为:
cim-boot-server 基于springboot的业务逻辑
cim-server-sdk 封装netty
1、初始化
springboot启动完成后再去初始化netty 通过实现ApplicationListener接口
初始化netty通过CIMNioSocketAcceptor的bind方法实现
还有将处理业务类(实现CIMRequestHandler接口)放入appHandlerMap 用于根据指令处理具体的业务 使用map存储 方便后续的扩展
public class CIMConfig implements CIMRequestHandler, ApplicationListener<ApplicationStartedEvent> {
private final HashMap<String,CIMRequestHandler> appHandlerMap
@Override
public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
appHandlerMap.put("client_bind",applicationContext.getBean(BindHandler.class));
appHandlerMap.put("client_closed",applicationContext.getBean(SessionClosedHandler.class));
applicationContext.getBean(CIMNioSocketAcceptor.class).bind();
}
}
2、主要逻辑
1、CIMNioSocketAcceptor调用bind方法初始化netty
2、然后注册编解码Handler
3、当有客户端连接 会回调channelActive方法 由服务端保存channel 后续使用该channel推送消息到客户端
4、然后由FinalChannelEventHandler的channelRead0做具体的业务逻辑
如果根据指令获取到CIMRequestHandler的实例 则直接执行
否则将消息回调到业务层做处理
public class CIMNioSocketAcceptor {
//用于存放指令和相应业务逻辑的映射
private HashMap<String, CIMRequestHandler> innerHandlerMap;
//存储channel 用于推送消息
private ConcurrentHashMap<String,Channel> channelGroup;
//具体的业务逻辑操作
ChannelHandler channelEventHandler = new FinalChannelEventHandler();
//绑定操作
private void bindAppPort(){
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch){
ch.pipeline().addLast(new AppMessageDecoder());
ch.pipeline().addLast(new AppMessageEncoder());
//心跳超时操作 对应userEventTriggered方法
ch.pipeline().addLast(new IdleStateHandler(READ_IDLE_TIME, WRITE_IDLE_TIME, 0));
//注册FinalChannelEventHandler 处理具体的业务逻辑
ch.pipeline().addLast(channelEventHandler);
}
});
ChannelFuture channelFuture = bootstrap.bind(appPort).syncUninterruptibly();
channelFuture.channel().closeFuture().addListener(future -> this.destroy(appBossGroup,appWorkerGroup));
}
private class FinalChannelEventHandler extends SimpleChannelInboundHandler<Object>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object data) {
SentBody body = (SentBody) data;
CIMSession session = new CIMSession(ctx.channel());
//根据指令 获取相应的逻辑实现
CIMRequestHandler handler = innerHandlerMap.get(body.getKey());
if (handler != null) {
handler.process(session, body);
return;
}
//否则回调到上层的业务接口
outerRequestHandler.process(session, body);
}
@Override
public void channelActive(ChannelHandlerContext ctx){
//保存channel
channelGroup.put(ctx.channel().id().asShortText(),ctx.channel());
}
//心跳处理逻辑
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt){}
}
}
3、推送消息
通过目标id 找到关联的channel写消息即可
public class DefaultMessagePusher implements CIMMessagePusher {
@Override
public void push(Message message) {
//根据id 找到channel
CIMSession session = cimSessionService.get(message.getReceiver());
//Channel.writeAndFlush
if (session.isConnected() && Objects.equals(host, session.getHost())) {
session.write(message);
}
}
}
1、最关键的地方是登录时保存channel 并将channel和用户id做映射
推送时 通过用户id找到channel 通过channel推送消息即可
2、处理消息 通过指令和CIMRequestHandler绑定 易于扩展功能
总结:
框架很简陋 实现原理可以参考
网友评论