An IM Component Built on Netty over the WebSocket Protocol (Part 1)


Author: 原来Yhy | Published: 2020-06-17 21:47

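    Goals:

    1. Stable, reliable message delivery

    2. Point-to-point messages

    3. One-to-many (group) messages

    4. Broadcast messages

    5. Scaling out by adding nodes

    6. Service registration and discovery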

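    Design notes for each goal:

    1. Stable, reliable message delivery

    Netty serves as the network framework and implements the WebSocket protocol (a long-lived connection). If messages need to be persisted, they can be written to a database, and the receiving end acknowledges each message.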
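
    The acknowledgement step can be sketched as follows. This is a minimal in-memory sketch, not the author's implementation: it assumes every message carries a unique id and that the receiver sends that id back as a confirmation. `PendingAckStore` and its methods are hypothetical names; a real deployment would back the store with the database mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: tracks messages that were sent but not yet acknowledged. */
final class PendingAckStore {

    /** A sent message together with the time it was sent. */
    static final class Pending {
        final String messageId;
        final String payload;
        final long sentAtMillis;

        Pending(String messageId, String payload, long sentAtMillis) {
            this.messageId = messageId;
            this.payload = payload;
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();

    /** Record a message right before it is written to the connection. */
    void onSend(String messageId, String payload, long nowMillis) {
        pending.put(messageId, new Pending(messageId, payload, nowMillis));
    }

    /** Called when the receiver confirms a message; returns false for unknown ids. */
    boolean onAck(String messageId) {
        return pending.remove(messageId) != null;
    }

    /** Messages that have waited at least timeoutMillis and are candidates for a resend. */
    List<Pending> dueForResend(long nowMillis, long timeoutMillis) {
        List<Pending> due = new ArrayList<>();
        for (Pending p : pending.values()) {
            if (nowMillis - p.sentAtMillis >= timeoutMillis) {
                due.add(p);
            }
        }
        return due;
    }
}
```

    A periodic task could call `dueForResend` and write those messages again; how often to retry and when to give up are left open here.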

    2. Point-to-point messages

    Each channel is bound to an identity (such as the user id), and the message body specifies the message type.
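
    The binding can be pictured as a two-way lookup between user ids and connections. The sketch below is an illustration under assumptions: `UserConnectionRegistry` is a hypothetical name, and the connection type is generic so the example runs without Netty (in the article it would be a Netty `Channel`).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry: maps a user id to that user's connection and back. */
final class UserConnectionRegistry<C> {
    private final Map<String, C> byUser = new ConcurrentHashMap<>();
    private final Map<C, String> byConnection = new ConcurrentHashMap<>();

    /** Bind an authenticated user to a connection; a newer connection replaces an older one. */
    void bind(String userId, C connection) {
        C old = byUser.put(userId, connection);
        if (old != null) {
            byConnection.remove(old);
        }
        byConnection.put(connection, userId);
    }

    /** Remove a closed connection; returns the user it belonged to, or null if it was not bound. */
    String unbind(C connection) {
        String userId = byConnection.remove(connection);
        if (userId != null) {
            // only drop the user entry if it still points at this connection
            byUser.remove(userId, connection);
        }
        return userId;
    }

    /** Look up the connection to write a point-to-point message to. */
    C find(String userId) {
        return byUser.get(userId);
    }
}
```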
    

    3. One-to-many messages

    Look up the groups the user belongs to and add the user's channel to the ChannelGroup of each group.
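
    A rough picture of the group bookkeeping, written without Netty so it runs on its own. `GroupRegistry` is a hypothetical name, and the connection type is generic. Netty's `ChannelGroup` removes closed channels automatically; this sketch needs the explicit `leaveAll` call instead.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical registry: group id -> connections of the members currently online. */
final class GroupRegistry<C> {
    private final Map<String, Set<C>> groups = new ConcurrentHashMap<>();

    /** Add a connection to a group, creating the group on first use. */
    void join(String groupId, C connection) {
        groups.computeIfAbsent(groupId, id -> ConcurrentHashMap.newKeySet()).add(connection);
    }

    /** Remove a closed connection from every group it joined. */
    void leaveAll(C connection) {
        for (Set<C> members : groups.values()) {
            members.remove(connection);
        }
    }

    /** Hand every member of the group to the writer; returns how many members were addressed. */
    int sendGroup(String groupId, Consumer<C> writer) {
        Set<C> members = groups.getOrDefault(groupId, Collections.emptySet());
        for (C member : members) {
            writer.accept(member);
        }
        return members.size();
    }
}
```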
    

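    4. Broadcast messages

    In standalone mode, all channels are taken from the channel map and the message is written to each of them.

    In multi-node mode, the message is first published to a publish/subscribe middleware; every server node receives the middleware's broadcast and sends the message to all channels on that node.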
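
    The multi-node flow can be simulated in memory. The sketch below is an illustration only: `InMemoryTopic` stands in for a real publish/subscribe middleware, `ImNode` for a server node, and both names are hypothetical. Messages are plain strings and deliveries are recorded in a list instead of being written to sockets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a publish/subscribe middleware topic. */
final class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    /** Every subscriber receives every published message (broadcast mode). */
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

/** Hypothetical server node that only knows the users connected to it. */
final class ImNode {
    private final String name;
    private final List<String> localUsers = new ArrayList<>();
    /** Records deliveries so the effect is visible without real sockets. */
    final List<String> delivered = new ArrayList<>();

    ImNode(String name) {
        this.name = name;
    }

    void subscribeTo(InMemoryTopic topic) {
        topic.subscribe(this::deliverLocally);
    }

    void connect(String userId) {
        localUsers.add(userId);
    }

    /** Forward a message received from the topic to every user on this node. */
    private void deliverLocally(String message) {
        for (String userId : localUsers) {
            delivered.add(name + "->" + userId + ":" + message);
        }
    }
}
```

    Whichever node receives the original message only publishes it; no node needs to know where the other users are connected.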
    

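    5. Node scaling

    Using the publish/subscribe mode of a message middleware, each received message is first delivered to the middleware; server nodes consume from the middleware and forward the message to the connections on their own node.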
    

    6. Service registration and discovery

    Spring Cloud handles service registration and discovery (through any Spring Cloud registry implementation, such as Nacos or Eureka).
    

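    Implementation:

    1. An event mechanism that routes business handling by message code. At startup, the service collects the CmdProcess implementations from the Spring context and dispatches events to them by code.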

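    public interface CmdProcess {

        /**
         * Handles a received message.
         * @param message message body
         * @param channel the channel the message arrived on
         * @return the response message, or null if there is no response
         */
        Message handler(Message message, Channel channel);

        /**
         * Returns the command code (it must match the code carried by the message packet).
         * @return command code
         */
        Byte getCmdCode();

    }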
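
    Dispatching by code amounts to a lookup table from command code to handler. The sketch below illustrates the idea under assumptions: `SimpleCmdProcess`, `EchoProcess` and `CmdDispatcher` are hypothetical, simplified stand-ins (messages are plain strings and there is no channel argument); the repository's own `ProcessManager` may work differently.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for CmdProcess: messages are plain strings. */
interface SimpleCmdProcess {
    String handle(String message);

    Byte getCmdCode();
}

/** Example handler registered under command code 1. */
final class EchoProcess implements SimpleCmdProcess {
    @Override
    public String handle(String message) {
        return "echo:" + message;
    }

    @Override
    public Byte getCmdCode() {
        return (byte) 1;
    }
}

/** Routes each incoming message to the handler registered for its command code. */
final class CmdDispatcher {
    private final Map<Byte, SimpleCmdProcess> processes = new HashMap<>();

    CmdDispatcher(List<SimpleCmdProcess> list) {
        for (SimpleCmdProcess process : list) {
            SimpleCmdProcess previous = processes.put(process.getCmdCode(), process);
            if (previous != null) {
                // two handlers claiming the same code is a configuration error
                throw new IllegalStateException("duplicate command code " + process.getCmdCode());
            }
        }
    }

    /** Returns the handler's response, or null when no handler is registered for the code. */
    String dispatch(byte code, String message) {
        SimpleCmdProcess process = processes.get(code);
        return process == null ? null : process.handle(message);
    }
}
```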
    

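    2. Connection authentication (the event executed on the first handshake). A simple implementation is provided; define your own authentication implementation to match your business.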

    public abstract class AuthProcess {

        /**
         * Login event.
         * @param username user name
         * @param password user password
         * @return the IM session object for the user
         */
        public abstract ImSession login(String username, String password);

    }
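
    A custom implementation might look roughly like the sketch below. Everything in it is an assumption made for illustration: `DemoSession` and `DemoAuthProcess` are stand-ins for the article's `ImSession` and `AuthProcess`, credentials live in an in-memory map in plain text (acceptable for a demo only), and a failed login returns null, which the original does not specify.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for ImSession. */
final class DemoSession {
    final String userId;

    DemoSession(String userId) {
        this.userId = userId;
    }
}

/** Hypothetical stand-in for the abstract AuthProcess. */
abstract class DemoAuthProcess {
    abstract DemoSession login(String username, String password);
}

/** Demo-only authenticator backed by an in-memory user table. */
final class InMemoryAuthProcess extends DemoAuthProcess {
    private final Map<String, String> passwords;

    InMemoryAuthProcess(Map<String, String> passwords) {
        this.passwords = new HashMap<>(passwords);
    }

    @Override
    DemoSession login(String username, String password) {
        String expected = passwords.get(username);
        if (expected == null || password == null) {
            return null; // unknown user or missing password (assumed failure signal)
        }
        // MessageDigest.isEqual compares the bytes in constant time
        boolean matches = MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                password.getBytes(StandardCharsets.UTF_8));
        return matches ? new DemoSession(username) : null;
    }
}
```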
    
    

    3. Channel lifecycle events (executed when a client connection is established and when it is closed)

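    public interface LifeCycleEvent {

        /**
         * Binds the channel context once the client has connected.
         * @param login the authenticated session
         * @param ctx   the channel handler context
         */
        void bindContext(ImSession login, ChannelHandlerContext ctx);

        /**
         * Unbinds the channel context when the client disconnects.
         * @param channel the channel being closed
         */
        void cleanContext(Channel channel);

    }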
    

    4. A reserved interface for distributing messages across nodes (a configuration parameter decides whether multi-node mode is used)

    public interface ImClusterTopic {

        /**
         * Publishes a message.
         */
        void publish(ClusterMessage message);

        /**
         * Subscribes to messages (broadcast mode).
         */
        void consumer();

    }
    
    

    5. Utility class for sending messages

    package com.awy.common.ws.netty.toolkit;
    
    import cn.hutool.core.collection.CollUtil;
    import cn.hutool.json.JSONUtil;
    import com.awy.common.message.api.packets.Message;
    import com.awy.common.ws.netty.cluster.ImClusterTopic;
    import com.awy.common.ws.netty.context.GlobalContent;
    import com.awy.common.ws.netty.context.SessionContext;
    import com.awy.common.ws.netty.config.ImConfig;
    import com.awy.common.ws.netty.model.ClusterMessage;
    import io.netty.channel.Channel;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    import java.util.Map;
    
    @Slf4j
    public class ImSendUtil {
    
        /**
         * Sends a message to a single user.
         * @param userId user id
         * @param message message body
         */
        public static void sendUser(String userId, Message message){
            if(isCluster()){
                getImClusterTopic().publish(ClusterMessage.chatMessage(userId,message));
            }else {
                sendCurrentNodeUser(userId,message);
            }
        }
    
        /**
         * Sends a message to a list of users.
         * @param userIds list of user ids
         * @param message message body
         */
        public static void sendUsers(List<String> userIds,Message message){
            if(isCluster()){
                getImClusterTopic().publish(ClusterMessage.chatsMessage(userIds,message));
            }else {
                sendCurrentNodeUsers(userIds,message);
            }
        }
    
        /**
         * Sends a message to a group.
         * @param groupId group id
         * @param message message body
         */
        public static void sendGroup(String groupId,Message message){
            if(isCluster()){
                getImClusterTopic().publish(ClusterMessage.groupMessage(groupId,message));
            }else {
                sendCurrentNodeGroup(groupId,message);
            }
        }
    
        /**
         * Sends a message to a list of groups.
         * @param groupIds list of group ids
         * @param message message body
         */
        public static void sendGroups(List<String> groupIds,Message message){
            if(isCluster()){
                getImClusterTopic().publish(ClusterMessage.groupsMessage(groupIds,message));
            }else {
                sendCurrentNodeGroups(groupIds,message);
            }
        }
    
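        /**
         * Sends a message to all users.
         * @param message message body
         */
        public static void sendAll(Message message){
            if(isCluster()){
                // cluster mode: publish to the topic so that every node broadcasts locally
                getImClusterTopic().publish(ClusterMessage.noStateMessage(message));
            }else {
                // standalone mode: broadcast to the channels on this node
                sendCurrentNodeAllChannel(message);
            }
        }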
    
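        /**
         * Sends a message to a user connected to the current node.
         * @param userId user id
         * @param message message body
         */
        public static void sendCurrentNodeUser(String userId,Message message){
            Channel channel = SessionContext.getChannel(userId);
            if(channel != null){
                // getMessage returns null for a null message; never pass null to writeAndFlush
                TextWebSocketFrame frame = getMessage(message);
                if(frame != null){
                    channel.writeAndFlush(frame);
                }
            }
        }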
    
        /**
         * Sends a message to a list of users connected to the current node.
         * @param userIds list of user ids
         * @param message message body
         */
        public static void sendCurrentNodeUsers(List<String> userIds,Message message){
            if(CollUtil.isNotEmpty(userIds)){
                for (String userId : userIds) {
                    sendCurrentNodeUser(userId,message);
                }
            }
        }
    
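        /**
         * Sends a message to a group on the current node.
         * @param groupId group id
         * @param message message body
         */
        public static void sendCurrentNodeGroup(String groupId,Message message){
            ChannelGroup channelGroup = SessionContext.getChannelGroup(groupId);
            if(channelGroup != null){
                // getMessage returns null for a null message; never pass null to writeAndFlush
                TextWebSocketFrame frame = getMessage(message);
                if(frame != null){
                    channelGroup.writeAndFlush(frame);
                }
            }
        }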
    
        /**
         * Sends a message to a list of groups on the current node.
         * @param groupIds list of group ids
         * @param message message body
         */
        public static void sendCurrentNodeGroups(List<String> groupIds, Message message){
            if(CollUtil.isNotEmpty(groupIds)){
                for (String groupId : groupIds) {
                    sendCurrentNodeGroup(groupId,message);
                }
            }
        }
    
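        /**
         * Sends a message to every user connected to the current node.
         * @param message message body
         */
        public static void sendCurrentNodeAllChannel(Message message){
            if(message == null){
                log.error(">>>>>>>>>>>> message can not be empty ");
                return;
            }
            for (Map.Entry<String, Channel> entry : SessionContext.getAllChannel().entrySet()){
                // each channel gets its own frame
                entry.getValue().writeAndFlush(getMessage(message));
            }
        }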
    
        /**
         * Whether multi-node (cluster) mode is enabled.
         * @return true in cluster mode
         */
        private static boolean isCluster(){
            return ImConfig.getImConfig().getPropertiesConfig().isCluster();
        }
    
        /**
         * Returns the topic used to push messages between nodes.
         * @return the cluster topic
         */
        private static ImClusterTopic getImClusterTopic(){
            return GlobalContent.getInstance().getImClusterTopic();
        }
    
        /**
         * Serializes a message into a WebSocket text frame.
         * @param message message body
         * @return the WebSocket text frame, or null if the message is null
         */
        public static TextWebSocketFrame getMessage(Message message){
            if(message == null){
                log.error(">>>>>>>>>>>> message can not be empty ");
                return null;
            }
            String result = JSONUtil.toJsonStr(message);
            return new TextWebSocketFrame(result);
        }
    
    }
    

    6. WebSocket server implementation

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.util.SelfSignedCertificate;
    import lombok.extern.slf4j.Slf4j;

    // imports of the project's own classes (AuthProcess, LifeCycleEvent, ...) are omitted here
    @Slf4j
    public class WebSocketServer {

        /** Whether SSL is enabled. */
        private boolean ssl = false;

        /** Listening port. */
        private int port;

        /** WebSocket path prefix. */
        private String websocketPath;

        private ServerBootstrap serverBootstrap;

        private NioEventLoopGroup boss;

        private NioEventLoopGroup work;

        private WebSocketServer(){}

        public WebSocketServer(int port, String websocketPath, boolean ssl, AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic){
            this.ssl = ssl;
            this.port = port;
            this.websocketPath = websocketPath;

            // fall back to the simple built-in implementations
            if(authProcess == null){
                authProcess = new SimpleAuthProcess();
            }
            if(lifeCycleEvent == null){
                lifeCycleEvent = new SimpleLifeCycleEvent();
            }

            // populate the global context
            GlobalContent globalContent = GlobalContent.getInstance();
            globalContent.setAuthProcess(authProcess);
            globalContent.setImClusterTopic(imClusterTopic);
            globalContent.setLifeCycleEvent(lifeCycleEvent);

            // main/sub Reactor model: one boss thread accepts connections, the worker group handles I/O
            serverBootstrap = new ServerBootstrap();
            boss = new NioEventLoopGroup(1);
            work = new NioEventLoopGroup();
            serverBootstrap.group(boss,work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WebSocketServerInitializer(getSslContext(),websocketPath));
        }

        private SslContext getSslContext(){
            SslContext sslCtx = null;
            try{
                if (this.ssl) {
                    // a self-signed certificate is suitable for testing only
                    SelfSignedCertificate ssc = new SelfSignedCertificate();
                    sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
                }
            }catch (Exception e){
                log.error("failed to create the SSL context",e);
            }
            return sslCtx;
        }

        public void start(){
            try {
                serverBootstrap.bind(port).sync();
                log.info("WebSocket server started at " +
                        (ssl ? "wss" : "ws") + "://127.0.0.1:" + port + websocketPath);
            } catch (Exception e){
                log.error("failed to start the WebSocket server",e);
            }
        }

        public void stop(){
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

    }
    

    7. Spring bootstrap class

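    import cn.hutool.core.collection.CollectionUtil;
    import cn.hutool.core.util.ClassUtil;
    import cn.hutool.core.util.ReflectUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration;
    import org.springframework.context.ApplicationContext;

    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Stream;

    // imports of the project's own classes are omitted here
    @Slf4j
    public class NettyWebSocketStarter {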
    
        private Integer port;
    
        private String websocketPath;
    
        /**
         * Whether SSL is enabled.
         */
        private boolean ssl = false;
    
        private WebSocketServer server;
    
        /**
         * Authentication processor.
         */
        private AuthProcess authProcess;
    
        private LifeCycleEvent lifeCycleEvent;
    
        private ImClusterTopic imClusterTopic;
    
        private NettyWebSocketStarter(){}
    
        public NettyWebSocketStarter(AuthProcess authProcess){
            this(authProcess,null,null);
        }
    
        public NettyWebSocketStarter(AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic){
            this.authProcess = authProcess;
            this.lifeCycleEvent = lifeCycleEvent;
            this.imClusterTopic = imClusterTopic;
        }
    
        @PostConstruct
        public void init(){
            setAttributes(ImConfig.getImConfig().getPropertiesConfig());
            initCmdProcess();
            server = new WebSocketServer(port,websocketPath,ssl,authProcess,lifeCycleEvent,imClusterTopic);
            server.start();
            registerDiscovery(ImConfig.getImConfig().getPropertiesConfig());
        }
    
        private void setAttributes(ImPropertiesConfig propertiesConfig){
            String packetScanPath = "";
            if(propertiesConfig == null){
                log.error(">>>>>>>> im config prefix: im.ws ");
                log.error(">>>>>>>> im config attributes can not be empty ");
                // exit with a non-zero status so the failure is visible to the caller
                System.exit(1);
            }

            if(propertiesConfig.isCluster() && this.imClusterTopic == null){
                log.error(">>>>>>>> imClusterTopic must not be null in cluster mode");
                System.exit(1);
            }
    
            this.port = propertiesConfig.getPort();
            if(this.port == null){
                this.port = getPort();
            }
            this.websocketPath = propertiesConfig.getWebsocketPath();
            if(this.websocketPath == null || this.websocketPath.isEmpty()){
                this.websocketPath = ImCommonConstant.DEFAULT_WEBSOCKET_PATH;
            }
    
            this.ssl = propertiesConfig.isSsl();
            packetScanPath = propertiesConfig.getPacketScan();
            if(packetScanPath == null || packetScanPath.isEmpty()){
                log.error(">>>>>>>> im.ws.packetScan can not be empty ");
                System.exit(1);
            }
            initPacket(packetScanPath);
    
        }
    
        private void registerDiscovery(ImPropertiesConfig propertiesConfig){
            if(propertiesConfig.isRegister()){
                Map<String, AbstractAutoServiceRegistration> serviceRegistrationMap = getApplicationContext().getBeansOfType(AbstractAutoServiceRegistration.class);
                for (Map.Entry<String,AbstractAutoServiceRegistration>  registrationEntry : serviceRegistrationMap.entrySet()){
                    registrationEntry.getValue().start();
                }
    
            }
        }
    
        private int getPort(){
            int defaultPort = 8888;
            return getPort(defaultPort);
        }
    
        private int getPort(int port){
            // probe upward from the given port until one can be bound
            while (port <= 65535) {
                try (ServerSocket ignored = new ServerSocket(port)) {
                    return port;
                } catch (IOException e) {
                    // port is in use (or closing the probe socket failed): try the next one
                    port++;
                }
            }
            throw new IllegalStateException("no free port available");
        }
    
        private void initCmdProcess(){
            try{
                // collect every CmdProcess bean registered in the Spring context
                List<CmdProcess> list = new ArrayList<>(getApplicationContext().getBeansOfType(CmdProcess.class).values());

                ProcessManager.getInstance().addCmdProcessList(list);
                log.info("init im process repository success ! count [" + list.size() + "]");
            }catch (Exception e){
                log.error("init im process repository error",e);
                System.exit(1);
            }
        }
    
        private  ApplicationContext getApplicationContext(){
            return ImConfig.getImConfig().getApplicationContext();
        }
    
        private void initPacket(String packetScanPath){
            try{
                Set<Class<?>> set = ClassUtil.scanPackage(packetScanPath);
                Object obj;
                List<Message> list = new ArrayList<>();
                if(CollectionUtil.isNotEmpty(set)){
                    for (Class<?> clazz : set) {
                        obj = ReflectUtil.newInstance(clazz);
                        if(obj instanceof Message){
                            list.add((Message) obj);
                        }
                    }
                }
    
                MessageManager.getInstance().addMessages(list);
                log.info("init IM packet repository success ! count [" + list.size() + "]");
            }catch (Exception e){
                log.error("init packet repository error",e);
                System.exit(1);
            }
        }
    
        @PreDestroy
        public void stop(){
            server.stop();
        }
    }
    

    Note: the current implementation depends on Spring. Suggestions and corrections are welcome. The code is available at the address below.

    GitHub: https://github.com/awyFamily/awy-common-all/tree/master/common-ws-netty
