美文网首页Java
Netty 对象传输

Netty 对象传输

作者: ThingLin | 来源:发表于2017-08-12 13:43 被阅读39次

代码 http://download.csdn.net/download/linjiqian/9930627

Netty

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

编解码依赖

<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling</artifactId>
    <version>1.3.0.CR9</version>
</dependency>

序列化依赖

<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-serial</artifactId>
    <version>1.3.0.CR9</version>
    <scope>runtime</scope>
</dependency>
| 对象 | 函数 | 说明 |
| --- | --- | --- |
| EventLoopGroup | | Netty 内部通过线程处理数据,EventLoopGroup 用来管理和调度这些线程,负责注册 Channel 及其生命周期管理。 |
| Bootstrap / ServerBootstrap | group(EventLoopGroup g1, EventLoopGroup g2) | 客户端和服务器端都必须指定 g1;服务器端还需要指定 g2。g1 用于接受客户端的连接请求,g2 用于处理各个已建立连接上的 IO 操作:新连接先由 g1 接受,建立完成后交给 g2 处理。 |
| ChannelHandlerAdapter | | 事件触发时处理相应的业务逻辑。 |
| SocketChannel | pipeline().addLast | 在管道末尾追加一个处理器。 |

NettyServer.java


package thinglin.netty;

import java.io.File;
import java.io.FileOutputStream;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import thinglin.util.GzipUtils;

/**
 * Demo file-receiving server.
 *
 * <p>Listens on ports 18765 and 18766. Every decoded {@link Request} carries a
 * gzip-compressed attachment, which is decompressed and written to the
 * {@code receive} directory under the working directory. A {@link Response}
 * with the message {@code "finish"} is then written back to the sender.
 */
public class NettyServer {

    /** Name of the directory (below {@code user.dir}) that received files are stored in. */
    private static final String RECEIVE_DIR = "receive";

    /**
     * Starts the server and blocks until both listening channels are closed.
     *
     * @param args ignored
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception{

        EventLoopGroup pGroup = new NioEventLoopGroup(); // accepts incoming connections
        EventLoopGroup cGroup = new NioEventLoopGroup(); // handles I/O of accepted connections

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 511)   // length of the pending-connection queue
             .option(ChannelOption.SO_RCVBUF, 409600) // receive buffer size
             // The send buffer belongs to the accepted connections, so it is a child option;
             // the listening channel does not support SO_SNDBUF.
             .childOption(ChannelOption.SO_SNDBUF, 409600)
             // log events of the listening channel
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ChannelHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            System.out.println("ServerHandler channelActive");
                        }

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            Request req = (Request)msg;
                            System.out.println("ServerHandler : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());

                            // NOTE(review): this file write runs inside the handler callback;
                            // consider offloading it if large files are expected.
                            saveAttachment(req);

                            Response resp = new Response();
                            resp.setId(req.getId());
                            resp.setName(req.getName());
                            resp.setResponseMessage("finish");
                            // To close the connection once the reply is sent, add
                            // ChannelFutureListener.CLOSE to the future returned here.
                            ctx.writeAndFlush(resp);
                        }

                        @Override
                        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                            System.out.println("ServerHandler channelReadComplete");
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                            System.out.println("ServerHandler exceptionCaught");
                            // Report the cause instead of dropping it silently.
                            cause.printStackTrace();
                            ctx.close();
                        }
                    });
                }
            });

            ChannelFuture cf = b.bind(18765).sync();
            ChannelFuture cf2 = b.bind(18766).sync(); // one bootstrap can bind several ports

            cf.channel().closeFuture().sync();
            cf2.channel().closeFuture().sync();
        } finally {
            // Always release the event loop threads, even when bind or sync fails.
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }

    }

    /**
     * Decompresses the attachment of {@code req} and writes it to the receive directory.
     *
     * <p>Only the last path segment of the requested name is used, so a name such as
     * {@code ../../x} cannot place the file outside the receive directory.
     *
     * @param req the decoded request; its name and attachment must be set
     * @throws IllegalArgumentException if the request carries no usable file name
     * @throws IllegalStateException if the receive directory cannot be created
     * @throws Exception if decompression or writing fails
     */
    private static void saveAttachment(Request req) throws Exception {
        String requestedName = req.getName();
        if (requestedName == null) {
            throw new IllegalArgumentException("request " + req.getId() + " has no file name");
        }
        // File.getName() drops any directory part of the client-supplied name.
        String safeName = new File(requestedName).getName();
        if (safeName.isEmpty() || ".".equals(safeName) || "..".equals(safeName)) {
            throw new IllegalArgumentException("illegal file name: " + requestedName);
        }

        byte[] attachment = GzipUtils.ungzip(req.getAttachment());

        File dir = new File(System.getProperty("user.dir"), RECEIVE_DIR);
        if (!dir.isDirectory() && !dir.mkdirs()) {
            throw new IllegalStateException("cannot create directory: " + dir);
        }

        FileOutputStream fos = new FileOutputStream(new File(dir, safeName));
        try {
            fos.write(attachment);
        } finally {
            fos.close(); // close even when the write fails
        }
    }
}


