美文网首页
基于netty的Marshalling序列化框架简单实现

基于netty的Marshalling序列化框架简单实现

作者: senninha | 来源:发表于2017-03-18 23:48 被阅读232次

    Marshalling序列化框架简单实现

    1.导入相关jar包

    maven项目直接添加依赖即可。
    
            <!-- MarshAlling dependency -->
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling-osgi</artifactId>
                <version>2.0.0.Beta5</version>
            </dependency>
    

    2.创建序列化传输的类

    //记得要实现Serializable接口
    public class UserInfo implements Serializable {
        private String username;
        private String age;
        public String getUsername() {
            return username;
        }
        public String getAge() {
            return age;
        }
        public void setUsername(String username) {
            this.username = username;
        }
        public void setAge(String age) {
            this.age = age;
        }
        public UserInfo(String username, String age) {
            super();
            this.username = username;
            this.age = age;
        }
        
        public UserInfo(){
            
        }
        @Override
        public String toString() {
            return "UserInfo [username=" + username + ", age=" + age + "]";
        }
        
        
        
        
    }
    
    

    3.编写创建MarshallingEncoder和MarshallingDecoder的工厂类

    public class MarshallingCodeFactory {
        public static MarshallingEncoder getEncoder(){
        //这里表示的是支持java serial对象的序列化。所以我们传输的对象要实现Serializable接口
            MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
            MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
        
        public static MarshallingDecoder getDecoder(){
            MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
            MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
            MarshallingDecoder decoder = new MarshallingDecoder(provider);
            return decoder;
        }
    

    4.Server端

    package cn.senninha.concurrent.server;
    
    import cn.senninha.concurrent.code.MarshallingCodeFactory;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    
    public class TimeServerMarshalling {
        public void bind(int port) throws Exception {
            EventLoopGroup bossGruop = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGruop, workGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
    
            try {
                ChannelFuture future = bootstrap.bind(port).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                bossGruop.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // TODO Auto-generated method stub
                //这里添加marshalling的序列化支持
                MarshallingEncoder encoder = MarshallingCodeFactory.getEncoder();
                MarshallingDecoder decoder = MarshallingCodeFactory.getDecoder();
                ch.pipeline().addLast(encoder);
                ch.pipeline().addLast(decoder);
                ch.pipeline().addLast(new TimeServerHandler());
            }
        }
    
        public static void main(String[] args) {
            int port = 12580;
            try {
                new TimeServerMarshalling().bind(port);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
    

    对应的serverhanlder:

    package cn.senninha.concurrent.server;
    
    import java.nio.ByteBuffer;
    
    import org.msgpack.MessagePack;
    import org.omg.Messaging.SyncScopeHelper;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufUtil;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class TimeServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // TODO Auto-generated method stub
    //      ByteBuf in = (ByteBuf) msg;
            try {
                System.out.println(msg);
                String remsg = new String("has receive");
                ctx.write(remsg);
                ctx.flush();
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // TODO Auto-generated method stub
            ctx.flush();
        }
    }
    
    

    5.Client端

    package cn.senninha.concurrent.client;
    
    import java.net.UnknownHostException;
    
    import cn.senninha.concurrent.code.MarshallingCodeFactory;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    
    public class NettyClientMarshalling {
    
        
        private void bind(int port,String host){
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true).handler(new ClientHandlerInit());
            
            try {
                ChannelFuture f = b.connect(host, port).sync();
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally{
                group.shutdownGracefully();
            }
            
            
            
        }
        
        private class ClientHandlerInit extends ChannelInitializer<SocketChannel>{
    
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // TODO Auto-generated method stub
                //添加对marshalling框架的支持
                MarshallingEncoder encoder = MarshallingCodeFactory.getEncoder();
                MarshallingDecoder decoder = MarshallingCodeFactory.getDecoder();
                ch.pipeline().addLast(encoder);
                ch.pipeline().addLast(decoder);
                ch.pipeline().addLast(new ClientHandler());
            }
            
        }
    
        public static void main(String[] args) throws UnknownHostException {
            // TODO Auto-generated method stub
            NettyClientMarshalling client = new NettyClientMarshalling();
            client.bind(12580,"localhost");
        }
    
    }
    

    对应的clienthandler代码:

    package cn.senninha.concurrent.client;
    
    import cn.senninha.concurrent.code.model.UserInfo;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import javassist.bytecode.ByteArray;
    
    public class ClientHandler extends ChannelHandlerAdapter {
        private byte[] request = ("senninha" + System.getProperty("line.separator")).getBytes();
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // TODO Auto-generated method stub
            System.out.println(msg);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // TODO Auto-generated method stub
            for (int i = 0; i < 500; i++) {
                UserInfo userInfo = new UserInfo();
                userInfo.setAge(i + "year");
                userInfo.setUsername("senninha");
                ctx.write(userInfo);
                ctx.flush();
            }
            System.out.println("-----------------send over-----------------");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // TODO Auto-generated method stub
            System.out.println("error");
        }
    }
    
    

    6.运行

    UserInfo [username=senninha, age=0year]
    UserInfo [username=senninha, age=1year]
    UserInfo [username=senninha, age=2year]
    UserInfo [username=senninha, age=3year]
    UserInfo [username=senninha, age=4year]
    

    参考《netty权威指南》

    相关文章

      网友评论

          本文标题:基于netty的Marshalling序列化框架简单实现

          本文链接:https://www.haomeiwen.com/subject/fpopnttx.html