美文网首页Java架构技术进阶
嘘,异步事件这样用真的好么?

嘘,异步事件这样用真的好么?

作者: 代码小当家 | 来源:发表于2020-06-30 21:28 被阅读0次

故事背景

今年年初的时候写了一篇文章 《围观:基于事件机制的内部解耦之心路历程》。这篇文章主要讲的是用 ES 数据异构的场景。程序订阅 Mysql Binlog 的变更,然后程序内部使用 Spring Event 来分发具体的事件,因为一个表的数据变更可能会需要更新多个 ES 索引。

为了方便大家理解我把之前方案的图片复制过来了,如下:

嘘,异步事件这样用真的好么?

上图的方案存在一个问题,就是我们今天文章要聊的内容。

这个问题就是当 MQ Consumer 收到消息后,就直接发布 Event 了,如果是同步的,没有问题。如果某个 EventListener 中处理失败了,那么这条消息将不会 ACK。

如果是异步发布 Event 的场景,发布完消息马上就 ACK 了。就算某个 EventListener 中处理失败了,MQ 也感知不到,不会进行消息的重新投递,这就是存在的问题。

嘘,异步事件这样用真的好么?

解决方案

方案一

既然消息已经 ACK 了,那就不利用 MQ 的重试功能了,使用方自己重试是不是也可以呢?

可肯定是可以的,内部处理是否成功肯定是可以知道的,如果处理失败了可以默认重试,或者有一定策略的重试。实在不行还可以落库,保存记录。

这样的问题在于太烦了呀,每个使用的地方都要去做这件事情,而且对于未来接手你代码的程序小哥哥来说,这很有可能让小哥哥头发慢慢脱落啊。。。。

脱落不要紧,关键他还不知道要做这个处理,说不定哪天就背锅了,惨兮兮。。。。

方案二

要保证消息和业务处理的一致性,就不能立马进行 ACK 操作。而是要等业务处理完成后再决定是否要 ACK。

如果有处理失败的就不应该 ACK,这样就能复用 MQ 的重试机制了。

分析下来,这就是一个典型的异步转同步的场景。像 Dubbo 中也有这个场景,所以我们可以借鉴 Dubbo 中的实现思路。

创建一个 DefaultFuture 用于同步等待获取任务执行结果。然后在 MQ 消费的地方使用 DefaultFuture。

@Service@RocketMQMessageListener(topic = "${rocketmq.topic.data_change}", consumerGroup = "${rocketmq.group.data_change_consumer}")public class DataChangeConsume implements RocketMQListener<DataChangeRequest> {    @Autowired    private ApplicationContext applicationContext;    @Autowired    private CustomApplicationContextAware customApplicationContextAware;    @Override    public void onMessage(DataChangeRequest dataChangeRequest) {        log.info("received message {} , Thread {}", dataChangeRequest, Thread.currentThread().getName());        DataChangeEvent event = new DataChangeEvent(this);        event.setChangeType(dataChangeRequest.getChangeType());        event.setTable(dataChangeRequest.getTable());        event.setMessageId(dataChangeRequest.getMessageId());        DefaultFuture defaultFuture = DefaultFuture.newFuture(dataChangeRequest, customApplicationContextAware.getTaskCount(), 6000 * 10);        applicationContext.publishEvent(event);        Boolean result = defaultFuture.get();        log.info("MessageId {} 处理结果 {}", dataChangeRequest.getMessageId(), result);        if (!result) {            throw new RuntimeException("处理失败,不进行消息ACK,等待下次重试");        }    }}

newFuture() 会传入事件参数,超时时间,任务数量几个参数。任务数量是用于判断所有 EventListener 是否全部执行完成。

defaultFuture.get(); 这不就会阻塞,等待所有任务执行完成才会返回结果,如果所有业务都处理成功了,那么会返回 true,流程结束,消息自动 ACK。

如果返回了 false 证明有处理失败的或者超时的,就不需要 ACK 了,抛出异常等待重试。

