美文网首页Android网络编程+RxJava
Netty 实现长连接消息推送初尝试

Netty 实现长连接消息推送初尝试

作者: ImWiki | 来源:发表于2019-05-22 19:27 被阅读0次

    由于我最近在研究推送功能,计划在项目中接入5大厂家推送服务,然后有部分厂家推送服务只提供通知栏推送功能,并不提供自定义消息推送(透传),所以我在想,如果我们自己搭建服务器实现是不是成本也不至于很高呢,虽然我们可以利用其中一家推送作为透传推送功能。我了解到Netty是一个很成熟的方案,所以做一些简单的尝试。
    https://netty.io/wiki/user-guide-for-4.x.html#wiki-h2-3

    什么是 Netty

    Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

    添加依赖

    Gradle

        compile 'io.netty:netty-all:4.1.36.Final'
    

    Maven

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.36.Final</version>
    </dependency>
    

    服务端代码

    public class DiscardServer {
        private int port;
        public DiscardServer(int port) {
            this.port = port;
        }
        public void run() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap(); // (2)
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class) // (3)
                 .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ch.pipeline().addLast(new DiscardServerHandler(),new TimeServerHandler());
                     }
                 })
                 .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
        
                ChannelFuture f = b.bind(port).sync(); // (7)
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args.length > 0) {
                port = Integer.parseInt(args[0]);
            }
            new DiscardServer(port).run();
        }
    }
    

    定义客户端连接Handler:如果收到了客户端的连接就发送给客户端一个小时“你好,欢迎建立长连接”。

    public class TimeServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(final ChannelHandlerContext ctx) { // (1)
            final ByteBuf byteBuf = ctx.alloc().buffer(8);
            byteBuf.writeBytes("你好,欢迎建立长连接".getBytes());
            ctx.writeAndFlush(byteBuf,ctx.channel().newPromise());
            System.out.println("TimeServerHandler,有新连接");
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    定义服务端消息接收Handler,如果收到客户端消息就打印出来

    public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // Discard the received data silently.
            ByteBuf byteBuf = (ByteBuf) msg;
            try {
                byte[] result = new byte[byteBuf.readableBytes()];
                byteBuf.readBytes(result);
                String text = new String(result);
                System.out.println("收到客户端消息:" + text);
            } finally {
                byteBuf.release();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    客户端代码

    public class TimeClient {
        public static void main(String[] args) throws Exception {
            String host = "127.0.0.1";
            int port = Integer.parseInt("8080");
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap(); // (1)
                b.group(workerGroup); // (2)
                b.channel(NioSocketChannel.class); // (3)
                b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
                // Start the client.
                ChannelFuture f = b.connect(host, port).sync(); // (5)
                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }
    

    定义消息接收Handler,如果收到消息就回复服务端“我已经收到了,谢谢你。”。

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf byteBuf = (ByteBuf) msg;
            try {
                byte[] result = new byte[byteBuf.readableBytes()];
                byteBuf.readBytes(result);
                String text = new String(result);
                System.out.println("收到服务端消息:" + text);
            } finally {
                byteBuf.release();
            }
            final ByteBuf time = ctx.alloc().buffer(8);
            time.writeBytes("我已经收到了,谢谢你。".getBytes());
            final ChannelFuture f = ctx.writeAndFlush(time);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
            System.out.println("TimeClientHandler:exceptionCaught");
        }
    }
    

    测试结果

    先运行服务端服务端,然后再运行客户端

    1. 客户端
    收到服务端消息:你好,欢迎建立长连接
    
    1. 服务端
    收到客户端消息:我已经收到了,谢谢你。
    

    相关文章

      网友评论

        本文标题:Netty 实现长连接消息推送初尝试

        本文链接:https://www.haomeiwen.com/subject/skeezqtx.html