场景:项目使用Netty作为TCP客户端发送消息给TCP服务器出现了消息丢失问题(发送的是文件,按照规则分成几十个分包),奇怪的是,我每个分包都收到了服务器响应接收成功,但是服务器的开发人员说我发的消息包接收不全,我TMD的服了,我每个分包都收到响应的啊,由于服务器端的不配合,我只能怀疑是服务器并发不够强,我一下子发过去他处理不了,然后发送要做限流,用阻塞队列实现;
使用BlockingQueue实现
/**
* 消息发送列表
*/
private final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();
private final Runnable sendHandler = () -> {
log.info("启动消息消费线程");
while (true) {
try {
log.info("队列阻塞ing");
//这里take()会阻塞,直到有数据
String message = sendQueue.take();
log.info("发送消息:{}",message);
//每次发送间隔1秒,不然服务器并发不行,处理不了。。。
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error(e.toString());
}
}
};
@Test
public void test() throws InterruptedException {
//Demo就这样启动一个线程,生产环境就不要这样做了,不然。。。
//启动消息发送线程
new Thread(sendHandler).start();
//模拟发送20条消息
for (int i = 1; i <= 10; i++) {
sendQueue.offer("message1:"+i);
}
//休眠20秒
Thread.sleep(20000);
//再模拟发送20条消息
for (int i = 1; i <= 10; i++) {
sendQueue.offer("message2:"+i);
}
//休眠20秒
Thread.sleep(20000);
log.info("退出程序");
}
查看日志
17:46:08.247 [Thread-0] INFO com.carrun.阻塞队列 - 启动消息消费线程
17:46:08.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:08.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:1
17:46:09.250 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:09.250 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:2
17:46:10.251 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:10.251 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:3
17:46:11.251 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:11.251 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:4
17:46:12.252 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:12.252 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:5
17:46:13.253 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:13.253 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:6
17:46:14.253 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:14.253 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:7
17:46:15.254 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:15.254 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:8
17:46:16.254 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:16.254 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:9
17:46:17.254 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:17.254 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:10
17:46:18.255 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:28.246 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:1
17:46:29.247 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:29.247 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:2
17:46:30.247 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:30.247 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:3
17:46:31.248 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:31.248 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:4
17:46:32.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:32.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:5
17:46:33.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:33.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:6
17:46:34.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:34.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:7
17:46:35.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:35.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:8
17:46:36.250 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:36.250 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:9
17:46:37.251 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:37.251 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:10
17:46:38.251 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
17:46:48.246 [main] INFO com.carrun.阻塞队列 - 退出程序
由日志可以看出,先发送10条数据(message1,每条间隔1条),然后等待了10秒,接着发送后面添加的那10条数据(message2,每条间隔1条),达到了限流发送要求。
网友评论