本章要点:
- java序列化缺点
- 业界流行的几种编解码框架介绍
3.1 java序列化缺点
- 无法跨越语言,是java序列化最致命的缺点;
- 序列化后的码流太大
- 序列化性能太低
3.2 主流编解码框架
3.2.1 Google Protobuf编解码
Protobuf在业界非常流行,Protobuf具有以下优点:
- 产品成熟
- 跨语言、支持多种语言
- 编码后消息更小,便于存储和传输
- 编解码性能高
- 支持定义可选和必选字段
Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于XML等传统的序列化工具,它更小、更快、更简单。
https://github.com/google/protobuf/releases
下载protoc工具,protoc-3.5.1-win32.zip,并解压,会看到protoc.exe工具。
User.proto的文件内容如下:
// Schema for the User message exchanged by the Netty client/server examples.
//
// The syntax is declared explicitly as proto2 because the message uses
// `required` fields. Without this line protoc 3.x prints a
// "No syntax specified" warning and falls back to proto2 implicitly.
syntax = "proto2";

package com.bj58.wuxian.protobuf;

// Name of the generated Java outer class that wraps the User message type.
option java_outer_classname = "UserProto";

message User {
  required int32 id = 1;
  required string username = 2;
  required string password = 3;

  // Sex of the user. The value names look like pinyin
  // (presumably nan = male, nv = female -- confirm with the author).
  enum Sex {
    nan = 1;
    nv = 2;
  }

  required Sex sex = 4;
}
执行如下命令:
D:/develop/protoc-3.5.1-win32/bin/protoc.exe -I=./proto --java_out=D:/develop/protoc-3.5.1-win32/bin/proto/ ./proto/User.proto
生成了一个UserProto.java文件。
命令格式:protoc.exe -I=proto的输入目录 --java_out=java类输出目录 proto文件路径(包含proto的输入目录)
此时将UserProto.java文件拷贝到IDE中会报错,那是因为缺少包依赖:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
protobuf序列化传输代码实例:
UserServer代码:
package com.bj58.wuxian.netty.codec.protobuf;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import com.bj58.wuxian.protobuf.UserProto;
/**
 * Server side of the Protobuf example.
 *
 * <p>Binds to port 8888 and installs, for every accepted connection, the
 * Protobuf frame decoder/decoder, the length prepender/encoder and finally
 * {@link UserServerHandler}.
 */
public class UserServer {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException declared by the signature; exceptions raised
     *         inside the try block are caught and printed
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup acceptors = new NioEventLoopGroup();
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptors, workers);
            server.channel(NioServerSocketChannel.class);
            server.option(ChannelOption.SO_BACKLOG, 1024);
            server.handler(new LoggingHandler(LogLevel.INFO));
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Handles partial ("half") packets.
                            .addLast(new ProtobufVarint32FrameDecoder())
                            // The decoder's argument is the target type to decode into.
                            .addLast(new ProtobufDecoder(UserProto.User.getDefaultInstance()))
                            .addLast(new ProtobufVarint32LengthFieldPrepender())
                            .addLast(new ProtobufEncoder())
                            .addLast(new UserServerHandler());
                }
            });

            // Bind synchronously, then wait until the server channel is closed.
            ChannelFuture bound = server.bind(8888).sync();
            bound.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
UserServerHandler 代码:
package com.bj58.wuxian.netty.codec.protobuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.bj58.wuxian.protobuf.UserProto;
import com.bj58.wuxian.protobuf.UserProto.User.Sex;
/**
 * Server-side business handler of the Protobuf example: prints every received
 * {@code UserProto.User} and answers it with another {@code UserProto.User}.
 */
public class UserServerHandler extends ChannelInboundHandlerAdapter {

    /** Prints a banner when the channel becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("**********server channelActive*************");
    }

    /**
     * Prints the request and writes back a reply whose username depends on the
     * username in the request (compared case-insensitively).
     *
     * @param ctx context used to write the reply
     * @param msg the inbound message; cast to {@code UserProto.User}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UserProto.User request = (UserProto.User) msg;
        System.out.println("request:" + msg);

        String replyName = "zhaoshichao".equalsIgnoreCase(request.getUsername())
                ? "shangjing"
                : "guanggunhan";

        UserProto.User reply = UserProto.User.newBuilder()
                .setId(123)
                .setSex(Sex.nv)
                .setPassword("45678")
                .setUsername(replyName)
                .build();
        ctx.writeAndFlush(reply);
    }

    /** Prints the stack trace and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
UserClient 代码:
package com.bj58.wuxian.netty.codec.protobuf;
import com.bj58.wuxian.protobuf.UserProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
 * Client side of the Protobuf example.
 *
 * <p>Connects to 127.0.0.1:8888 with the same codec chain as the server and
 * {@link UserClientHandler} as the last handler.
 */
public class UserClient {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException declared by the signature; exceptions raised
     *         inside the try block are caught and printed
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            client.option(ChannelOption.TCP_NODELAY, true);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new ProtobufVarint32FrameDecoder())
                            .addLast(new ProtobufDecoder(UserProto.User.getDefaultInstance()))
                            .addLast(new ProtobufVarint32LengthFieldPrepender())
                            .addLast(new ProtobufEncoder())
                            .addLast(new UserClientHandler());
                }
            });

            // Connect synchronously, then wait until the channel is closed.
            ChannelFuture connected = client.connect("127.0.0.1", 8888).sync();
            connected.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}
