美文网首页
Java 实现阻塞队列 Netty发送限流

Java 实现阻塞队列 Netty发送限流

作者: GCZeng | 来源:发表于2021-09-08 17:52 被阅读0次

    场景:项目使用Netty作为TCP客户端发送消息给TCP服务器出现了消息丢失问题(发送的是文件,按照规则分成几十个分包),奇怪的是,我每个分包都收到了服务器响应接收成功,但是服务器的开发人员说我发的消息包接收不全,我TMD的服了,我每个分包都收到响应的啊,由于服务器端的不配合,我只能怀疑是服务器并发不够强,我一下子发过去他处理不了,然后发送要做限流,用阻塞队列实现;

    使用BlockingQueue实现

         /**
         * 消息发送列表
         */
        private final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();
    
        private final Runnable sendHandler = () -> {
            log.info("启动消息消费线程");
            while (true) {
                try {
                    log.info("队列阻塞ing");
                    //这里take()会阻塞,直到有数据
                    String message = sendQueue.take();
                    log.info("发送消息:{}",message);
                    //每次发送间隔1秒,不然服务器并发不行,处理不了。。。
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.error(e.toString());
                }
            }
        };
    
       @Test
        public void test() throws InterruptedException {
            //Demo就这样启动一个线程,生产环境就不要这样做了,不然。。。
            //启动消息发送线程
            new Thread(sendHandler).start();
    
            //模拟发送20条消息
            for (int i = 1; i <= 10; i++) {
                sendQueue.offer("message1:"+i);
            }
    
            //休眠20秒
            Thread.sleep(20000);
    
            //再模拟发送20条消息
            for (int i = 1; i <= 10; i++) {
                sendQueue.offer("message2:"+i);
            }
    
            //休眠20秒
            Thread.sleep(20000);
    
            log.info("退出程序");
        }
    

    查看日志

    17:46:08.247 [Thread-0] INFO com.carrun.阻塞队列 - 启动消息消费线程
    17:46:08.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:08.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:1
    17:46:09.250 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:09.250 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:2
    17:46:10.251 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:10.251 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:3
    17:46:11.251 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:11.251 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:4
    17:46:12.252 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:12.252 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:5
    17:46:13.253 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:13.253 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:6
    17:46:14.253 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:14.253 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:7
    17:46:15.254 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:15.254 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:8
    17:46:16.254 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:16.254 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:9
    17:46:17.254 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:17.254 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message1:10
    17:46:18.255 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:28.246 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:1
    17:46:29.247 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:29.247 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:2
    17:46:30.247 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:30.247 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:3
    17:46:31.248 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:31.248 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:4
    17:46:32.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:32.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:5
    17:46:33.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:33.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:6
    17:46:34.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:34.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:7
    17:46:35.249 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:35.249 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:8
    17:46:36.250 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:36.250 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:9
    17:46:37.251 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:37.251 [Thread-0] INFO com.carrun.阻塞队列 - 发送消息:message2:10
    17:46:38.251 [Thread-0] INFO com.carrun.阻塞队列 - 队列阻塞ing
    17:46:48.246 [main] INFO com.carrun.阻塞队列 - 退出程序
    

    由日志可以看出,先发送10条数据(message1,每条间隔1条),然后等待了10秒,接着发送后面添加的那10条数据(message2,每条间隔1条),达到了限流发送要求。

    相关文章

      网友评论

          本文标题:Java 实现阻塞队列 Netty发送限流

          本文链接:https://www.haomeiwen.com/subject/zkbvwltx.html