美文网首页IT@程序员猿媛SpringBoot精选
Netty整合JBoss Marshalling编解码

Netty整合JBoss Marshalling编解码

作者: 程就人生 | 来源:发表于2019-10-26 17:52 被阅读0次

    JBoss Marshalling也是在Netty框架中被经常使用的编解码技术,它对JDK默认的序列号框架做了优化,又保持和java.io.Serializable接口的兼容,有必要学习一下,现在看一下JBoss Marshalling在netty中是如何编解码的,对于半包的问题是否有处理呢?

    示例编码环境:
    springboot2.1.4
    jdk1.8

    首先,在pom中引入必要的架包:

    <!-- jboss-marshalling编解码和序列号架包 -->
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling</artifactId>
                <version>2.0.9.Final</version>
            </dependency>
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling-serial</artifactId>
                <version>2.0.9.Final</version>
            </dependency>
    

    第二步,服务端编码实现;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * JBoss Marshalling编解码服务端测试示例
     * @author 程就人生
     * @date 2019年10月26日
     */
    public class MarshallingServer {
        
        //创建线程组
        private final EventLoopGroup bossGroup = new NioEventLoopGroup();
        
        private final EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        /**
         * 启动服务
         * @param port
         */
        public void start(int port){
    
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup,workerGroup)
                                        //非阻塞
                                        .channel(NioServerSocketChannel.class)
                                        .option(ChannelOption.SO_BACKLOG, 100)
                                        .handler(new LoggingHandler(LogLevel.INFO))
                                        .childHandler(new ChannelInitializer<SocketChannel>(){
    
                                            @Override
                                            protected void initChannel(SocketChannel ch) throws Exception {
                                                //JBoss解码
                                                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                                                //JBoss编码
                                                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                                                ch.pipeline().addLast(new MarshallingServerHandler());
                                            }
                                            
                                        });
                //绑定端口,同步等等成功
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                //等待服务端监听端口关闭
                channelFuture.channel().closeFuture().sync();           
            } catch (Exception e) {
                e.printStackTrace();
            } finally{
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
        
    
        public static void main(String[] args) {
            MarshallingServer server = new MarshallingServer();
            server.start(7788);
        }
    }
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * I/O消息的接收和处理
     * @author 程就人生
     * @date 2019年10月26日
     */
    public class MarshallingServerHandler extends ChannelInboundHandlerAdapter{
    
        private static Logger log = LoggerFactory.getLogger(MarshallingServerHandler.class);
        
        /**
         * 接收到消息时的处理
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Test test = (Test) msg;
            if(test.getBody().contains("程就人生")){
                log.info("接收到的消息:" + test.toString());
                ctx.writeAndFlush(test);
            }
        }
        
        /**
         * 异常时关闭
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            //发生异常,关闭连接
            ctx.close();
        }
    }
    
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    /**
     * JBoss 编解码实现
     * @author 程就人生
     * @date 2019年10月26日
     */
    public final class MarshallingCodeCFactory {
    
        /**
         * 创建JBoss Marshalling解码器
         * @return
         *
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024*1024*1);
            return decoder;
        }
    
        /**
         * 创建JBoss Marshalling编码器
         * @return
         *
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    
    }
    
    import java.io.Serializable;
    /**
     * java POJO 必须序列化
     * @author 程就人生
     * @date 2019年10月26日
     */
    public class Test implements Serializable {
        
        private String uid;
        private String body;
        
        public String getUid() {
            return uid;
        }
        public void setUid(String uid) {
            this.uid = uid;
        }
        public String getBody() {
            return body;
        }
        public void setBody(String body) {
            this.body = body;
        }
        
        @Override
        public String toString() {
            return "Test [uid=" + uid + ", body=" + body + "]";
        }
    }
    

    第三步,客户端编码实现;

    import com.example.demo.server5.MarshallingCodeCFactory;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    /**
     * JBoss Marshalling编解码,客户端模拟
     * @author 程就人生
     * @date 2019年10月13日
     */
    public class MarshallingClient {
    
        public void connect(int port, String host){
            EventLoopGroup group = new NioEventLoopGroup();
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)//设置为非阻塞
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //JBoss解码
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        //JBoss编码
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        ch.pipeline().addLast(new MarshallingClientHandler());
                    }
                });
                
                //建立连接
                ChannelFuture channelFuture = bootstrap.connect(host, port);
                //等待服务端监听端口关闭
                channelFuture.channel().closeFuture().sync();
                
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                group.shutdownGracefully();
            }
        }
        
        public static void main(String[] argo){
            new MarshallingClient().connect(7788, "localhost");
        }
    }
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.example.demo.server5.Test;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    /**
     * I/O消息的接收和处理
     * @author 程就人生
     * @date 2019年10月26日
     */
    public class MarshallingClientHandler extends ChannelInboundHandlerAdapter{
    
        private static Logger log = LoggerFactory.getLogger(MarshallingClientHandler.class);
        
        /**
         * 
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Test test = null;
            for(int i=0;i<100;i++){
                test = new Test();
                test.setUid(i+"");
                test.setBody("程就人生"+i);
                ctx.writeAndFlush(test);
            }
        }
         
        /**
         * 接收到消息时的处理
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      
            Test test = (Test) msg;
            log.info("接收到的消息:" + test.toString());      
        }
        
        /**
         * 异常时关闭
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.close();
        }
    }
    

    第四步,测试;先启动服务端,在启动客户端,为了了解Marshalling对半包的处理,代码里第一次连接服务器成功后,就向服务端主动发送一百次消息,看看服务端是否能够完整接收,完整返回;

    服务端测试结果
    客户端测试结果

    从控制台的结果来看,服务器端很完整地接收到了客户端发送的消息,并没有发送粘包/半包的情况,客户端也完整地接收到了服务端发生的消息;通过本次测试,可以发现JBoss的Marshalling是支持半包处理的,在服务器端的hanlder和客户端的hanlder中只是加了编解码,并未加入其它的对半包处理的代码,使用起来还是很方便的。

    相关文章

      网友评论

        本文标题:Netty整合JBoss Marshalling编解码

        本文链接:https://www.haomeiwen.com/subject/requvctx.html