美文网首页netty
netty通信的简单demo

netty通信的简单demo

作者: 小胖学编程 | 来源:发表于2021-06-21 15:35 被阅读0次

    seata中使用netty完成了TM、RM与TC之间的通信,若不熟悉netty的语法,那么阅读seata的源码是比较困难的,本文实现了一个简单的netty通信的demo。来入门netty通信。

    1. 引入依赖

    <dependencies>
     <!-- Netty -->
     <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.30.Final</version>
     </dependency>
    
     <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
         <version>1.7.25</version>
     </dependency>
    
     <!-- Protostuff -->
     <dependency>
         <groupId>com.dyuproject.protostuff</groupId>
         <artifactId>protostuff-core</artifactId>
         <version>1.0.9</version>
     </dependency>
    
     <dependency>
         <groupId>com.dyuproject.protostuff</groupId>
         <artifactId>protostuff-runtime</artifactId>
         <version>1.0.9</version>
     </dependency>
    
     <!-- Objenesis -->
     <dependency>
         <groupId>org.objenesis</groupId>
         <artifactId>objenesis</artifactId>
         <version>2.1</version>
     </dependency>
    
     <!-- fastjson -->
     <dependency>
         <groupId>com.alibaba</groupId>
         <artifactId>fastjson</artifactId>
         <version>1.2.38</version>
     </dependency>
    </dependencies>
    

    2. 定义基类对象

    请求对象:

    /**
     * 请求对象体
     */
    public class Request {
        private String requestId;
    
        private Object parameter;
    
        public String getRequestId() {
            return requestId;
        }
    
        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }
    
        public Object getParameter() {
            return parameter;
        }
    
        public void setParameter(Object parameter) {
            this.parameter = parameter;
        }
    }
    

    响应对象:

    /**
     * 响应对象体
     */
    public class Response {
        private String requestId;
    
        private Object result;
    
        public String getRequestId() {
            return requestId;
        }
    
        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }
    
        public Object getResult() {
            return result;
        }
    
        public void setResult(Object result) {
            this.result = result;
        }
    }
    

    3. 自定义传输协议

    netty可以自定义传输协议,实现如下列代码

    import com.alibaba.fastjson.JSON;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * 请求的编码格式
     * 自定义传输协议,length、data
     */
    public class RpcEncoder extends MessageToByteEncoder {
    
        //目标对象类型进行编码
        private Class<?> target;
    
        public RpcEncoder(Class target) {
            this.target = target;
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            if (target.isInstance(msg)) {
                byte[] data = JSON.toJSONBytes(msg);    // 使用fastJson将对象转换为byte
                out.writeInt(data.length);  // 先将消息长度写入,也就是消息头
                out.writeBytes(data);   // 消息体中包含我们要发送的数据
            }
        }
    
    }
    
    
    /**
     * 请求的解码逻辑(协议)
     */
    public class RpcDecoder extends ByteToMessageDecoder {
    
        // 目标对象类型进行解码
        private Class<?> target;
    
        public RpcDecoder(Class target) {
            this.target = target;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // 不够长度丢弃
            if (in.readableBytes() < 4) {
                return;
            }
            // 标记一下当前的readIndex的位置
            in.markReaderIndex();
            // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
            int dataLength = in.readInt();
            // 读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);
    
            Object obj = JSON.parseObject(data, target);    // 将byte数据转化为我们需要的对象
            out.add(obj);
        }
    }
    

    4. 定义消息处理器

    channl收到消息后,经过decode后需要交由消息处理器进行处理:

    /**
     * 消息处理器
     */
    @Slf4j
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request request = (Request) msg;
    
            log.info("=========》》》Client Data:" + JSON.toJSONString(request));
    
            Response response = new Response();
            response.setRequestId(request.getRequestId());
            response.setResult("Hello Client !");
    
            // client接收到信息后主动关闭掉连接
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    注意:使用ctx.writeAndFlush(response)完成响应消息的发送。

    5. 定义netty服务器端

    /**
     * netty的服务端
     */
    public class NettyServer {
    
        private String ip;
        private int port;
    
        public NettyServer(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    
        public void server() throws Exception {
    
            /**
             * Server端的EventLoopGroup分为两个
             * workerGroup作为处理请求,
             * bossGroup作为接收请求。
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
    
                final ServerBootstrap serverBootstrap = new ServerBootstrap();
                /**
                 * ChannelOption四个常量作为TCP连接中的属性。
                 *
                 *
                 */
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                        .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                        .option(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
                                        .addLast(new RpcEncoder(Response.class));
    
                                //注册消息处理器
                                socketChannel.pipeline().addLast(new NettyServerHandler());
    //                                    .addLast(new NettyServerHandler());
                            }
                        });
    
                serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);  // 开启长连接
    
                ChannelFuture future = serverBootstrap.bind(ip, port).sync();
                future.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new NettyServer("127.0.0.1", 20000).server();
        }
    }
    

    启动该类后,server便一直监听20000端口的消息。

    6. 定义netty客户端

    实现了SimpleChannelInboundHandler<Response>类,重写了channelRead0方法,即消息处理器。

    /**
     * netty的客户端,并实现了SimpleChannelInboundHandler接口,即也可以处理消息。
     */
    public class NettyClient extends SimpleChannelInboundHandler<Response> {
        private final String ip;
        private final int port;
        private Response response;
    
        public NettyClient(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
            this.response = response;
        }
    
        public Response client(Request request) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
    
            try {
    
                // 创建并初始化 Netty 客户端 Bootstrap 对象
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
    
                        pipeline.addLast(new RpcDecoder(Response.class));
                        pipeline.addLast(new RpcEncoder(Request.class));
                        pipeline.addLast(NettyClient.this);
                    }
                });
                bootstrap.option(ChannelOption.TCP_NODELAY, true);
                // 连接 RPC 服务器
                ChannelFuture future = bootstrap.connect(ip, port).sync();
    
                // 写入 RPC 请求数据并关闭连接
                Channel channel = future.channel();
    
                channel.writeAndFlush(request).sync();
                channel.closeFuture().sync();
    
                return response;
            } finally {
                group.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws Exception {
            Request request = new Request();
            request.setRequestId(UUID.randomUUID().toString());
            request.setParameter("Hello Server !");
            System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 20000).client(request)));
        }
    }
    

    文章参考

    使用Java搭建一个简单的Netty通信例子

    相关文章

      网友评论

        本文标题:netty通信的简单demo

        本文链接:https://www.haomeiwen.com/subject/ffmyyltx.html