美文网首页方案
多线程消费顺序-01

多线程消费顺序-01

作者: Spring_java | 来源:发表于2021-06-26 21:00 被阅读0次

场景:
1: 每个订单有 下单 支付 完成 3种状态
模拟10条数据,有的是有上述3种状态,有的没有

2:最终目的是 一个消费者拉取了全部数据,然后数据后台使用多线程,
保证同一个单号进入同一个队列,每个线程处理对应的队列数据。


kafka队列消费.png

3:通过构建消息队列和线程的一一绑定关系,以及单号个队列绑定关系。来实现同一个单号进入同一个队列

4: 存在的BUG

  • 如果宕机,内存队列里的数据就没了。需要增加的时候写到消息表
  • 如果下单消费失败了,后面的支付和完成是不能消费的,一定要等到有订单了才可以消费
    参考增加日志以及redis
    https://segmentfault.com/a/1190000018640106

消费者放入消息到队列

@Override
    public void consumer() {
        // 模拟上游传递过来的数据
        List<Thread01> thread01s = list();
        int size = thread01s.size();
        for (int i = 0; i < size; i++) {
            Thread01 thread01 = thread01s.get(i);
            System.out.println("当前获取的状态:"+thread01.toString());
            chooseQueue(thread01);
        }
    }

    /**
     * 通过单号选择进入那个队列中
     * @param thread01
     */
    private void chooseQueue(Thread01 thread01) {
        String shipId = thread01.getShipId();
         int index = Math.abs(shipId.hashCode()) % 3;
        putQueue(index,thread01);
    }

    private void putQueue(int index, Thread01 thread01)  {
        LinkedBlockingQueue<Thread01> queue = queueMap.get(index);
        try {
            queue.put(thread01);
            System.out.println(index+"-->"+queue.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

通过对单号进行HASH 决定进去那个队列。

事先设置好队列

队列下标和单号的hash算法获取index对应。

    static Map<Integer,LinkedBlockingQueue<Thread01>> queueMap=new HashMap();
    static{
        queueMap.put(0,new LinkedBlockingQueue<Thread01>(1000));
        queueMap.put(1,new LinkedBlockingQueue<Thread01>(1000));
        queueMap.put(2,new LinkedBlockingQueue<Thread01>(1000));
    }

循环消费数据。线程绑定队列

    public void doConsumer(int index){
        LinkedBlockingQueue<Thread01> queue = queueMap.get(index);
        int size = queue.size();
        while(queue.size()!=0){
//            System.out.println("队列下标:"+index+"-->队列大小:"+queue.size());
            Thread01 take = null;
            try {
                take = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 业务逻辑处理
//            System.out.println("当前获取的状态:"+take.toString()+" 线程:"+Thread.currentThread().getName());
            Thread02 ship_id = thread02Service.getOne(new QueryWrapper<Thread02>().eq("ship_id", take.getShipId()));
            Thread02 t02=new Thread02();
            if(null==ship_id){
                t02.setShipId(take.getShipId());
                t02.setStatus(take.getStatus());
                thread02Service.save(t02);
            }else{
                thread02Service.update(new UpdateWrapper<Thread02>().eq("ship_id",take.getShipId()).set("status",take.getStatus()));
            }
        }
    }
    @PostConstruct
    public void doThread(){
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
             int  index=i;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("-----------初始化线程任务-----------------");
                    while(true){
                        doConsumer(index);
                    }
                }
            },"thread_"+index);
        }
    }

结果显示

原始数据.png 队列详细.png 结果.png

相关文章

  • 多线程消费顺序-01

    场景:1: 每个订单有 下单 支付 完成 3种状态模拟10条数据,有的是有上述3种状态,有的没有 2:最终目的...

  • 消息队列-2 多线程消费如何保证消息不丢

    01 提高消息队列消费性能 1.扩容,增加消费机器和partition2.多线程消费,但如果需要顺序消费那么就不能...

  • (8)Kafka怎么体现消息顺序性

    概要:2:方案1、2、3,消费多线程只串行,有点鸡肋了 一、Kafka只能保证分区内消息顺序有序,无法保证全局有序...

  • kafka

    主题和分区 消息发送方式 消息顺序保证 分区 消费者 消费者005消费者01.PNG005消费者02.PNG005...

  • 顺序消费

    生产者根据自定义的规则,将某一类的消息发送到同一个queue中,可以hash与队列的个数取余 消费者消费者要注意使...

  • 面试集锦

    (金财互联) 1、多线程状态的几种异常2、mq怎么保证顺序消费3、大事务小事务4、什么场景用到lambda5、My...

  • python——多线程

    多线程-threading 子类完成创建多线程 线程的执行顺序也是主线程和各个子线程随机执行,顺序不确定 线程对全...

  • 多线程循环顺序处理的方式

    目前有个任务需要对数据进行一个循环处理,那么就需要多线程顺序触发的问题了.这里以顺序打印为例子对常见的多线程顺序处...

  • 多线程顺序处理的方式

    目前有个任务需要对数据进行一个循环处理,那么就需要多线程顺序触发的问题了.这里以顺序打印为例子对常见的多线程顺序处...

  • 并发

    保证多线程的顺序执行 方式1:通过join方法保证多线程的顺序join作用:让主线程等待子线程结束后才能继续运行 ...

网友评论

    本文标题:多线程消费顺序-01

    本文链接:https://www.haomeiwen.com/subject/efozyltx.html