NettyClient.java

package thinglin.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;
import thinglin.util.GzipUtils;

import java.io.File;
import java.io.FileInputStream;
import java.util.UUID;


public class NettyClient {

    
    public static void main(String[] args) throws Exception{
        
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ChannelHandlerAdapter(){
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {//连接
                        System.out.println("ClientHandler channelActive");
                    }

                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取
                        try {
                            Response resp = (Response)msg;
                            System.out.println("ClientHandler : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());           
                        } finally {
                            ReferenceCountUtil.release(msg); //如果是有write可以不release
                        }
                    }

                    @Override
                    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //读取完
                        System.out.println("ClientHandler channelReadComplete");
                    }

                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//连接断开
                        System.out.println("ClientHandler exceptionCaught");
                        ctx.close();
                    }
                });
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 18765).sync();
        
        Request req = new Request();
        req.setId(UUID.randomUUID().toString());
        String path = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "Netty+3.1中文用户手册.doc";
        File file = new File(path);
        req.setName(file.getName());
        req.setRequestMessage("file");  
        FileInputStream in = new FileInputStream(file);  
        byte[] data = new byte[in.available()];  
        in.read(data);  
        in.close(); 
        req.setAttachment(GzipUtils.gzip(data));
        cf.channel().writeAndFlush(req);
        //发送两次,服务端会当成一次读取(TCP粘包)
        cf.channel().writeAndFlush(req); 

        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
        System.out.println("client filish");
    }
}


对象传输序列化Marshalling工具


package thinglin.netty;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Factory for the Netty Marshalling encoder/decoder pair used by both the client
 * and the server pipeline to transfer {@code Serializable} objects.
 */
public final class MarshallingCodeCFactory {

    /** Name of the JBoss Marshalling protocol; "serial" selects the Java-serialization-compatible one. */
    private static final String PROTOCOL = "serial";

    /** Version number set on every {@link MarshallingConfiguration} created here. */
    private static final int CONFIGURATION_VERSION = 5;

    /** Maximum length in bytes of a single serialized message accepted by the decoder (1 MiB). */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024 * 1;

    /** Static utility class; not meant to be instantiated. */
    private MarshallingCodeCFactory() {
    }

    /**
     * Builds a new decoder that turns received bytes back into objects.
     *
     * @return a new {@link MarshallingDecoder} limited to 1 MiB per message
     * @throws IllegalStateException if no "serial" marshaller factory is available
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(serialFactory(), newConfiguration());
        // Second argument: maximum length of one serialized message.
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Builds a new encoder that serializes outgoing {@code Serializable} objects to bytes.
     *
     * @return a new {@link MarshallingEncoder}
     * @throws IllegalStateException if no "serial" marshaller factory is available
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider = new DefaultMarshallerProvider(serialFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /** Looks up the "serial" marshaller factory, failing fast when it is not available. */
    private static MarshallerFactory serialFactory() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(PROTOCOL);
        if (factory == null) {
            // The lookup yields null when no provider for the protocol is found.
            throw new IllegalStateException(
                    "No '" + PROTOCOL + "' MarshallerFactory found; is jboss-marshalling-serial on the runtime classpath?");
        }
        return factory;
    }

    /** Creates the marshalling configuration shared by encoder and decoder. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(CONFIGURATION_VERSION);
        return configuration;
    }
}

压缩工具GzipUtils

package thinglin.util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Helpers for compressing and decompressing byte arrays with GZIP.
 */
public class GzipUtils {

    /**
     * Compresses {@code data} with GZIP.
     *
     * @param data the bytes to compress; must not be {@code null}
     * @return the GZIP-compressed bytes
     * @throws Exception if compression fails
     */
    public static byte[] gzip(byte[] data) throws Exception{
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bos);
        try {
            gzip.write(data);
            gzip.finish();
        } finally {
            gzip.close(); // close the compressor even when writing fails
        }
        return bos.toByteArray();
    }

    /**
     * Decompresses GZIP-compressed {@code data}.
     *
     * @param data GZIP-compressed bytes; must not be {@code null}
     * @return the decompressed bytes
     * @throws Exception if {@code data} is not valid GZIP or reading fails
     */
    public static byte[] ungzip(byte[] data) throws Exception{
        GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data));
        try {
            return readAll(gzip);
        } finally {
            gzip.close(); // close the decompressor even when reading fails
        }
    }

    /** Reads {@code in} until end of stream and returns everything that was read. */
    private static byte[] readAll(java.io.InputStream in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int num;
        while ((num = in.read(buf, 0, buf.length)) != -1) {
            bos.write(buf, 0, num);
        }
        return bos.toByteArray();
    }

    /**
     * Manual check: compresses a sample file, decompresses it again, prints the three
     * sizes and writes the restored bytes to the {@code receive} directory.
     *
     * @param args ignored
     * @throws Exception if the sample file cannot be read or the result cannot be written
     */
    public static void main(String[] args) throws Exception{

        // read the source file completely; a single read() may return only part of it
        String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
        File file = new File(readPath);
        byte[] data;
        FileInputStream in = new FileInputStream(file);
        try {
            data = readAll(in);
        } finally {
            in.close();
        }

        System.out.println("文件原始大小:" + data.length);

        // compress
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("压缩之后大小:" + ret1.length);

        // restore
        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("还原之后大小:" + ret2.length);

        // write the restored file
        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
        FileOutputStream fos = new FileOutputStream(writePath);
        try {
            fos.write(ret2);
        } finally {
            fos.close();
        }
    }
}