UserClientHandler 代码:
package com.bj58.wuxian.netty.codec.protobuf;
import com.bj58.wuxian.protobuf.UserProto;
import com.bj58.wuxian.protobuf.UserProto.User.Sex;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side business handler of the Protobuf example: sends six
 * {@code UserProto.User} requests once the channel is active and prints every
 * response it receives.
 */
public class UserClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Sends six requests (indices 0..5). Even indices use the username
     * "zhaoshichao"; odd indices append the index to it.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int seq = 0; seq < 6; seq++) {
            String name = (seq % 2 == 0) ? "zhaoshichao" : "zhaoshichao" + seq;
            UserProto.User request = UserProto.User.newBuilder()
                    .setId(123)
                    .setSex(Sex.nan)
                    .setPassword("45678")
                    .setUsername(name)
                    .build();
            ctx.writeAndFlush(request);
        }
    }

    /** Prints the received message after casting it to {@code UserProto.User}. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UserProto.User response = (UserProto.User) msg;
        System.out.println("response:" + response);
    }

    /** Prints the stack trace and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /** Flushes the context once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}
3.2.2 Message Pack编解码
MessagePack特点如下:
- 编解码高效,性能高
- 序列化之后的码流小
- 支持跨语言
添加pom依赖:
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
自定义编码器:
package com.bj58.wuxian.netty.codec.msgpack;
import org.msgpack.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Outbound encoder that serializes a message object with MessagePack and
 * writes the resulting bytes into the outgoing buffer.
 *
 * <p>No length framing is added here; in this example a separate
 * {@code LengthFieldPrepender} in the pipeline takes care of that.
 */
public class MsgPackEncoder extends MessageToByteEncoder<Object> {

    /**
     * Reused for every message handled by this encoder instance, so the
     * MessagePack facade is not rebuilt on each call.
     */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Serializes {@code msg} and appends the bytes to {@code out}.
     *
     * @param ctx the handler context (not used)
     * @param msg the object to serialize
     * @param out the buffer the serialized bytes are written to
     * @throws Exception if MessagePack fails to serialize {@code msg}
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
            throws Exception {
        byte[] bytes = messagePack.write(msg);
        out.writeBytes(bytes);
    }
}
自定义解码器:
package com.bj58.wuxian.netty.codec.msgpack;
import java.util.List;
import org.msgpack.MessagePack;
import com.bj58.wuxian.msgpack.model.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
/**
 * Inbound decoder that turns the readable bytes of a {@link ByteBuf} into a
 * {@link User} using MessagePack.
 *
 * <p>The whole readable content of the buffer is consumed as one message, so
 * the buffer is expected to hold exactly one frame (in this example a
 * {@code LengthFieldBasedFrameDecoder} is placed before this handler).
 */
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * Reused for every message handled by this decoder instance, so the
     * MessagePack facade is not rebuilt on each call.
     */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Copies all readable bytes out of {@code msg}, deserializes them as a
     * {@link User} and adds the result to {@code out}.
     *
     * @param ctx the handler context (not used)
     * @param msg the buffer holding the serialized message
     * @param out receives the decoded {@link User}
     * @throws Exception if MessagePack fails to deserialize the bytes
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
            List<Object> out) throws Exception {
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);
        out.add(messagePack.read(bytes, User.class));
    }
}
UserServer代码:
package com.bj58.wuxian.netty.codec.msgpack;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
/**
 * Server side of the MessagePack example.
 *
 * <p>Binds to port 8888. Every accepted connection gets a length-field based
 * frame decoder, the MessagePack decoder, a length prepender, the MessagePack
 * encoder and finally {@link UserServerHandler}.
 */
