写在前面
Netty是高性能的网络框架,ProtoBuf同样是高性能的编解码框架,两者的搭配可以说是恰到好处。
高性能的网络框架,也会碰到如何对消息进行编解码、如何组织消息,以及如何处理TCP拆包粘包的问题。
高性能的编解码框架,如果没有网络框架搭配,那它的高性能优势也荡然无存。
选择Netty+ProtoBuf的优秀开源项目也很多,比如Pulsar、BookKeeper都使用了这一组合。
准备Proto文件
首先,编写一个简单的.proto协议文件
syntax = "proto2";
package demo.netty;
option java_package = "com.github.shoothzj.demo.netty.protobuf.module";
option java_outer_classname = "ProtoBufModule";
// Request message: the payload the client sends to the server in this demo.
// Both fields are declared `required` (proto2), so a message missing either
// one is considered uninitialized.
message Request {
// First name, field number 1.
required string first_name = 1;
// Last name, field number 2.
required string last_name = 2;
}
// Response message: the payload the server sends back to the client in this demo.
message Response {
// Status code of the reply, field number 1 (required, proto2).
required int32 status = 1;
}
然后生成它的java类
protoc -I=proto --java_out=java proto/DemoNettyProtocol.proto
编写服务端Netty的handler类,在接收到请求后简单地发送响应
package com.github.shoothzj.demo.netty.protobuf;
import com.github.shoothzj.demo.netty.protobuf.module.ProtoBufModule;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
/**
 * Server-side inbound handler: logs every decoded {@link ProtoBufModule.Request}
 * and replies on the same channel with a {@link ProtoBufModule.Response}
 * whose status is 200.
 *
 * @author hezhangjian
 */
@Slf4j
public class ServerHandler extends SimpleChannelInboundHandler<ProtoBufModule.Request> {

    /**
     * Logs the incoming request, then writes and flushes the reply through the channel.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg the decoded request
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ProtoBufModule.Request msg) throws Exception {
        log.info("request [{}]", msg);
        ctx.channel().writeAndFlush(okResponse());
    }

    /** Builds the fixed reply carrying status 200. */
    private static ProtoBufModule.Response okResponse() {
        return ProtoBufModule.Response.newBuilder()
                .setStatus(200)
                .build();
    }
}
拉起Netty服务端
package com.github.shoothzj.demo.netty.protobuf;
import com.github.shoothzj.demo.netty.protobuf.module.ProtoBufModule;
import com.github.shoothzj.javatool.util.LogUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;
/**
 * Demo server: binds a Netty NIO server to port 9997 and exchanges
 * varint32-length-prefixed Protobuf messages with connecting clients.
 *
 * @author hezhangjian
 */
@Slf4j
public class ProtoServer {

    /** Not referenced anywhere in this class. */
    private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     * @throws Exception if binding or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        LogUtil.configureLog();
        // One thread accepts connections; the worker group uses Netty's default sizing.
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap()
                    .group(acceptors, workers)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(newChildInitializer());
            // Bind and wait for the bind to complete.
            ChannelFuture bound = server.bind(9997).sync();
            // Block until the server socket is closed.
            bound.channel().closeFuture().sync();
        } finally {
            // Stop both event loop groups so that their threads terminate.
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }

    /**
     * Creates the initializer that installs the Protobuf codec and the
     * business handler on every accepted connection.
     */
    private static ChannelInitializer<SocketChannel> newChildInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // Inbound: split the byte stream into frames using the varint32 length prefix.
                        .addLast(new ProtobufVarint32FrameDecoder())
                        // Inbound: decode each frame into a ProtoBufModule.Request instance.
                        .addLast(new ProtobufDecoder(ProtoBufModule.Request.getDefaultInstance()))
                        // Outbound: prepend the varint32 length prefix to each encoded message.
                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                        // Outbound: serialize Protobuf messages to bytes.
                        .addLast(new ProtobufEncoder())
                        // Business logic: answers every request.
                        .addLast(new ServerHandler());
            }
        };
    }
}
客户端handler,当连接建立时发送request,收到响应后打印出来
package com.github.shoothzj.demo.netty.protobuf;
import com.github.shoothzj.demo.netty.protobuf.module.ProtoBufModule;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
/**
 * Client-side inbound handler: sends one {@link ProtoBufModule.Request} when the
 * channel becomes active and logs every {@link ProtoBufModule.Response} received.
 *
 * @author hezhangjian
 */
@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<ProtoBufModule.Response> {

    /**
     * Sends the request immediately once the channel is active.
     *
     * @param ctx context of this handler in the channel pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush(
                ProtoBufModule.Request.newBuilder()
                        .setFirstName("Akka")
                        .setLastName("Scala")
                        .build());
    }

    /**
     * Logs the response received from the server.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg the decoded response
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ProtoBufModule.Response msg) throws Exception {
        log.info("response is [{}]", msg);
    }
}
客户端主函数:
package com.github.shoothzj.demo.netty.protobuf;
import com.github.shoothzj.demo.netty.protobuf.module.ProtoBufModule;
import com.github.shoothzj.javatool.util.LogUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;
/**
 * Demo client: connects to 127.0.0.1:9997 and exchanges
 * varint32-length-prefixed Protobuf messages with the server.
 *
 * @author hezhangjian
 */
@Slf4j
public class ProtoClient {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args unused
     * @throws Exception if connecting or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        LogUtil.configureLog();
        NioEventLoopGroup eventLoops = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap()
                    .group(eventLoops)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(newInitializer());
            // Connect and wait for the connection attempt to complete.
            ChannelFuture connected = client.connect("127.0.0.1", 9997).sync();
            // Block until the channel is closed.
            connected.channel().closeFuture().sync();
        } finally {
            eventLoops.shutdownGracefully();
        }
    }

    /**
     * Creates the initializer that installs the Protobuf codec and the
     * client handler on the connection.
     */
    private static ChannelInitializer<SocketChannel> newInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // Inbound: split the byte stream into frames using the varint32 length prefix.
                        .addLast(new ProtobufVarint32FrameDecoder())
                        // Inbound: decode each frame into a ProtoBufModule.Response instance.
                        .addLast(new ProtobufDecoder(ProtoBufModule.Response.getDefaultInstance()))
                        // Outbound: prepend the varint32 length prefix to each encoded message.
                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                        // Outbound: serialize Protobuf messages to bytes.
                        .addLast(new ProtobufEncoder())
                        // Business logic: sends the request and logs the response.
                        .addLast(new ClientHandler());
            }
        };
    }
}
网友评论