请求数据Request.java

package thinglin.netty;

import java.io.Serializable;

/**
 * Message sent from the client to the server: an id, a file name, a free-text
 * message and a binary attachment (the demo client puts gzip-compressed file
 * content there).
 */
public class Request implements Serializable{

    // Java serialization only recognises this constant under the exact name
    // "serialVersionUID"; any other spelling is treated as an ordinary field.
    private static final long serialVersionUID = 1L;

    private String id ;
    private String name ;
    private String requestMessage ;
    private byte[] attachment;

    /** @return the request id, or {@code null} if not set */
    public String getId() {
        return id;
    }

    /** @param id the request id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the name of the transferred file, or {@code null} if not set */
    public String getName() {
        return name;
    }

    /** @param name the name of the transferred file */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the request message, or {@code null} if not set */
    public String getRequestMessage() {
        return requestMessage;
    }

    /** @param requestMessage the request message */
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }

    /** @return the attachment bytes (the stored array itself, not a copy), or {@code null} */
    public byte[] getAttachment() {
        return attachment;
    }

    /** @param attachment the attachment bytes; the array is stored as is, without copying */
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}

响应对象Response


package thinglin.netty;

import java.io.Serializable;

/**
 * Reply sent from the server back to the client: echoes the id and name of the
 * request it answers and carries a short status message.
 */
public class Response implements Serializable{

    private static final long serialVersionUID = 1L;

    /** Id of the request this response belongs to. */
    private String id;

    /** Name copied from the request. */
    private String name;

    /** Status text reported to the client. */
    private String responseMessage;

    // ---- getters ----

    /** @return the id, or {@code null} if not set */
    public String getId() {
        return this.id;
    }

    /** @return the name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @return the response message, or {@code null} if not set */
    public String getResponseMessage() {
        return this.responseMessage;
    }

    // ---- setters ----

    /** @param id the id of the answered request */
    public void setId(String id) {
        this.id = id;
    }

    /** @param name the name of the answered request */
    public void setName(String name) {
        this.name = name;
    }

    /** @param responseMessage the status text */
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

}

相关文章

  • Netty 对象传输

    代码 http://download.csdn.net/download/linjiqian/9930627 Ne...

  • Netty笔记之八:自定义通信协议

    Netty中双方建立通信之后,对象数据会按照ByteBuf字节码的方式进行传输。 自定义一种通信协议,协议将传输数...

  • netty案例,netty4.1中级拓展篇二《Netty使用Pr

    前言介绍 在netty数据传输过程中可以有很多选择,比如;字符串、json、xml、java对象,但为了保证传输的...

  • 2.netty传输

    Netty传输: NIO:非阻塞传输OIO:阻塞传输Local:JVM内部传输Embedded:测试Channel...

  • netty通信框架

    Netty入门教程——认识Netty彻底理解Netty,这一篇文章就够了 Netty特点 并发高 传输快 封装好 ...

  • netty系列之:netty中的核心编码器base64

    简介 我们知道数据在netty中传输是以ByteBuf的形式进行的,可以说ByteBuf是netty的数据传输基础...

  • (3)深入理解 RPC 之传输篇

    一、RpcRequest 和 RpcResponse 二、Socket传输 三、Netty 传输 四、同步与异步 ...

  • 5.dubbo源码-编码&解码

    Netty编码申明 由于dubbo底层使用Netty作为网络传输框架,所以如果需要编码的话,可以通过继承netty...

  • Netty学习--传输

    传输迁移 未使用Netty 的阻塞网络编程 未使用Netty 的异步网络编程 使用Netty 的阻塞网络处理 使用...

  • netty的学习

    title: netty的学习tags: [netty] 最近写android的tcp通信,用来传输文件,用原生的...

网友评论

    本文标题:Netty 对象传输

    本文链接:https://www.haomeiwen.com/subject/fxgrrxtx.html