美文网首页
基于netty-socketio的web推送

基于netty-socketio的web推送

作者: TheUnforgiven | 来源:发表于2019-04-30 10:17 被阅读0次

    服务端集成到Spring Boot,netty-socketIO与socket.io.js可能会存在版本兼容问题,试了好多个版本终于正常。
    netty-socketIO依赖:

      <!-- netty-socketio server library; this article pairs version 1.7.13 with the socket.io 1.7.4 browser client. -->
      <dependency>
                <groupId>com.corundumstudio.socketio</groupId>
                <artifactId>netty-socketio</artifactId>
                <version>1.7.13</version>
     </dependency>
    

    js版本:

    <script src="https://cdn.bootcss.com/socket.io/1.7.4/socket.io.js"></script>
    

    做之前先了解一下socketIO中namespace,room概念,本项目中使用了多个namespace,实现了namespace之间的互相通信,也可以在namespace中单独通信,该例子中没有实现多个room,只用了默认的room。

    部分代码参考了SpringBoot + Netty-SocketIO在项目中实战详解
    代码如下:
    Socket Config

    # Settings for the embedded netty-socketio server (bound through @Value("${socketio.*}")).
    socketio:
      pingInterval: 25000
      bossCount: 1
      maxHttpContentLength: 1048576
      port: 9092
      # Bind to 0.0.0.0 when both external and internal networks must be able to connect
      # (not yet verified by the author). YAML comments start with '#'; a trailing '//...'
      # would become part of the host value.
      host: 0.0.0.0
      pingTimeout: 6000000
      maxFramePayloadLength: 1048576
      upgradeTimeout: 1000000
      allowCustomRequests: true
      workCount: 100
      # Comma-separated list of namespaces to register; every entry should be distinct.
      nameSpaces: /namespace1,/namespace2,/namespace3
    
    /**
     * Spring configuration that builds the netty-socketio {@link SocketIOServer} from the
     * {@code socketio.*} properties and registers a "notification" listener on every
     * configured namespace.
     */
    @Configuration
    @Slf4j
    public class SocketIOConfig {
        @Value("${socketio.host}")
        private String host;

        @Value("${socketio.port}")
        private Integer port;

        @Value("${socketio.bossCount}")
        private int bossCount;

        @Value("${socketio.workCount}")
        private int workCount;

        @Value("${socketio.allowCustomRequests}")
        private boolean allowCustomRequests;

        @Value("${socketio.upgradeTimeout}")
        private int upgradeTimeout;

        @Value("${socketio.pingTimeout}")
        private int pingTimeout;

        @Value("${socketio.pingInterval}")
        private int pingInterval;

        // Falls back to 64 KiB when the property is absent so existing deployments still start.
        @Value("${socketio.maxFramePayloadLength:65536}")
        private int maxFramePayloadLength;

        // Falls back to 64 KiB when the property is absent so existing deployments still start.
        @Value("${socketio.maxHttpContentLength:65536}")
        private int maxHttpContentLength;

        @Value("${socketio.nameSpaces}")
        private String[] nameSpaces;

        /**
         * Creates the Socket.IO server (not yet started) and wires the namespace listeners.
         *
         * @return the configured server instance
         */
        @Bean
        public SocketIOServer socketIOServer() {
            SocketConfig socketConfig = new SocketConfig();
            socketConfig.setTcpNoDelay(true);
            socketConfig.setSoLinger(0);

            com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
            config.setSocketConfig(socketConfig);
            config.setHostname(host);
            config.setPort(port);
            config.setBossThreads(bossCount);
            config.setWorkerThreads(workCount);
            config.setAllowCustomRequests(allowCustomRequests);
            config.setUpgradeTimeout(upgradeTimeout);
            config.setPingTimeout(pingTimeout);
            config.setPingInterval(pingInterval);
            // These two limits were present in the configuration file but never applied.
            config.setMaxFramePayloadLength(maxFramePayloadLength);
            config.setMaxHttpContentLength(maxHttpContentLength);

            SocketIOServer socketIOServer = new SocketIOServer(config);
            registerNamespaces(socketIOServer);
            return socketIOServer;
        }

        /**
         * Registers each configured namespace once and attaches the "notification" listener to it.
         * Connect/disconnect listeners could be attached per namespace here in the same way.
         */
        private void registerNamespaces(SocketIOServer socketIOServer) {
            if (nameSpaces == null) {
                return;
            }
            // Track names already handled: a repeated entry would otherwise attach a second
            // listener to the same namespace and every event would be rebroadcast twice.
            java.util.Set<String> registered = new java.util.HashSet<>();
            for (String name : nameSpaces) {
                if (name == null || !registered.add(name)) {
                    continue;
                }
                SocketIONamespace namespace = socketIOServer.addNamespace(name);
                namespace.addEventListener("notification", SocketParam.class,
                        (client, param, ackRequest) -> onData(socketIOServer, param));
            }
        }

        /**
         * Relays an incoming "notification" event: to every namespace when the payload names
         * none, otherwise only to the named namespace.
         *
         * @param socketIOServer server whose namespaces receive the broadcast
         * @param param          payload sent by the client
         */
        private void onData(SocketIOServer socketIOServer, SocketParam param) {
            if (param == null) {
                log.warn("Ignoring 'notification' event without a payload");
                return;
            }
            SocketResponse response = new SocketResponse();
            response.setMessage(param.getMessage());
            response.setUrl(param.getUrl());

            String target = param.getNameSpace();
            if (StringUtils.isEmpty(target)) {
                socketIOServer.getAllNamespaces()
                        .forEach(ns -> ns.getBroadcastOperations().sendEvent("notification", response));
                return;
            }

            // Accept both "name" and "/name" from clients instead of producing "//name".
            String namespaceName = target.startsWith("/") ? target : "/" + target;
            SocketIONamespace namespace = socketIOServer.getNamespace(namespaceName);
            if (namespace == null) {
                // Unknown namespace: skip instead of failing with a NullPointerException.
                log.warn("Ignoring 'notification' for unknown namespace {}", namespaceName);
                return;
            }
            namespace.getBroadcastOperations().sendEvent("notification", response);
        }

        /**
         * Exposes the scanner that binds annotated handler methods on Spring beans to the server.
         */
        @Bean
        public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
            return new SpringAnnotationScanner(socketServer);
        }
    }
    

    socket service @OnEvent注解好像不可以监听namespace

    /**
     * Server-side push helper: broadcasts "notification" events to all namespaces or to a
     * single one, and logs client connects/disconnects.
     */
    @Slf4j
    @Component
    public class SocketIOServiceImpl implements SocketIOService {

        @Autowired
        private SocketIOServer socketIOServer;

        /**
         * Broadcasts the given text as a "notification" event to every registered namespace.
         *
         * @param message payload delivered to the clients
         */
        @Override
        public void pushToAll(String message) {
            socketIOServer.getAllNamespaces()
                    .forEach(ns -> ns.getBroadcastOperations().sendEvent("notification", message));
        }

        /**
         * Broadcasts the given parameter object as a "notification" event to the namespace it names.
         * The push is skipped (with a warning) when no namespace is given or it is not registered.
         *
         * @param socketParam payload; its name space selects the target namespace
         */
        @Override
        public void pushToNameSpace(SocketParam socketParam) {
            if (socketParam == null || socketParam.getNameSpace() == null) {
                log.warn("Skipping push: no target namespace supplied");
                return;
            }
            String namespaceName = "/" + socketParam.getNameSpace().replace("/", "");
            SocketIONamespace namespace = socketIOServer.getNamespace(namespaceName);
            if (namespace == null) {
                // Unknown namespace: skip instead of failing with a NullPointerException.
                log.warn("Skipping push: namespace {} is not registered", namespaceName);
                return;
            }
            namespace.getBroadcastOperations().sendEvent("notification", socketParam);
        }

        /** Logs the session id of a newly connected client. */
        @OnConnect
        public void onConnect(SocketIOClient client) {
            log.info("用户 {} 上线", client.getSessionId().toString());
        }

        /** Logs the session id of a client that disconnected. */
        @OnDisconnect
        public void onDisConnect(SocketIOClient client) {
            log.info("用户 {} 离开了", client.getSessionId().toString());
        }

        // NOTE(review): per the author, an @OnEvent("notification") handler did not appear to
        // receive events sent to custom namespaces, so none is declared here.

        /** Reads the "username" URL parameter from the client's handshake data. */
        private String getParamByClient(SocketIOClient client) {
            return client.getHandshakeData().getSingleUrlParam("username");
        }
    }
    

    启动服务,实现CommandLineRunner接口,随服务器启动

    /**
     * Starts the injected Socket.IO server from the {@link CommandLineRunner} callback and
     * stops it again when this bean is destroyed.
     */
    @Slf4j
    @Component
    public class SocketIOServerRunner implements CommandLineRunner {

        @Autowired
        private SocketIOServer socketIOServer;

        /** Logs the start-up message and starts the server. */
        @Override
        public void run(String... args) throws Exception {
            log.info("启动socketIOServer");
            socketIOServer.start();
        }

        /** Stops the server on bean destruction, if one was injected. */
        @PreDestroy
        public void preDestroy() {
            if (socketIOServer == null) {
                return;
            }
            socketIOServer.stop();
        }
    }
    

    js代码:

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
        <script src="https://cdn.bootcss.com/socket.io/1.7.4/socket.io.js"></script>
    </head>
    <body>

    <script>
        // Connect to the "/namespace1" namespace of the Socket.IO server.
        var socket = io.connect("http://192.168.0.1:9092/namespace1");
        // Called once the connection to the server is established.
        socket.on('connect', function () {alert("连接")});
        // Called when the connection to the server is lost.
        socket.on('disconnect', function () {alert("断开")});
        // Receive pushed notifications; the text is carried in the "message" field.
        socket.on('notification', function (data) {
            alert(data.message)
        });
        // Send a notification. "nameSpace" names the target namespace (without the leading
        // slash); leave it empty to deliver to the clients of every namespace.
        socket.emit('notification', {username:"username",nameSpace:"namespace1",message:"this is a message"},function(){
          alert("callback")}
        );
    </script>
    </body>
    </html>
    
    

    相关文章

      网友评论

          本文标题:基于netty-socketio的web推送

          本文链接:https://www.haomeiwen.com/subject/vwhmnqtx.html