美文网首页Java
Netty 对象传输

Netty 对象传输

作者: ThingLin | 来源:发表于2017-08-12 13:43 被阅读39次

    代码 http://download.csdn.net/download/linjiqian/9930627

    Netty

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>5.0.0.Alpha2</version>
    </dependency>
    

    编解码依赖

    <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling</artifactId>
        <version>1.3.0.CR9</version>
    </dependency>
    
    

    序列化依赖

    <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>1.3.0.CR9</version>
        <scope>test</scope>
    </dependency>
    
    对象 函数 说明
    EventLoopGroup Netty内部都是通过线程在处理数据,EventLoopGroup用来管理调度线程,注册Channel,生命周期管理。
    Bootstrap group(EventLoopGroup g1,EventLoopGroup g2) 服务器端和客户端都必须指定 g1;服务器端还需要指定 g2。g1 用于处理客户端的连接请求,g2 用于处理与各个已连接客户端之间的 IO 操作:连接建立阶段由 g1 处理,连接建立完成后交给 g2。
    ChannelHandlerAdapter 事件触发处理相应业务
    SocketChannel.pipeline().addLast 在管道末尾追加一个处理器

    NettyServer.java

    
    package thinglin.netty;
    
    import java.io.File;
    import java.io.FileOutputStream;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import thinglin.util.GzipUtils;
    
    /**
     * Demo server: decodes {@link Request} objects sent by clients, gunzips the
     * attachment, stores it under {@code <user.dir>/receive/} and answers with a
     * {@link Response}.
     */
    public class NettyServer {

        /** Sub-directory of the working directory that received files are written to. */
        private static final String RECEIVE_DIR = "receive";

        /**
         * Starts the server on ports 18765 and 18766 and blocks until both
         * listening channels are closed.
         *
         * @param args unused
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        public static void main(String[] args) throws Exception {

            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();

            // try/finally so the event-loop threads are stopped even when bind() fails.
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(pGroup, cGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 511) // TCP pending-connection backlog
                 .option(ChannelOption.SO_RCVBUF, 409600) // receive buffer
                 // Send buffer: SO_SNDBUF is not an option of the listening channel,
                 // so it is configured on the accepted (child) channels instead.
                 .childOption(ChannelOption.SO_SNDBUF, 409600)
                 // logging for the listening channel
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ChannelHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                System.out.println("ServerHandler channelActive");
                            }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                Request req = (Request) msg;
                                System.out.println("ServerHandler : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());

                                // NOTE(review): this is blocking file I/O on the channel's
                                // event-loop thread; acceptable for a demo only.
                                saveAttachment(req);

                                Response resp = new Response();
                                resp.setId(req.getId());
                                resp.setName(req.getName());
                                resp.setResponseMessage("finish");
                                ctx.writeAndFlush(resp);
                                // To close the connection once the response is sent, add
                                // ChannelFutureListener.CLOSE to the future returned above.
                            }

                            @Override
                            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                System.out.println("ServerHandler channelReadComplete");
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                System.out.println("ServerHandler exceptionCaught");
                                // Report the cause instead of silently dropping it.
                                cause.printStackTrace();
                                ctx.close();
                            }
                        });
                    }
                });

                ChannelFuture cf = b.bind(18765).sync();
                ChannelFuture cf2 = b.bind(18766).sync(); // several ports can be bound

                cf.channel().closeFuture().sync();
                cf2.channel().closeFuture().sync();
            } finally {
                pGroup.shutdownGracefully();
                cGroup.shutdownGracefully();
            }
        }

        /**
         * Gunzips the request attachment and writes it to the receive directory.
         *
         * <p>The file name comes from the remote peer, so only its last path
         * component is used; this keeps the write inside the receive directory.
         *
         * @param req decoded client request
         * @throws IllegalArgumentException if the request carries no usable file name
         * @throws Exception if decompression or writing the file fails
         */
        private static void saveAttachment(Request req) throws Exception {
            String requested = req.getName();
            String safeName = "";
            if (requested != null) {
                // Strip any directory part, whichever separator the client used.
                int cut = Math.max(requested.lastIndexOf('/'), requested.lastIndexOf('\\'));
                safeName = requested.substring(cut + 1);
            }
            if (safeName.isEmpty() || ".".equals(safeName) || "..".equals(safeName)) {
                throw new IllegalArgumentException("Illegal file name in request: " + requested);
            }

            byte[] attachment = GzipUtils.ungzip(req.getAttachment());

            File dir = new File(System.getProperty("user.dir"), RECEIVE_DIR);
            if (!dir.isDirectory() && !dir.mkdirs()) {
                throw new IllegalStateException("Cannot create directory " + dir);
            }

            // try-with-resources closes the stream even when write() throws.
            try (FileOutputStream fos = new FileOutputStream(new File(dir, safeName))) {
                fos.write(attachment);
            }
        }
    }
    
    
    

    NettyClient.java

    package thinglin.netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.util.ReferenceCountUtil;
    import thinglin.util.GzipUtils;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.util.UUID;
    
    
    public class NettyClient {
    
        
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ChannelHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {//连接
                            System.out.println("ClientHandler channelActive");
                        }
    
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取
                            try {
                                Response resp = (Response)msg;
                                System.out.println("ClientHandler : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());           
                            } finally {
                                ReferenceCountUtil.release(msg); //如果是有write可以不release
                            }
                        }
    
                        @Override
                        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //读取完
                            System.out.println("ClientHandler channelReadComplete");
                        }
    
                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//连接断开
                            System.out.println("ClientHandler exceptionCaught");
                            ctx.close();
                        }
                    });
                }
            });
            
            ChannelFuture cf = b.connect("127.0.0.1", 18765).sync();
            
            Request req = new Request();
            req.setId(UUID.randomUUID().toString());
            String path = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "Netty+3.1中文用户手册.doc";
            File file = new File(path);
            req.setName(file.getName());
            req.setRequestMessage("file");  
            FileInputStream in = new FileInputStream(file);  
            byte[] data = new byte[in.available()];  
            in.read(data);  
            in.close(); 
            req.setAttachment(GzipUtils.gzip(data));
            cf.channel().writeAndFlush(req);
            //发送两次,服务端会当成一次读取(TCP粘包)
            cf.channel().writeAndFlush(req); 
    
            cf.channel().closeFuture().sync();
            group.shutdownGracefully();
            System.out.println("client filish");
        }
    }
    
    
    

    对象传输序列化Marshalling工具

    
    package thinglin.netty;
    
    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    /**
     * Builds the Netty marshalling encoder/decoder pair used by both the client
     * and the server pipelines.
     */
    public final class MarshallingCodeCFactory {

        /**
         * Creates a decoder that turns incoming bytes back into objects.
         *
         * @return a new {@link MarshallingDecoder}; the second constructor argument
         *         is the maximum length of a single serialized message
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // The provider is built from the marshaller factory plus its configuration.
            UnmarshallerProvider unmarshallers =
                    new DefaultUnmarshallerProvider(serialFactory(), versionFiveConfiguration());
            return new MarshallingDecoder(unmarshallers, 1024 * 1024 * 1);
        }

        /**
         * Creates an encoder that serializes POJOs implementing the serialization
         * interface into binary form.
         *
         * @return a new {@link MarshallingEncoder}
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            MarshallerProvider marshallers =
                    new DefaultMarshallerProvider(serialFactory(), versionFiveConfiguration());
            return new MarshallingEncoder(marshallers);
        }

        /** Looks up the marshaller factory registered under the name "serial" (Java serialization). */
        private static MarshallerFactory serialFactory() {
            return Marshalling.getProvidedMarshallerFactory("serial");
        }

        /** Creates a fresh marshalling configuration with its version set to 5. */
        private static MarshallingConfiguration versionFiveConfiguration() {
            MarshallingConfiguration settings = new MarshallingConfiguration();
            settings.setVersion(5);
            return settings;
        }
    }
    
    

    压缩工具GzipUtils

    package thinglin.util;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;
    
    /**
     * In-memory GZIP compression helpers.
     */
    public class GzipUtils {

        /**
         * Compresses a byte array with GZIP.
         *
         * @param data bytes to compress
         * @return the GZIP-compressed bytes
         * @throws Exception if compression fails
         */
        public static byte[] gzip(byte[] data) throws Exception{
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // Closing the GZIP stream finishes the compressed data and writes the
            // trailer; try-with-resources guarantees that also on failure.
            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
                gzip.write(data);
            }
            return bos.toByteArray();
        }

        /**
         * Decompresses GZIP-compressed bytes.
         *
         * @param data GZIP-compressed bytes
         * @return the decompressed bytes
         * @throws Exception if {@code data} is not valid GZIP or decompression fails
         */
        public static byte[] ungzip(byte[] data) throws Exception{
            try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int num;
                while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                    bos.write(buf, 0, num);
                }
                return bos.toByteArray();
            }
        }

        /**
         * Manual check: reads a sample file, compresses and restores it, prints the
         * three sizes and writes the restored bytes to the receive directory.
         *
         * @param args unused
         * @throws Exception if the sample file cannot be read or the copy cannot be written
         */
        public static void main(String[] args) throws Exception{

            // read the file
            String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
            byte[] data = readFully(new File(readPath));

            System.out.println("文件原始大小:" + data.length);

            // compression round trip
            byte[] ret1 = GzipUtils.gzip(data);
            System.out.println("压缩之后大小:" + ret1.length);

            byte[] ret2 = GzipUtils.ungzip(ret1);
            System.out.println("还原之后大小:" + ret2.length);

            // write the restored file
            String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
            try (FileOutputStream fos = new FileOutputStream(writePath)) {
                fos.write(ret2);
            }
        }

        /**
         * Reads a whole file into memory, looping until every byte has arrived
         * (a single {@code read} call may return fewer bytes than requested, and
         * {@code available()} is only an estimate).
         *
         * @param file file to read
         * @return the complete file content
         * @throws Exception if the file cannot be opened, is too large, or ends early
         */
        private static byte[] readFully(File file) throws Exception {
            long length = file.length();
            if (length > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("File too large to load into memory: " + file);
            }
            byte[] data = new byte[(int) length];
            try (FileInputStream in = new FileInputStream(file)) {
                int filled = 0;
                while (filled < data.length) {
                    int n = in.read(data, filled, data.length - filled);
                    if (n < 0) {
                        throw new java.io.EOFException("Unexpected end of file: " + file);
                    }
                    filled += n;
                }
            }
            return data;
        }
    }
    

    请求数据Request.java

    package thinglin.netty;
    
    import java.io.Serializable;
    
    /**
     * Message sent from the client to the server: an id, a file name, a free-form
     * request text and the file content as a byte array.
     */
    public class Request implements Serializable{

        // Must be spelled exactly "serialVersionUID": Java serialization ignores
        // any other spelling and falls back to a computed default UID.
        private static final long serialVersionUID = 1L;

        private String id ;
        private String name ;
        private String requestMessage ;
        private byte[] attachment;

        /** @return the request id */
        public String getId() {
            return id;
        }

        /** @param id the request id */
        public void setId(String id) {
            this.id = id;
        }

        /** @return the file name carried by this request */
        public String getName() {
            return name;
        }

        /** @param name the file name carried by this request */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the request text */
        public String getRequestMessage() {
            return requestMessage;
        }

        /** @param requestMessage the request text */
        public void setRequestMessage(String requestMessage) {
            this.requestMessage = requestMessage;
        }

        /** @return the attachment bytes (the stored array itself, not a copy) */
        public byte[] getAttachment() {
            return attachment;
        }

        /** @param attachment the attachment bytes (stored as-is, not copied) */
        public void setAttachment(byte[] attachment) {
            this.attachment = attachment;
        }
    }
    
    

    响应对象Response

    
    package thinglin.netty;
    
    import java.io.Serializable;
    
    /**
     * Reply sent from the server to the client: echoes the request id and file
     * name and carries a response text.
     */
    public class Response implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Id copied from the request being answered. */
        private String id;

        /** File name copied from the request being answered. */
        private String name;

        /** Response text. */
        private String responseMessage;

        /** @return the response id */
        public String getId() {
            return this.id;
        }

        /** @param id the response id */
        public void setId(String id) {
            this.id = id;
        }

        /** @return the file name */
        public String getName() {
            return this.name;
        }

        /** @param name the file name */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the response text */
        public String getResponseMessage() {
            return this.responseMessage;
        }

        /** @param responseMessage the response text */
        public void setResponseMessage(String responseMessage) {
            this.responseMessage = responseMessage;
        }
    }
    

    相关文章

      网友评论

        本文标题:Netty 对象传输

        本文链接:https://www.haomeiwen.com/subject/fxgrrxtx.html