美文网首页
Netty使用kryo序列化传输对象

Netty使用kryo序列化传输对象

作者: 横渡 | 来源:发表于2019-07-31 13:34 被阅读0次

    参考文章:
    https://blog.csdn.net/eguid_1/article/details/79316403
    https://blog.csdn.net/top_code/article/details/50901623

    通常我们习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或其他用途。
    同样的,解码(Decoder)称为反序列化(deserialization),它把从网络、磁盘等读取到的字节数组还原成原始对象,以方便后续业务逻辑操作。

    kryo是个高效的java序列化/反序列化库,目前Twitter、Apache、Storm、Hive等等都在使用该技术。
    kryo序列化技术的性能足够好,比它更高效的序列化工具就只有google的protobuf了,protobuf有个缺点就是传输的每一个类结构都需要相对应的proto文件,如果类结构发生了变化,需要重新生成proto文件;protobuf的优点是和平台无关扩展性好,支持java,C++,Python三种语言。kyro速度快,序列化后体积小,性能仅次于protobuf,跨语言支持复杂。
    常见序列化框架性能对比图:


    序列化性能对比-耗时 序列化性能对比-体积
    1. 实现Kryo序列化工具类。
      序列化接口Serializer :
    package learn.netty.serial.kryo;
    
    /**
     * 序列化工具接口
     *
     * @author stone
     * @date 2019/7/31 9:25
     */
    public  interface Serializer {
        /**
         * 序列化
         * @param obj
         */
        byte[] serialize(Object obj);
    
    
        /**
         * 反序列化
         * @param bytes 字节数组
         * @return
         */
        <T> T deserialize(byte[] bytes);
    }
    

    KryoSerializer 序列化工具类

    package learn.netty.serial.kryo;
    
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.esotericsoftware.kryo.serializers.BeanSerializer;
    import org.apache.commons.io.IOUtils;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    
    /**
     * 基于kryo的序列化/反序列化工具
     * @author stone
     * @date 2019/7/31 9:30
     */
    public class KryoSerializer implements Serializer {
        private final Class<?> ct;
        // kryo 是非线程安全类
        final ThreadLocal<Kryo> kryoLocal = new ThreadLocal<Kryo>() {
            @Override
            protected Kryo initialValue() {
                Kryo kryo = new Kryo();
                kryo.register(ct, new BeanSerializer(kryo, ct));
                return kryo;
            }
        };
    
        public KryoSerializer(Class<?> ct) {
            this.ct = ct;
        }
    
        public Class<?> getCt() {
            return ct;
        }
    
        private Kryo getKryo() {
            return kryoLocal.get();
        }
    
    
    
        @Override
        public byte[] serialize(Object obj) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            Output output = new Output(bos);
            try {
                Kryo kryo = getKryo();
                kryo.writeObjectOrNull(output, obj, obj.getClass());
                output.flush();
                return bos.toByteArray();
            } finally {
                IOUtils.closeQuietly(output);
                IOUtils.closeQuietly(bos);
            }
        }
    
        @Override
        public <T> T deserialize(byte[] bytes) {
            if (bytes == null)
                return null;
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            Input input = new Input(bais);
            try {
                Kryo kryo = getKryo();
                return (T) kryo.readObjectOrNull(input, ct);
            } finally {
                IOUtils.closeQuietly(input);
                IOUtils.closeQuietly(bais);
            }
        }
        
    }
    

    SerializerFactory:

    package learn.netty.serial.kryo;
    
    /**
     * 序列化工具类工厂实现
     * @author stone
     * @date 2019/7/31 11:21
     */
    public class SerializerFactory {
        public static Serializer getSerializer(Class<?> cls) {
            return new KryoSerializer(cls);
        }
    }
    
    1. 实现编码器Encoder,继承自MessageToByteEncoder
    package learn.netty.serial.kryo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * 自定义kryo编码器(将传输对象变为byte数组)
     *
     * @author stone
     * @date 2019/7/30 14:16
     */
    public class KryoMsgEncoder extends MessageToByteEncoder<LsCar>{
        private Serializer serializer = SerializerFactory.getSerializer(LsCar.class);
    
        @Override
        protected void encode(ChannelHandlerContext ctx, LsCar msg, ByteBuf out) throws Exception {
            // 1. 将对象转换为byte
            byte[] body = serializer.serialize(msg);
            // 2. 读取消息的长度
            int dataLength = body.length;
            // 3. 先将消息长度写入,也就是消息头
            out.writeInt(dataLength);
            out.writeBytes(body);
        }
    }
    
    
    1. 实现解码器Decoder,继承ByteToMessageDecoder类
    package learn.netty.serial.kryo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    /**
     * 自定义解码器(将字节数组变为对象)
     * @author stone
     * @date 2019/7/30 14:32
     */
    public class KryoMsgDecoder extends ByteToMessageDecoder {
        private static final int HEAD_LENGTH = 4; // 表示数据流(头部是消息长度)头部的字节数
        private Serializer serializer = SerializerFactory.getSerializer(LsCar.class);
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < HEAD_LENGTH) {
                return;
            }
            // 标记当前readIndex的位置
            in.markReaderIndex();
            // 读取传送过来的消息长度,ByteBuf的 readInt() 方法会让它的readIndex+4
            int dataLength = in.readInt();
            if (dataLength <= 0) {// 如果读到的消息长度不大于0,这是不应该出现的情况,关闭连接
                ctx.close();
            }
            if (in.readableBytes() < dataLength) { // 说明是不完整的报文,重置readIndex
                in.resetReaderIndex();
                return;
            }
    
            // 至此,读取到一条完整报文
            byte[] body = new byte[dataLength];
            in.readBytes(body);
    
            // 将bytes数组转换为我们需要的对象
            LsCar msg = serializer.deserialize(body);
            out.add(msg);
        }
    }
    
    1. 服务端代码
    package learn.netty.serial.kryo;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * @author stone
     * @date 2019/7/30 14:49
     */
    public class KryoTransferServer {
        private final int port;
    
        public KryoTransferServer(int port) {
            this.port = port;
        }
    
        public void run() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new KryoMsgEncoder());
                                ch.pipeline().addLast(new KryoMsgDecoder());
                                ch.pipeline().addLast(new KryoServerHandler());
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 128) // 设置tcp缓冲区
                        .option(ChannelOption.SO_KEEPALIVE, true);
                // 绑定端口,同步等待绑定成功
                ChannelFuture f = b.bind(port).sync();
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new KryoTransferServer(8889).run();
        }
    
    }
    
    1. 服务端业务处理类
    package learn.netty.serial.kryo;
    
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    
    /**
     * @author stone
     * @date 2019/7/30 14:54
     */
    public class KryoServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {LsCar car = new LsCar();
                car.setName("Server");
                car.setBrand("qirui");
                car.setPrice(24.5f);
                car.setSpeed(196);
    
                System.out.println("Server write msg: " + car);
                ChannelFuture f = ctx.writeAndFlush(car);
                f.addListener(ChannelFutureListener.CLOSE);
            } finally {
                ReferenceCountUtil.release(msg);
            }
    
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    1. 客户端代码
    package learn.netty.serial.kryo;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * @author stone
     * @date 2019/7/30 14:59
     */
    public class KryoTransferClient {
        private String host;
        private int port;
        private LsCar message;
    
        public KryoTransferClient(String host, int port, LsCar message) {
            this.host = host;
            this.port = port;
            this.message = message;
        }
    
        public void send() throws InterruptedException {
            // 配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                // 配置启动辅助类
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new KryoMsgEncoder());
                                ch.pipeline().addLast(new KryoMsgDecoder());
                                ch.pipeline().addLast(new KryoClientHandler(message));
                            }
                        });
                // 异步连接服务器,同步等待连接成功
                ChannelFuture f = b.connect(host, port).sync();
                // 等待连接关闭
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            LsCar msg = new LsCar();
            msg.setName("Client");
            msg.setBrand("changcheng");
            msg.setSpeed(100);
            msg.setPrice(12.5f);
            KryoTransferClient client = new KryoTransferClient("127.0.0.1", 8889, msg);
            client.send();
        }
    }
    
    1. 客户端服务类
    package learn.netty.serial.kryo;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    /**
     * @author stone
     * @date 2019/7/30 15:04
     */
    public class KryoClientHandler extends ChannelInboundHandlerAdapter {
        private final LsCar message;
    
        public KryoClientHandler(LsCar message) {
            this.message = message;
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            System.out.println("client send message: " + message);
            ctx.writeAndFlush(message);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                LsCar body = (LsCar) msg;
                System.out.println("client receive msg: " + body);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Netty使用kryo序列化传输对象

          本文链接:https://www.haomeiwen.com/subject/pgzgrctx.html