1. Disruptor与Netty 架构
image.pngimage.png
与Netty网络通信框架整合提升性能:
-
在使用Netty进行接收处理数据的时候,我们尽量都不要在工作线程(Handler)上编写自己的代码逻辑
-
我们需要利用异步的机制,比如使用线程池异步处理,如果使用线程池就意味着使用阻塞队列,这里可以替换为Disruptor提高性能
2.核心代码
Commonlib
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
<!-- 序列化框架marshalling -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>2.0.12.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.12.Final</version>
</dependency>
<!-- 序列化框架protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<!-- netty end -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
public class MessageProducer {
private String producerId;
private RingBuffer<TranslatorDataWapper> ringBuffer;
public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) {
this.producerId = producerId;
this.ringBuffer = ringBuffer;
}
public void onData(TranslatorData data, ChannelHandlerContext ctx) {
long sequence = ringBuffer.next();
try {
TranslatorDataWapper wapper = ringBuffer.get(sequence);
wapper.setData(data);
wapper.setCtx(ctx);
} finally {
ringBuffer.publish(sequence);
}
}
}
public class RingBufferWorkerPoolFactory {
private static final Map<String, MessageProducer> producers = new ConcurrentHashMap<String, MessageProducer>();
private static final Map<String, MessageConsumer> consumers = new ConcurrentHashMap<String, MessageConsumer>();
private RingBuffer<TranslatorDataWapper> ringBuffer;
private SequenceBarrier sequenceBarrier;
private WorkerPool<TranslatorDataWapper> workerPool;
private RingBufferWorkerPoolFactory() {
}
public static RingBufferWorkerPoolFactory getInstance() {
return SingletonHolder.instance;
}
public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
//1. 构建ringBuffer对象
this.ringBuffer = RingBuffer.create(type,
new EventFactory<TranslatorDataWapper>() {
public TranslatorDataWapper newInstance() {
return new TranslatorDataWapper();
}
},
bufferSize,
waitStrategy);
//2.设置序号栅栏
this.sequenceBarrier = this.ringBuffer.newBarrier();
//3.设置工作池
this.workerPool = new WorkerPool<TranslatorDataWapper>(this.ringBuffer,
this.sequenceBarrier,
new EventExceptionHandler(), messageConsumers);
//4 把所构建的消费者置入池中
for (MessageConsumer mc : messageConsumers) {
consumers.put(mc.getConsumerId(), mc);
}
//5 添加我们的sequences
this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
//6 启动我们的工作池
this.workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2));
}
public MessageProducer getMessageProducer(String producerId) {
MessageProducer messageProducer = producers.get(producerId);
if (null == messageProducer) {
messageProducer = new MessageProducer(producerId, this.ringBuffer);
producers.put(producerId, messageProducer);
}
return messageProducer;
}
private static class SingletonHolder {
static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
}
/**
* 异常静态类
*
* @author Alienware
*/
static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> {
public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) {
}
public void handleOnStartException(Throwable ex) {
}
public void handleOnShutdownException(Throwable ex) {
}
}
}
Client端
public class ClientHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/**
try {
TranslatorData response = (TranslatorData)msg;
System.err.println("Client端: id= " + response.getId()
+ ", name= " + response.getName()
+ ", message= " + response.getMessage());
} finally {
//一定要注意 用完了缓存 要进行释放
ReferenceCountUtil.release(msg);
}
*/
TranslatorData response = (TranslatorData)msg;
String producerId = "code:seesionId:002";
MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
messageProducer.onData(response, ctx);
}
}
public class MessageConsumerImpl4Client extends MessageConsumer {
public MessageConsumerImpl4Client(String consumerId) {
super(consumerId);
}
public void onEvent(TranslatorDataWapper event) throws Exception {
TranslatorData response = event.getData();
ChannelHandlerContext ctx = event.getCtx();
//业务逻辑处理:
try {
System.err.println("Client端: id= " + response.getId()
+ ", name= " + response.getName()
+ ", message= " + response.getMessage());
} finally {
ReferenceCountUtil.release(response);
}
}
}
public class NettyClient {
public static final String HOST = "127.0.0.1";
public static final int PORT = 8765;
//扩展 完善 池化: ConcurrentHashMap<KEY -> String, Value -> Channel>
private Channel channel;
//1. 创建工作线程组: 用于实际处理业务的线程组
private final EventLoopGroup workGroup = new NioEventLoopGroup();
private ChannelFuture cf;
public NettyClient() {
this.connect(HOST, PORT);
}
private void connect(String host, int port) {
//2 辅助类(注意Client 和 Server 不一样)
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
//表示缓存区动态调配(自适应)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
//缓存区 池化操作
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClientHandler());
}
});
//绑定端口,同步等等请求连接
this.cf = bootstrap.connect(host, port).sync();
System.err.println("Client connected...");
//接下来就进行数据的发送, 但是首先我们要获取channel:
this.channel = cf.channel();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sendData() {
for (int i = 0; i < 10; i++) {
TranslatorData request = new TranslatorData();
request.setId("" + i);
request.setName("请求消息名称 " + i);
request.setMessage("请求消息内容 " + i);
this.channel.writeAndFlush(request);
}
}
public void close() throws Exception {
cf.channel().closeFuture().sync();
//优雅停机
workGroup.shutdownGracefully();
System.err.println("Sever ShutDown...");
}
}
@SpringBootApplication
public class NettyClientApplication {
public static void main(String[] args) {
SpringApplication.run(NettyClientApplication.class, args);
MessageConsumer[] conusmers = new MessageConsumer[4];
for(int i =0; i < conusmers.length; i++) {
MessageConsumer messageConsumer = new MessageConsumerImpl4Client("code:clientId:" + i);
conusmers[i] = messageConsumer;
}
RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
1024*1024,
//new YieldingWaitStrategy(),
new BlockingWaitStrategy(),
conusmers);
//建立连接 并发送消息
new NettyClient().sendData();
}
}
Server端
public class MessageConsumerImpl4Server extends MessageConsumer {
public MessageConsumerImpl4Server(String consumerId) {
super(consumerId);
}
public void onEvent(TranslatorDataWapper event) throws Exception {
TranslatorData request = event.getData();
ChannelHandlerContext ctx = event.getCtx();
//1.业务处理逻辑:
System.err.println("Sever端: id= " + request.getId()
+ ", name= " + request.getName()
+ ", message= " + request.getMessage());
//2.回送响应信息:
TranslatorData response = new TranslatorData();
response.setId("resp: " + request.getId());
response.setName("resp: " + request.getName());
response.setMessage("resp: " + request.getMessage());
//写出response响应信息:
ctx.writeAndFlush(response);
}
}
public class NettyServer {
public NettyServer() {
//1. 创建两个工作线程组: 一个用于接受网络请求的线程组. 另一个用于实际处理业务的线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
//2 辅助类
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//表示缓存区动态调配(自适应)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
//缓存区 池化操作
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHandler());
}
});
//绑定端口,同步等等请求连接
ChannelFuture cf = serverBootstrap.bind(8765).sync();
System.err.println("Server Startup...");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅停机
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
System.err.println("Sever ShutDown...");
}
}
}
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/**
TranslatorData request = (TranslatorData)msg;
System.err.println("Sever端: id= " + request.getId()
+ ", name= " + request.getName()
+ ", message= " + request.getMessage());
//数据库持久化操作 IO读写 ---> 交给一个线程池 去异步的调用执行
TranslatorData response = new TranslatorData();
response.setId("resp: " + request.getId());
response.setName("resp: " + request.getName());
response.setMessage("resp: " + request.getMessage());
//写出response响应信息:
ctx.writeAndFlush(response);
*/
TranslatorData request = (TranslatorData)msg;
//自已的应用服务应该有一个ID生成规则
String producerId = "code:sessionId:001";
MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
messageProducer.onData(request, ctx);
}
}
@SpringBootApplication
public class NettyServerApplication {
public static void main(String[] args) {
SpringApplication.run(NettyServerApplication.class, args);
MessageConsumer[] conusmers = new MessageConsumer[4];
for(int i =0; i < conusmers.length; i++) {
MessageConsumer messageConsumer = new MessageConsumerImpl4Server("code:serverId:" + i);
conusmers[i] = messageConsumer;
}
RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
1024*1024,
//new YieldingWaitStrategy(),
new BlockingWaitStrategy(),
conusmers);
new NettyServer();
}
}
运行
- 先启动server
- 再启动client
Client connected...
Client端: id= resp: 3, name= resp: 请求消息名称 3, message= resp: 请求消息内容 3
Client端: id= resp: 1, name= resp: 请求消息名称 1, message= resp: 请求消息内容 1
Client端: id= resp: 2, name= resp: 请求消息名称 2, message= resp: 请求消息内容 2
Client端: id= resp: 0, name= resp: 请求消息名称 0, message= resp: 请求消息内容 0
Client端: id= resp: 6, name= resp: 请求消息名称 6, message= resp: 请求消息内容 6
Client端: id= resp: 7, name= resp: 请求消息名称 7, message= resp: 请求消息内容 7
Client端: id= resp: 4, name= resp: 请求消息名称 4, message= resp: 请求消息内容 4
Client端: id= resp: 5, name= resp: 请求消息名称 5, message= resp: 请求消息内容 5
Client端: id= resp: 8, name= resp: 请求消息名称 8, message= resp: 请求消息内容 8
Client端: id= resp: 9, name= resp: 请求消息名称 9, message= resp: 请求消息内容 9
网友评论