场景:
1: 每个订单有 下单 支付 完成 3种状态
模拟10条数据,有的是有上述3种状态,有的没有
2:最终目的是 一个消费者拉取了全部数据,然后数据后台使用多线程,
保证同一个单号进入同一个队列,每个线程处理对应的队列数据。
kafka队列消费.png
3:通过构建消息队列和线程的一一绑定关系,以及单号个队列绑定关系。来实现同一个单号进入同一个队列
4: 存在的BUG
- 如果宕机,内存队列里的数据就没了。需要增加的时候写到消息表
- 如果下单消费失败了,后面的支付和完成是不能消费的,一定要等到有订单了才可以消费
参考增加日志以及redis
https://segmentfault.com/a/1190000018640106
消费者放入消息到队列
@Override
public void consumer() {
// 模拟上游传递过来的数据
List<Thread01> thread01s = list();
int size = thread01s.size();
for (int i = 0; i < size; i++) {
Thread01 thread01 = thread01s.get(i);
System.out.println("当前获取的状态:"+thread01.toString());
chooseQueue(thread01);
}
}
/**
* 通过单号选择进入那个队列中
* @param thread01
*/
private void chooseQueue(Thread01 thread01) {
String shipId = thread01.getShipId();
int index = Math.abs(shipId.hashCode()) % 3;
putQueue(index,thread01);
}
private void putQueue(int index, Thread01 thread01) {
LinkedBlockingQueue<Thread01> queue = queueMap.get(index);
try {
queue.put(thread01);
System.out.println(index+"-->"+queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
通过对单号进行HASH 决定进去那个队列。
事先设置好队列
队列下标和单号的hash算法获取index对应。
static Map<Integer,LinkedBlockingQueue<Thread01>> queueMap=new HashMap();
static{
queueMap.put(0,new LinkedBlockingQueue<Thread01>(1000));
queueMap.put(1,new LinkedBlockingQueue<Thread01>(1000));
queueMap.put(2,new LinkedBlockingQueue<Thread01>(1000));
}
循环消费数据。线程绑定队列
public void doConsumer(int index){
LinkedBlockingQueue<Thread01> queue = queueMap.get(index);
int size = queue.size();
while(queue.size()!=0){
// System.out.println("队列下标:"+index+"-->队列大小:"+queue.size());
Thread01 take = null;
try {
take = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 业务逻辑处理
// System.out.println("当前获取的状态:"+take.toString()+" 线程:"+Thread.currentThread().getName());
Thread02 ship_id = thread02Service.getOne(new QueryWrapper<Thread02>().eq("ship_id", take.getShipId()));
Thread02 t02=new Thread02();
if(null==ship_id){
t02.setShipId(take.getShipId());
t02.setStatus(take.getStatus());
thread02Service.save(t02);
}else{
thread02Service.update(new UpdateWrapper<Thread02>().eq("ship_id",take.getShipId()).set("status",take.getStatus()));
}
}
}
@PostConstruct
public void doThread(){
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
int index=i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("-----------初始化线程任务-----------------");
while(true){
doConsumer(index);
}
}
},"thread_"+index);
}
}
网友评论