public class UserServer {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param argsStrings command-line arguments (not used)
     * @throws Exception if binding or waiting on the channel fails
     */
    public static void main(String[] argsStrings) throws Exception {
        // Server-side NIO event loop groups, passed to the bootstrap as (boss, worker).
        EventLoopGroup bGroup = new NioEventLoopGroup();
        EventLoopGroup wGroup = new NioEventLoopGroup();
        // Bootstrap helper used to configure and start the server.
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bGroup, wGroup).channel(NioServerSocketChannel.class)
                // TCP_NODELAY is an option of the accepted connections, so it is set
                // with childOption(); option() would target the listening channel,
                // which does not support it.
                .childOption(ChannelOption.TCP_NODELAY, true)
                // NOTE(review): a connect timeout has no obvious use on a listening
                // channel; kept as in the original -- confirm whether it is needed.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        // Object serialization codecs plus length-based framing to
                        // deal with sticky/split packets.
                        channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                        channel.pipeline().addLast("解码器", new MsgPackDecoder());
                        channel.pipeline().addLast(new LengthFieldPrepender(2));
                        channel.pipeline().addLast("编码器", new MsgPackEncoder());
                        channel.pipeline().addLast(new UserServerHandler());
                    }
                });
        try {
            // Bind the local port and wait synchronously for the result.
            ChannelFuture future = bootstrap.bind(8888).sync();
            // Wait until the listening channel is closed, then exit gracefully.
            future.channel().closeFuture().sync();
        } finally {
            bGroup.shutdownGracefully();
            wGroup.shutdownGracefully();
        }
    }
}
ServerHandler代码:
package com.bj58.wuxian.netty.codec.msgpack;
import com.bj58.wuxian.msgpack.model.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Server-side business handler of the MessagePack example: answers every
 * received {@link User} with another {@link User}.
 */
public class UserServerHandler extends ChannelInboundHandlerAdapter {

    /** Prints a banner when the channel becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("**********server channelActive**********");
    }

    /**
     * Builds a reply with password "1234" and a username that depends on the
     * username of the request, then writes and flushes it.
     *
     * @param ctx context used to write the reply
     * @param msg the inbound message; cast to {@link User}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        User request = (User) msg;
        User reply = new User();
        boolean knownUser = "zhaoshichao".equals(request.getUsername());

        // The password is the same for both outcomes; only the username differs.
        reply.setPassword("1234");
        reply.setUsername(knownUser ? "shangjing" : "others");
        ctx.writeAndFlush(reply);
    }

    /** Prints the stack trace and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
User客户端:
package com.bj58.wuxian.netty.codec.msgpack;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
/**
 * Client side of the MessagePack example.
 *
 * <p>Connects to 127.0.0.1:8888 with the same framing and MessagePack codecs
 * as the server and {@link UserClientHandler} as the last handler.
 */
public class UserClient {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param argsStrings command-line arguments (not used)
     * @throws Exception if connecting or waiting on the channel fails
     */
    public static void main(String [] argsStrings) throws Exception {
        // Client-side NIO event loop group.
        EventLoopGroup bGroup = new NioEventLoopGroup();
        // Bootstrap helper used to configure and start the client.
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(bGroup).
        channel(NioSocketChannel.class).
        option(ChannelOption.TCP_NODELAY, true).
        option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).
        handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                // Object serialization codecs plus length-based framing to deal
                // with sticky/split packets.
                channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                channel.pipeline().addLast("解码器", new MsgPackDecoder());
                channel.pipeline().addLast(new LengthFieldPrepender(2));
                channel.pipeline().addLast("编码器", new MsgPackEncoder());
                channel.pipeline().addLast(new UserClientHandler());
            }
        });
        try {
            // Connect inside the try block so that a failed connect still reaches
            // the finally clause and the event loop group is shut down.
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync();
            // Wait until the client connection is closed.
            future.channel().closeFuture().sync();
        } finally {
            // Exit gracefully and release resources.
            bGroup.shutdownGracefully();
        }
    }
}
ClientHandler代码:
package com.bj58.wuxian.netty.codec.msgpack;
import com.bj58.wuxian.msgpack.model.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side business handler of the MessagePack example: sends six
 * {@link User} requests once the channel is active and prints every response.
 */
public class UserClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Sends six requests (indices 0..5). Even indices use the username
     * "zhaoshichao"; odd indices append the index to it.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int seq = 0; seq < 6; seq++) {
            User request = new User();
            request.setId(123);
            request.setPassword("45678");
            request.setUsername(seq % 2 == 0 ? "zhaoshichao" : "zhaoshichao" + seq);
            ctx.writeAndFlush(request);
        }
    }

    /** Prints the received message after casting it to {@link User}. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        User response = (User) msg;
        System.out.println("response:" + response);
    }

    /** Prints the stack trace and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /** Flushes the context once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}
网友评论