public Boolean get() {    if (isDone()) {        return true;    }    long start = System.currentTimeMillis();    lock.lock();    try {        while (!isDone()) {            done.await(timeout, TimeUnit.MILLISECONDS);            // 有失败的任务反馈            if (!isSuccessDone()) {                return false;            }            // 全部执行成功            if (isDone()) {                return true;            }            // 超时            if (System.currentTimeMillis() - start > timeout) {                return false;            }        }    } catch (InterruptedException e) {        throw new RuntimeException(e);    } finally {        lock.unlock();    }    return true;}

isDone() 会判断反馈结果了的任务数量跟总数量是否一致,如果一直就说明全部执行完成了。

public boolean isDone() {    return feedbackResultCount.get() == taskCount;}

那么任务执行完了怎么反馈呢? 不可能让每个使用的方法去关心,所以我们定义了一个切面来做这件事情。

@Aspect@Componentpublic class EventListenerAspect {    @Around(value = "@annotation(eventListener)")    public Object aroundAdvice(ProceedingJoinPoint joinpoint, EventListener eventListener) throws Throwable {      DataChangeEvent event = null;      boolean executeResult = true;       try {         event = (DataChangeEvent)joinpoint.getArgs()[0];         Object result = joinpoint.proceed();         return result;      } catch (Exception e) {         executeResult = false;          throw e;      } finally {         DefaultFuture.received(event.getMessageId(), executeResult);      }    }}

通过 DefaultFuture.received() 反馈执行结果。

public static void received(String id, boolean result) {    DefaultFuture future = FUTURES.get(id);    if (future != null) {        // 累加失败任务数量        if (!result) {            future.feedbackFailResultCount.incrementAndGet();        }        // 累加执行完成任务数量        future.feedbackResultCount.incrementAndGet();        if (future.isDone()) {            FUTURES.remove(id);            future.doReceived();        }    }}private void doReceived() {    lock.lock();    try {        if (done != null) {            // 唤醒阻塞的线程            done.signal();        }    } finally {        lock.unlock();    }复制代码

下面我们来总结整个流程:

  • 收到 MQ 消息,组装成 DefaultFuture,通过 get 方法获取执行结果,未执行完的时候此方法阻塞。
  • 通过切面切入加了 EventListener 的方法,判断是否有异常来判断任务的执行结果。
  • 通过 DefaultFuture.received() 反馈结果。
  • 反馈时计算是否全部完成,全部完成则唤醒阻塞的线程。DefaultFuture.get()就能获取到结果。
  • 是否要进行 ACK 操作。

需要注意的是每个 EventListener 内部消费的逻辑都要做幂等控制。

源码地址: https://github.com/yinjihuan/kitty-cloud/tree/master/kitty-cloud-mqconsume[1]

相关文章

  • 嘘,异步事件这样用真的好么?

    故事背景 今年年初的时候写了一篇文章 《围观:基于事件机制的内部解耦之心路历程》。这篇文章主要讲的是用 ES 数据...

  • 嘘!异步事件这样用真的好么?

    故事背景 今年年初的时候写了一篇文章 《围观:基于事件机制的内部解耦之心路历程》。这篇文章主要讲的是用 ES 数据...

  • 这样真的好么?

    看完靖雯的100万的诱惑,发现100万还能做这么多事情,之前想的是100万买一套房子可能还不够... 受其感染,我...

  • 这样真的好么

    有一种人,整天笑嘻嘻的,好似和谁都合得来,但是从不主动联系朋友,脾气也出奇的好,好像世上没什么事情能让他们愤怒和悲...

  • 这样懒散真的好么?

    好些日子没写简书了,看着小伙伴们每日奋笔耕耘,自觉惭愧,都已经躺着画画了,还想怎样? 其实多数时候都是把准备工作做...

  • 1653: 这样真的好么(*^*)

    Description 在某人参加的某一场比赛中,一共有k位选手参赛,他们的编号是1到k。主办方准备了n个气球,主...

  • RxSwift学习

    编程思想 用同步的方式,编写处理异步事件的代码。是基于异步 Event(事件)序列的响应式编程。它可以简化异步编程...

  • 放弃理财,这样真的好么?

    文/墨子 以前,看富爸爸,穷爸爸,我的理财观被颠覆了一次,最近,读的一些书和看的一些文章,我的理财观念又被颠覆了一...

  • 商场这样做真的好么

    前几天商场说要给我们商家免费做一批工装,可现在做好了又要交钱,一件白色半袖衬衫50元,要从活动押金里扣除,现在感觉...

  • 八千万起步的中国足球运动员转会市场

    为什么会这样? 这样真的好么? 什么时候是个头? 该怎么办?

网友评论

    本文标题:嘘,异步事件这样用真的好么?

    本文链接:https://www.haomeiwen.com/subject/xwefqktx.html