美文网首页
netty使用Google Protobuf协议优化通道的序列化

netty使用Google Protobuf协议优化通道的序列化

作者: 天草二十六_简村人 | 来源:发表于2022-10-17 10:48 被阅读0次

    一、Google Protobuf 协议

    解决序列化和检索效率的问题。

    序列化的方法有:

    • 使用java原生的序列化
    • xml格式
    • Protocol buffers

    Protocol Buffers 是一种灵活、高效、自动化的方案,正是用来解决这个问题的。使用 Protocol Buffers 时,先编写一个 .proto 文件,描述希望存储的数据结构;随后 Protocol Buffers 编译器会根据它生成一个类,该类使用高效的二进制格式,自动完成数据的编码和解析。生成的类为消息中的各个字段提供 getter,并通过对应的 Builder 提供 setter。

    1.1、定义文件xxxSocketMessage.proto(首字母小写)

    syntax = "proto3";
    
    package com.xxx.channel.core.common.bean.dto;
    
    option java_package = "com.xxx.channel.core.common.bean.dto";
    option java_outer_classname = "XxxSocketMessage";
    
    
    // Source category of a message; used as the type of Message.type.
    enum MessageType {
        // Server side.
        // NOTE(review): this is the zero value, which proto3 reports for an enum
        // field that was never set, so an unset Message.type reads back as SERVER
        // rather than DEFAULT — confirm that is intended.
        SERVER = 0;
        // Default
        DEFAULT = 1;
    }
    
    /**
     * Message data packet.
     */
    message Message {
        // Type of the message source
        MessageType type = 1;
        // Custom tag, used to categorize the message content
        int32 tag = 2;
        // Message action
        int32 action = 3;
        // Message content
        string data = 4;
    }
    
    

    1.2、java类调用Protobuf的类Message

    // In practice the client uses protobuf to serialize an object into a byte array and sends
    // those bytes to the server over the wire. The server may be written in another language
    // (e.g. Python) and deserializes the bytes back into an object of its own.
    /**
     * Minimal round-trip demo: builds an {@code XxxSocketMessage.Message}, serializes it to a
     * byte array, parses that array back, and prints the fields of the parsed copy.
     */
    public class ProtobuffTest {
        public static void main(String[] args) throws Exception {
            // XxxSocketMessage is the outer class named by java_outer_classname in the .proto file.
            // NOTE(review): onlineListJson is not defined in this excerpt; supply the JSON payload.
            // NOTE(review): setTag() is given MessageType.SERVER_VALUE and `type` is never set;
            // setType(MessageType.SERVER) may have been intended — confirm.
            XxxSocketMessage.Message message = XxxSocketMessage.Message.newBuilder()
                    .setTag(XxxSocketMessage.MessageType.SERVER_VALUE)
                    .setAction(ResponseCode.Notification.ONLINE_LIST)
                    .setData(onlineListJson)
                    .build();

            // Serialize: object -> byte array.
            byte[] message2ByteArray = message.toByteArray();

            // Deserialize: byte array -> object. Must parse the array produced above
            // (the original referenced an undefined variable here).
            XxxSocketMessage.Message message2 = XxxSocketMessage.Message.parseFrom(message2ByteArray);

            System.out.println(message2.getType());
            System.out.println(message2.getTag());
            System.out.println(message2.getData());
            System.out.println(message2.getAction());
        }
    }
    

    1.3、pom.xml

    引入相关jar包

    <properties>
            <netty.version>4.1.29.Final</netty.version>
            <protobuf.version>3.7.1</protobuf.version>
            <grpc.version>1.12.0</grpc.version>
        </properties>
    
    <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>${netty.version}</version>
            </dependency>
    <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>${protobuf.version}</version>
            </dependency>
    <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-all</artifactId>
                <version>${grpc.version}</version>
            </dependency>
    

    maven插件

    用于编译.proto文件

    <!-- Runs protoc (and the grpc-java protoc plugin) during the build to generate Java
         sources from the project's .proto files. -->
    <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.5.0</version>
        <configuration>
            <!-- protoc follows the protobuf.version property, the same one used for the
                 protobuf-java dependency, so the compiler and the runtime library cannot
                 drift apart (the compiler was previously hard-coded to 3.1.0). -->
            <!-- NOTE(review): os.detected.classifier is not defined in the snippets shown;
                 it is normally provided by the kr.motd.maven:os-maven-plugin build
                 extension — confirm it is registered in the build. -->
            <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
            <pluginId>grpc-java</pluginId>
            <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
            <execution>
                <goals>
                    <!-- compile: message classes; compile-custom: output of the grpc-java plugin -->
                    <goal>compile</goal>
                    <goal>compile-custom</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    

    二、定义消息处理类Handler

    ClientHandler继承SimpleChannelInboundHandler并重写channelActive方法, 在该方法中我们处理登录逻辑,最后调用方法writeAndFlush()将其发送到server端。
    这里,只给出客户端的代码示例,服务端类似。

    public class ClientHandler extends SimpleChannelInboundHandler<ProtobufData.Task>{
    
    @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            responseLoginInfo(ctx.channel());
            super.channelActive(ctx);
        }
    
    private void responseLoginInfo(Channel channel) {
            UserInfo userInfo = getUserInfo();
            ProtobufData.LoginPack loginPack = ProtobufData.LoginPack.newBuilder()
                    .setAppKey(userInfo.getAppKey())
                    .setUserType(userInfo.getUserType())
                    .setRoomId(userInfo.getRoomId())
                    .setUserId(userInfo.getUserId()).build();
            ProtobufData.Task task = TaskPackage.login(loginPack);
            channel.writeAndFlush(task);
        }
    
    }
    
    

    三、客户端连接服务端的代码示例

    
    import com.google.common.base.Joiner;
    import com.google.protobuf.AbstractMessage;
    import com.xxx.channel.core.common.constant.ACKType;
    import com.xxx.channel.core.common.constant.CMD;
    import com.xxx.channel.core.common.protobuf.ProtobufData;
    import com.xxx.channel.core.common.protobuf.TaskPackage;
    import com.xxx.channel.core.common.utils.UUIDUtil;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.scheduling.annotation.Scheduled;
    
    import java.util.Set;
    
    
    /**
     * Netty client for the channel service.
     *
     * <p>{@link #reconnect()} is run on a schedule and (re)establishes the connection
     * whenever there is no active channel. All connection attempts share one
     * {@link Bootstrap} and one event loop group, created on the first attempt.
     */
    public class ChannelClient {
        private static final Logger logger = LoggerFactory.getLogger(ChannelClient.class);

        /**
         * Channel service IP, injected from {@code channelService.ip}.
         */
        @Value("${channelService.ip}")
        private String ip;

        /**
         * Channel service port, injected from {@code channelService.port}.
         */
        @Value("${channelService.port}")
        private int port;

        /**
         * Most recently established connection; {@code null} until a connect succeeds.
         * Volatile because it is assigned by the scheduled reconnect task and handed
         * out through {@link #getChannel()}.
         */
        private volatile Channel channel;

        /**
         * Bootstrap reused by every connection attempt; created lazily by {@link #init()}.
         */
        private Bootstrap bootstrap;

        /**
         * Attempts one connection to the configured address.
         *
         * <p>On success {@link #channel} is replaced with the new channel. A failed or
         * interrupted attempt is logged and leaves {@link #channel} untouched, so the
         * next scheduled run simply tries again.
         */
        private void init() {
            if (bootstrap == null) {
                // Build the bootstrap (and its event loop group) once. Creating a new
                // NioEventLoopGroup per attempt would leave an extra group behind on
                // every retry, since nothing ever shuts them down.
                bootstrap = createBootstrap();
            }
            try {
                // await() instead of sync(): a failed connect is reported through the
                // future rather than thrown out of the scheduled task.
                ChannelFuture future = bootstrap.connect(ip, port).await();
                if (future.isSuccess()) {
                    channel = future.channel();
                } else {
                    logger.error("connect channel-service-server error", future.cause());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread can observe it.
                Thread.currentThread().interrupt();
                logger.error("connect channel-service-server error", e);
            }
        }

        /**
         * Creates the client bootstrap with the protobuf codec pipeline.
         *
         * @return a bootstrap bound to a freshly created NIO event loop group
         */
        private Bootstrap createBootstrap() {
            EventLoopGroup group = new NioEventLoopGroup();
            return new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    // The next four handlers are the protobuf codec:
                                    // varint32 length framing plus message decode/encode.
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    .addLast(new ProtobufDecoder(ProtobufData.Task.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())

                                    // Writer-idle timeout of 3 seconds; reader and all-idle disabled.
                                    .addLast(new IdleStateHandler(0, 3, 0))

                                    // Message handler (the ClientHandler class defined earlier).
                                    .addLast(new ClientHandler());
                        }
                    });
        }

        /**
         * Reconnects when there is no active connection. The first run happens 3 seconds
         * after startup, then the check repeats every 10 seconds.
         */
        @Scheduled(initialDelay = 3000, fixedRate = 10000)
        public void reconnect() {
            if (isConnected()) {
                return;
            }
            init();
            // Only log when the attempt produced a live channel; channel may still be
            // null (or stale) if the attempt failed.
            Channel current = channel;
            if (current != null && current.isActive()) {
                logger.info("channel_service_client start connect to:{}", current.remoteAddress());
            }
        }

        /**
         * Returns the current connection.
         *
         * @return the most recently established channel, or {@code null} if no connect
         *         has succeeded yet
         */
        public Channel getChannel() {
            return channel;
        }

        /**
         * Tells whether the connection is usable.
         *
         * @return {@code true} if a channel exists and is active
         */
        private boolean isConnected() {
            Channel current = channel;
            return current != null && current.isActive();
        }

    }
    
    

    四、客户端的对接

    4.1、处理消息

    在ClientHandler类中重写方法channelRead0(),把收到的消息报文和响应报文交给接口IBusinessHandler的实现类处理。

    /**
     * Business callback that receives message packs and response packs.
     * NOTE(review): how this field is assigned is not shown in this excerpt.
     */
    private IBusinessHandler businessHandler;

    /**
     * Dispatches an inbound task by its pack type.
     *
     * <p>MESSAGE and RESPONSE packs are handed to the business handler and then
     * acknowledged through {@code responseACK}; every other pack type is ignored.
     *
     * @param ctx  context of the channel the task arrived on
     * @param task decoded inbound task
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ProtobufData.Task task) {
        switch (task.getPackType()) {
            // Incoming message
            case MESSAGE: {
                ProtobufData.MessagePack messagePack = task.getMessagePack();
                businessHandler.handleMessage(messagePack);
                // The third argument is true only when the pack's ack type is positive.
                responseACK(messagePack.getMessageId(), ctx.channel(), messagePack.getAckType() > 0);
                break;
            }
            // Response to an earlier request
            case RESPONSE: {
                ProtobufData.ResponsePack response = task.getResponsePack();
                businessHandler.handleResponse(response);
                responseACK(response.getMessageId(), ctx.channel(), false);
                break;
            }
            // Nothing to do for these pack types.
            case ACK:
            case HEARTBEAT:
            default:
                break;
        }
    }
    

    PackType的枚举类见下:

    // Kind of pack carried by a Task; ClientHandler.channelRead0 switches on this value.
    enum PackType {
        // Login
        Login = 0;
        // Data transfer
        MESSAGE = 1;
        // Exit
        EXIT = 2;
        // ACK
        ACK = 3;
        // Heartbeat
        HEARTBEAT = 4;
        // Response pack (response body)
        RESPONSE = 5;
    }
    

    4.2、IBusinessHandler的实现

    import com.google.protobuf.InvalidProtocolBufferException;
    import com.xxx.channel.core.common.bean.dto.XxxSocketMessage;
    import com.xxx.channel.core.common.constant.ResponseCode;
    import com.xxx.channel.core.common.netty.handle.IBusinessHandler;
    import com.xxx.channel.core.common.protobuf.ProtobufData;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * {@link IBusinessHandler} implementation that logs owner online/offline
     * notifications and forwards the payload of incoming message packs to the
     * business service.
     */
    @Component
    public class BusinessHandler implements IBusinessHandler {
        private static final Logger log = LoggerFactory.getLogger(BusinessHandler.class);

        /**
         * Handles a response pack by its response code; unknown codes are ignored.
         *
         * @param responsePack response pack received from the channel
         */
        @Override
        public void handleResponse(ProtobufData.ResponsePack responsePack) {
            short code = (short) responsePack.getCode();
            switch (code) {
                // Room owner came online
                case ResponseCode.Notification.OWNER_ONLINE:
                    log.info("online response pack:{}", responsePack);
                    break;
                // Room owner went offline
                case ResponseCode.Notification.OWNER_OFFLINE:
                    log.info("offline response pack:{}", responsePack);
                    break;
                default:
                    break;
            }
        }

        /**
         * Parses the pack's data as an {@code XxxSocketMessage.Message} and passes its
         * content, together with the sender, to the business service. Parse and
         * business failures are logged and not rethrown.
         *
         * @param messagePack message pack received from the channel
         */
        @Override
        public void handleMessage(ProtobufData.MessagePack messagePack) {
            try {
                XxxSocketMessage.Message message = XxxSocketMessage.Message.parseFrom(messagePack.getDataBytes());

                // Per the original note, this goes through ChannelClient to send messages
                // upstream and downstream.
                // NOTE(review): tdService is not declared in this excerpt; presumably an
                // injected collaborator (Autowired is imported but unused) — confirm.
                tdService.handlerMessages(message.getData(), messagePack.getFrom());
            } catch (InvalidProtocolBufferException e) {
                // The exception is passed as the trailing Throwable argument so its stack
                // trace is logged; with a "{}" placeholder the braces were printed literally.
                log.error("数据解析异常", e);
            } catch (Exception e1) {
                log.error("业务异常", e1);
            }
        }

    }
    
    

    相关文章

      网友评论

          本文标题:netty使用Google Protobuf协议优化通道的序列化

          本文链接:https://www.haomeiwen.com/subject/ndwrzrtx.html