美文网首页vert.x & netty
Netty系列(4)Protobuf 及其在 Netty 中的应用

Netty系列(4)Protobuf 及其在 Netty 中的应用

作者: suxin1932 | 来源:发表于2020-03-25 22:17 被阅读0次

    1.protobuf

    RPC(一般通过SOCKET进行数据传输)
    1.定义一个接口说明文件:描述了对象(结构体),对象成员, 接口方法等信息
    2.通过RPC框架所提供的编译器, 将接口说明文件编译成具体的语言文件,如java文件, python文件
    3.在客户端和服务端分别引入RPC编译器所生成的文件, 即可像调用本地方法一样, 调用远程方法
    

    1.1windows上安装protobuf编译器

    https://github.com/protocolbuffers/protobuf/releases

    protobuf编译器-windows版-下载后解压.png protobuf编译器-windows版-环境变量配置.png cmd验证.png

    3.示例用法

    Netty-protobuf 多协议解决方案1代码结构.png

    3.0 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven module descriptor for the tools-netty example module. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <!-- Inherits from the com.zy:java-tools parent POM. -->
        <parent>
            <artifactId>java-tools</artifactId>
            <groupId>com.zy</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
        <!-- Version properties. Note: lombok.version is declared here but is not referenced
             by any dependency in this file. -->
        <properties>
            <org.slf4j.version>1.7.25</org.slf4j.version>
            <log4j.version>1.2.17</log4j.version>
            <lombok.version>1.16.20</lombok.version>
            <protobuf.version>3.9.1</protobuf.version>
        </properties>
    
        <artifactId>tools-netty</artifactId>
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.48.Final</version>
            </dependency>
            <!-- NOTE(review): no <version> here; presumably managed by the parent POM. Confirm. -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${org.slf4j.version}</version>
            </dependency>
            <!-- slf4j-log4j12 already brings in the log4j dependency -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${org.slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <!-- Both protobuf artifacts share ${protobuf.version} so they stay on the same release. -->
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>${protobuf.version}</version>
            </dependency>
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java-util</artifactId>
                <version>${protobuf.version}</version>
            </dependency>
        </dependencies>
        
    </project>
    

    3.1 定义 proto 文件, 并用 protoc 生成代码

    syntax = "proto3";
    
    option java_package = "com.zy.netty.netty03";
    option java_outer_classname = "DataInfo";
    
    // Envelope message that lets several payload types travel through one
    // protobuf + Netty pipeline.
    // Multi-protocol transport with protobuf: approach #1 (type tag + oneof body).
    message Message {
    
        // Tag naming which payload type the sender placed in dataBody.
        enum DataType {
            SchoolType = 0;
            TeacherType = 1;
            StuType = 2;
        }
    
        // Payload type tag; the receiving handler branches on this value.
        DataType data_type = 1;
        // The payload itself: at most one of these fields is set per message.
        oneof dataBody {
            School school = 2;
            Teacher teacher = 3;
            Stu stu = 4;
        }
    }
    
    // Student payload; carried in Message.dataBody as the `stu` field.
    message Stu {
        string name = 1;
        int32 age = 2;
        string gender = 3;
    }
    
    // Teacher payload; carried in Message.dataBody as the `teacher` field.
    message Teacher {
        string name = 1;
        int32 age = 2;
        string gender = 3;
    }
    
    // School payload; carried in Message.dataBody as the `school` field.
    message School {
        string name = 1;
        // NOTE(review): presumably the school's area; the unit is not specified here - confirm.
        float square = 2;
    }
    

    执行 protoc --java_out=src/main/java src/main/resources/proto/Data.proto 命令, 生成代码 com.zy.netty.netty03.DataInfo

    3.2 server

    package com.zy.netty.netty03;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.util.concurrent.DefaultThreadFactory;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * Demo Netty server that receives varint32-length-prefixed protobuf
     * {@code DataInfo.Message} frames on 127.0.0.1:8099 and prints the payload
     * selected by the message's {@code data_type} tag.
     */
    @Slf4j
    public class Server03 {
        /**
         * Boots the server, blocks until the listening channel is closed, then shuts
         * both event loop groups down.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ServerBootstrap server = new ServerBootstrap();
            // Neither group sets nThreads, so both fall back to the default:
            // io.netty.channel.MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS
            // Reactor variants, as described by the original author:
            // 1. No worker group, i.e. server.group(boss, boss): single-threaded Reactor model.
            // 2. Worker group present and boss nThreads == 1: multi-threaded Reactor model.
            // 3. Worker group present and boss nThreads > 1: master/slave Reactor model.
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(new DefaultThreadFactory("bossGroupExecutor", true));
            // Distinct pool name so worker threads are distinguishable from boss threads in logs and dumps.
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("workerGroupExecutor", true));

            try {
                server.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        // TCP_NODELAY is a per-connection socket option, so it must be applied to the
                        // accepted child channels; option(...) would target the listening channel instead.
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .handler(new LoggingHandler(LogLevel.DEBUG))
                        .childHandler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) throws Exception {
                                ch.pipeline()
                                        // ProtobufVarint32FrameDecoder splits the byte stream into whole frames
                                        // (handles TCP packet coalescing / fragmentation).
                                        .addLast(new ProtobufVarint32FrameDecoder())
                                        // ProtobufDecoder only decodes complete frames; it does no half-packet handling.
                                        // The prototype instance fixes the decoded type to DataInfo.Message; that
                                        // envelope's oneof body is what allows several payload types on one pipeline.
                                        .addLast(new ProtobufDecoder(DataInfo.Message.getDefaultInstance()))
                                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                                        .addLast(new ProtobufEncoder())
                                        .addLast(new ServerHandler03());
                            }
                        });

                ChannelFuture channelFuture = server.bind("127.0.0.1", 8099).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                log.error("server error.", e);
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }

        /**
         * Inbound handler that prints the payload of each received {@code DataInfo.Message}.
         *
         * <p>About {@code @Sharable}:
         * <ol>
         * <li>Normally every client connection gets its own Channel and ChannelPipeline, and each
         * pipeline is given newly created handlers, e.g.
         * {@code ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());}</li>
         * <li>To use one global (singleton) handler for all Channels, for example to collect global
         * metrics, create it once before {@code server.group(...)}:
         * {@code ServerHandler03 sharableHandler = new ServerHandler03();}
         * and add that same instance in every pipeline:
         * {@code ch.pipeline().addLast(sharableHandler);}</li>
         * <li>The handler from step 2 must be annotated with {@code @Sharable}; otherwise an error is
         * raised when the second Channel is set up, see
         * {@code io.netty.channel.DefaultChannelPipeline#checkMultiplicity(io.netty.channel.ChannelHandler)}.</li>
         * <li>If a new handler is created for every Channel (as this class does), the annotation
         * makes no difference.</li>
         * </ol>
         */
        @ChannelHandler.Sharable
        private static class ServerHandler03 extends SimpleChannelInboundHandler<DataInfo.Message> {

            /**
             * Prints the payload that matches the message's type tag.
             *
             * @param ctx context of this handler in the pipeline
             * @param msg decoded envelope message
             */
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Message msg) throws Exception {
                // Multi-protocol transport with protobuf: approach #1 (dispatch on the type tag).
                switch (msg.getDataType()) {
                    case SchoolType:
                        System.out.println("server received msg: " + msg.getSchool());
                        break;
                    case TeacherType:
                        System.out.println("server received msg: " + msg.getTeacher());
                        break;
                    case StuType:
                        System.out.println("server received msg: " + msg.getStu());
                        break;
                    default:
                        // A tag this build does not know; report it instead of dropping the message silently.
                        log.warn("server received msg with unsupported data type: {}", msg.getDataType());
                        break;
                }
            }
        }
    }
    

    3.3 client

    package com.zy.netty.netty03;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Random;
    
    /**
     * Demo Netty client that connects to 127.0.0.1:8099, sends one randomly chosen
     * {@code DataInfo.Message} (school, teacher or student payload) once the channel
     * is active, and prints every message it receives.
     */
    @Slf4j
    public class Client03 {
        /** Picks which payload type the demo message carries. */
        private static final Random RANDOM = new Random();

        /**
         * Connects to the server, blocks until the channel is closed, then shuts the
         * event loop group down.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Bootstrap client = new Bootstrap();
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();

            try {
                client.group(nioEventLoopGroup)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) throws Exception {
                                // Same codec chain as the server: varint32 length framing around
                                // DataInfo.Message in both directions.
                                ch.pipeline()
                                        .addLast(new ProtobufVarint32FrameDecoder())
                                        .addLast(new ProtobufDecoder(DataInfo.Message.getDefaultInstance()))
                                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                                        .addLast(new ProtobufEncoder())
                                        .addLast(new ClientHandler03());
                            }
                        });

                ChannelFuture channelFuture = client.connect("127.0.0.1", 8099).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                log.error("client error.", e);
            } finally {
                nioEventLoopGroup.shutdownGracefully();
            }
        }

        /** Inbound handler that sends one demo message on connect and prints replies. */
        private static class ClientHandler03 extends SimpleChannelInboundHandler<DataInfo.Message> {

            /**
             * Prints a message received from the server.
             *
             * @param ctx context of this handler in the pipeline
             * @param msg decoded envelope message
             */
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Message msg) throws Exception {
                System.out.println("client receive msg: " + msg);
            }

            /**
             * Builds one envelope with a randomly selected payload type and writes it
             * to the server as soon as the connection is active.
             *
             * @param ctx context of this handler in the pipeline
             */
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                // Multi-protocol transport with protobuf: approach #1 (type tag + matching oneof field).
                DataInfo.Message message;
                int choice = RANDOM.nextInt(3);
                switch (choice) {
                    case 0:
                        message = DataInfo.Message.newBuilder().setDataType(DataInfo.Message.DataType.SchoolType)
                                .setSchool(DataInfo.School.newBuilder().setName("nanjingjinlingzhognxue").setSquare(800).build())
                                .build();
                        break;
                    case 1:
                        message = DataInfo.Message.newBuilder().setDataType(DataInfo.Message.DataType.TeacherType)
                                .setTeacher(DataInfo.Teacher.newBuilder().setName("tom").setAge(30).build())
                                .build();
                        break;
                    default:
                        message = DataInfo.Message.newBuilder().setDataType(DataInfo.Message.DataType.StuType)
                                .setStu(DataInfo.Stu.newBuilder().setName("jerry").setAge(10).build())
                                .build();
                        break;
                }
                ctx.writeAndFlush(message);
            }
        }
    }
    

    参考资料
    https://developers.google.com/protocol-buffers/docs/proto3
    https://github.com/protocolbuffers/protobuf
    https://www.jianshu.com/p/506667a6651f
    https://blog.csdn.net/u011518120/article/details/54604615

    相关文章

      网友评论

        本文标题:Netty系列(4)Protobuf 及其在 Netty 中的应用

        本文链接:https://www.haomeiwen.com/subject/bgjfuhtx.html