代码 http://download.csdn.net/download/linjiqian/9930627
Netty
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
编解码依赖
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.3.0.CR9</version>
</dependency>
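序列化依赖(注意:MarshallingCodeCFactory 在运行时通过 "serial" 查找该序列化工厂;如果保留下面的 test 作用域,非测试环境运行时可能找不到该工厂,此时请去掉 scope 一行)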
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.3.0.CR9</version>
<scope>test</scope>
</dependency>
对象 | 函数 | 说明 |
---|---|---|
EventLoopGroup | | Netty内部都是通过线程来处理数据,EventLoopGroup用来管理和调度这些线程、注册Channel以及管理生命周期。 |
Bootstrap / ServerBootstrap | group(EventLoopGroup g1) / group(EventLoopGroup g1, EventLoopGroup g2) | 客户端(Bootstrap)只需指定 g1;服务器端(ServerBootstrap)需要指定 g1、g2:g1 用于处理客户端的连接请求,g2 用于处理与各个客户端连接的 IO 操作。新连接由 g1 接受,完成握手后移交给 g2 处理后续读写。 |
ChannelHandlerAdapter | | 事件触发时处理相应的业务 |
SocketChannel.pipeline() | addLast | 在管道末尾追加一个处理器 |
NettyServer.java
package thinglin.netty;
import java.io.File;
import java.io.FileOutputStream;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import thinglin.util.GzipUtils;
/**
 * Netty server that receives gzip-compressed file attachments.
 *
 * <p>Each incoming {@code Request} is decompressed, written into the {@code receive}
 * directory under the current working directory, and answered with a {@code Response}
 * carrying the same id and name.
 */
public class NettyServer {

    /**
     * Starts the server on ports 18765 and 18766 and blocks until both listening
     * channels have been closed.
     *
     * @param args ignored
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // First group is passed as the parent (accepting) group, second as the child (I/O) group.
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
                .channel(NioServerSocketChannel.class)
                // Pending-connection queue length of the listening socket.
                .option(ChannelOption.SO_BACKLOG, 511)
                // Receive buffer size, set on the listening socket.
                .option(ChannelOption.SO_RCVBUF, 409600)
                // The send buffer is a per-connection setting: the listening channel does not
                // support SO_SNDBUF, so it is configured on accepted channels instead.
                .childOption(ChannelOption.SO_SNDBUF, 409600)
                // Log events of the listening channel.
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ChannelHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                System.out.println("ServerHandler channelActive");
                            }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                Request req = (Request) msg;
                                System.out.println("ServerHandler : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
                                byte[] attachment = GzipUtils.ungzip(req.getAttachment());

                                File receiveDir = new File(System.getProperty("user.dir"), "receive");
                                if (!receiveDir.isDirectory() && !receiveDir.mkdirs()) {
                                    throw new IllegalStateException("cannot create receive directory: " + receiveDir);
                                }
                                // The name comes from the remote peer: keep only its last path
                                // segment so it cannot point outside the receive directory.
                                File target = new File(receiveDir, new File(req.getName()).getName());
                                // try-with-resources closes the file even when write() fails.
                                try (FileOutputStream fos = new FileOutputStream(target)) {
                                    fos.write(attachment);
                                }

                                Response resp = new Response();
                                resp.setId(req.getId());
                                resp.setName(req.getName());
                                resp.setResponseMessage("finish");
                                // To close the connection once the response is sent, add
                                // ChannelFutureListener.CLOSE to the future returned here.
                                ctx.writeAndFlush(resp);
                            }

                            @Override
                            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                System.out.println("ServerHandler channelReadComplete");
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                System.out.println("ServerHandler exceptionCaught");
                                // Report the failure instead of silently dropping it.
                                System.err.println("ServerHandler failure: " + cause);
                                ctx.close();
                            }
                        });
                    }
                });
            ChannelFuture cf = b.bind(18765).sync();
            // The same bootstrap can be bound to more than one port.
            ChannelFuture cf2 = b.bind(18766).sync();
            cf.channel().closeFuture().sync();
            cf2.channel().closeFuture().sync();
        } finally {
            // Shut the groups down even when bind() or sync() throws.
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
}
NettyClient.java
package thinglin.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;
import thinglin.util.GzipUtils;
import java.io.File;
import java.io.FileInputStream;
import java.util.UUID;
/**
 * Netty client that sends a gzip-compressed file to the server on 127.0.0.1:18765
 * and prints every {@code Response} it receives.
 */
public class NettyClient {

    /**
     * Connects to the server, sends the sample document twice and then blocks until
     * the channel is closed.
     *
     * @param args ignored
     * @throws Exception if connecting, reading the file or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ChannelHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                System.out.println("ClientHandler channelActive");
                            }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                try {
                                    Response resp = (Response) msg;
                                    System.out.println("ClientHandler : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
                                } finally {
                                    // The message is consumed here and not written onward, so release it.
                                    ReferenceCountUtil.release(msg);
                                }
                            }

                            @Override
                            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                System.out.println("ClientHandler channelReadComplete");
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                System.out.println("ClientHandler exceptionCaught");
                                // Report the failure instead of silently dropping it.
                                System.err.println("ClientHandler failure: " + cause);
                                ctx.close();
                            }
                        });
                    }
                });
            ChannelFuture cf = b.connect("127.0.0.1", 18765).sync();

            Request req = new Request();
            req.setId(UUID.randomUUID().toString());
            String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "Netty+3.1中文用户手册.doc";
            File file = new File(path);
            req.setName(file.getName());
            req.setRequestMessage("file");
            // available() followed by one read() is not guaranteed to return the whole file;
            // readAllBytes reads to end-of-file and closes the file itself.
            byte[] data = java.nio.file.Files.readAllBytes(file.toPath());
            req.setAttachment(GzipUtils.gzip(data));

            cf.channel().writeAndFlush(req);
            // Send the same request a second time.
            // NOTE(review): the original comment claimed the server reads both sends as one
            // message (TCP packet coalescing); the marshalling encoder/decoder pair presumably
            // frames each object separately - confirm.
            cf.channel().writeAndFlush(req);

            cf.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even when connecting or sending fails.
            group.shutdownGracefully();
        }
        System.out.println("client filish");
    }
}
对象传输序列化Marshalling工具
package thinglin.netty;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
 * Factory for the Netty marshalling codec pair used to move serializable objects
 * over a channel.
 */
public final class MarshallingCodeCFactory {

    /**
     * Builds a decoder backed by the "serial" marshaller factory.
     *
     * @return a new {@link MarshallingDecoder}; the second constructor argument
     *         (1 MiB) is the maximum size of a single serialized message
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider unmarshallerProvider =
            new DefaultUnmarshallerProvider(lookupSerialFactory(), newVersionFiveConfiguration());
        return new MarshallingDecoder(unmarshallerProvider, 1024 * 1024 * 1);
    }

    /**
     * Builds an encoder backed by the "serial" marshaller factory; it turns
     * serializable objects into their binary form.
     *
     * @return a new {@link MarshallingEncoder}
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider marshallerProvider =
            new DefaultMarshallerProvider(lookupSerialFactory(), newVersionFiveConfiguration());
        return new MarshallingEncoder(marshallerProvider);
    }

    /** Looks up the marshaller factory registered under the name "serial" via the static Marshalling helper. */
    private static MarshallerFactory lookupSerialFactory() {
        return Marshalling.getProvidedMarshallerFactory("serial");
    }

    /** Creates a fresh marshalling configuration with its version set to 5. */
    private static MarshallingConfiguration newVersionFiveConfiguration() {
        MarshallingConfiguration config = new MarshallingConfiguration();
        config.setVersion(5);
        return config;
    }
}
压缩工具GzipUtils
package thinglin.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/**
 * Helpers for compressing and decompressing byte arrays with GZIP.
 */
public class GzipUtils {

    /**
     * Compresses the given bytes with GZIP.
     *
     * @param data bytes to compress
     * @return the GZIP-compressed representation of {@code data}
     * @throws Exception if the compression stream reports an I/O error
     */
    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIP stream finishes the output and closes it
        // even if write() throws.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.toByteArray();
    }

    /**
     * Decompresses GZIP data.
     *
     * @param data GZIP-compressed bytes
     * @return the decompressed bytes
     * @throws Exception if {@code data} is not valid GZIP or reading fails
     */
    public static byte[] ungzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources closes the GZIP stream even on malformed input.
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
        }
        return bos.toByteArray();
    }

    /**
     * Manual round-trip check: reads a sample file, compresses and decompresses it,
     * prints the three sizes and writes the restored bytes to the receive directory.
     *
     * @param args ignored
     * @throws Exception if the sample file cannot be read or the result cannot be written
     */
    public static void main(String[] args) throws Exception {
        // Read the source file.
        String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
        File file = new File(readPath);
        // available() followed by one read() is not guaranteed to return the whole file;
        // readAllBytes reads to end-of-file and closes the file itself.
        byte[] data = java.nio.file.Files.readAllBytes(file.toPath());
        System.out.println("文件原始大小:" + data.length);
        // Exercise compression and decompression.
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("压缩之后大小:" + ret1.length);
        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("还原之后大小:" + ret2.length);
        // Write the restored bytes back out.
        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
        try (FileOutputStream fos = new FileOutputStream(writePath)) {
            fos.write(ret2);
        }
    }
}
请求数据Request.java
package thinglin.netty;
import java.io.Serializable;
/**
 * Serializable request sent from the client to the server: an id, a name,
 * a free-form message and a binary attachment.
 */
public class Request implements Serializable {

    // Must be spelled exactly "serialVersionUID": serialization ignores a field with
    // any other name (the original "SerialVersionUID") and computes a UID instead.
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String requestMessage;
    private byte[] attachment;

    /** @return the request id */
    public String getId() {
        return id;
    }

    /** @param id the request id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the name carried by this request */
    public String getName() {
        return name;
    }

    /** @param name the name carried by this request */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the request message */
    public String getRequestMessage() {
        return requestMessage;
    }

    /** @param requestMessage the request message */
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }

    /** @return the attachment bytes; the stored array itself, not a copy */
    public byte[] getAttachment() {
        return attachment;
    }

    /** @param attachment the attachment bytes; the array is stored as-is, not copied */
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}
响应对象Response
package thinglin.netty;
import java.io.Serializable;
/**
 * Serializable reply sent back to the client: the id and name it refers to
 * plus a response message.
 */
public class Response implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    // ---- accessors ----

    /** @return the id of this response */
    public String getId() {
        return this.id;
    }

    /** @return the name carried by this response */
    public String getName() {
        return this.name;
    }

    /** @return the response message */
    public String getResponseMessage() {
        return this.responseMessage;
    }

    // ---- mutators ----

    /** @param id the id of this response */
    public void setId(String id) {
        this.id = id;
    }

    /** @param name the name carried by this response */
    public void setName(String name) {
        this.name = name;
    }

    /** @param responseMessage the response message */
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
网友评论