美文网首页Java web
事件中心 - 解耦核心业务与辅助功能

事件中心 - 解耦核心业务与辅助功能

作者: 十毛tenmao | 来源:发表于2019-12-13 23:00 被阅读0次

随着业务的不断发展,原来融入在业务系统主流程中的辅助功能越来越多,每次增加新的逻辑,都要修改主干流程,比如发送微信服务号消息,发送邮件提醒等。 这种做法,让主干流程和辅助功能耦合太紧密,很容易在修改辅助功能的时候,导致主干流程的bug。本文介绍利用事件中心,让主干流程专注于业务核心,其他辅助功能会通过监听事件中心来实现,大大解耦了核心业务和辅助逻辑。

实现

  • 事件中心EventHub

事件中心,一般用来监听收集各种事件并分发给监听者列表.

/**
 * 事件中心,一般用来监听收集各种事件并分发给监听者列表.
 *
 * @author tenmao
 * @since 2019/12/12
 */
@Slf4j
public class EventHub<Event, Entity> {
    private final MdcThreadPoolExecutor mdcThreadPoolExecutor = MdcThreadPoolExecutor.newWithInheritedMdc(4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));

    /**
     * 同步监听(一般可以用在事务一致性场景).
     */
    private Map<Event, Set<Consumer<Entity>>> syncListenerMap = new HashMap<>();

    /**
     * 异步监听.
     */
    private Map<Event, Set<Consumer<Entity>>> asyncListenerMap = new HashMap<>();


    /**
     * 注册同步监听器.
     *
     * @param event        事件
     * @param syncListener 同步监听器
     */
    public void subscribeSync(Event event, Consumer<Entity> syncListener) {
        Preconditions.checkNotNull(event);
        Preconditions.checkNotNull(syncListener);
        log.info("subscribeSync: event[{}], listener[{}]", event, syncListener);
        addListener(syncListenerMap, event, syncListener);
    }

    /**
     * 注册异步监听器.
     *
     * @param event         事件
     * @param asyncListener 异步监听器
     */
    public void subscribeAsyncListener(Event event, Consumer<Entity> asyncListener) {
        Preconditions.checkNotNull(event);
        Preconditions.checkNotNull(asyncListener);
        log.info("subscribeAsyncListener: event[{}], listener[{}]", event, asyncListener);
        addListener(asyncListenerMap, event, asyncListener);
    }

    private synchronized void addListener(Map<Event, Set<Consumer<Entity>>> map, Event event, Consumer<Entity> listener) {
        Set<Consumer<Entity>> consumers = map.get(event);
        if (consumers == null) {
            consumers = new HashSet<>();
            map.putIfAbsent(event, consumers);
        }
        consumers.add(listener);
    }


    /**
     * 发布事件.
     *
     * @param event  事件
     * @param entity 事件实体
     */
    public void publishEvent(Event event, Entity entity) {
        //订单状态变更
        log.info("publishEvent: event[{}], entity[{}]", event, entity);
        for (Consumer<Entity> consumer : syncListenerMap.get(event)) {
            consumer.accept(entity);
        }

        for (Consumer<Entity> asyncObserver : asyncListenerMap.get(event)) {
            mdcThreadPoolExecutor.execute(() -> {
                try {
                    asyncObserver.accept(entity);
                } catch (Exception e) {
                    log.warn("fail to finish listener for event: event[{}], entity[{}]", event, entity);
                }
            });
        }
    }
}
  • 为不同类型事件配置事件中心
/**
 * @author tenmao
 * @since 2019/12/12
 */
@Configuration
public class EventHubConfiguration {
    /**
     * 订单状态事件中心.
     */
    @Bean
    public EventHub<OrderStatus, Order> orderEventHub() {
        return new EventHub<>();
    }

    /**
     * 退款状态事件中心.
     */
    @Bean
    public EventHub<RefundStatus, Refund> refundEventHub() {
        return new EventHub<>();
    }
}

事件监听者

/**
 * 发送提醒消息.
 *
 * @author tenmao
 * @since 2019/12/12
 */
@Slf4j
@Component
public class EmailListener {
    @Resource
    private EmailRemoteManager EmailRemoteManager;

    @Resource
    private EventHub<OrderStatus, Order> orderEventHub;

    @PostConstruct
    private void init() {
        //监听多个不同的事件,监听到后会发送提醒消息
        orderEventHub.subscribeAsyncListener(OrderStatus.PAYED, this::handleOrderPayed);
        orderEventHub.subscribeAsyncListener(OrderStatus.ARRIVAL, this::handleOrderArrival);
    }

    private void handleOrderPayed(Order order) {
        //推送 Email
        EmailRequest EmailRequest = EmailRequest.build(order);
        EmailRemoteManager.sendEmail(EmailRequest);
    }

    private void handleOrderArrival(Order order) {
        //推送 Email
        EmailRequest EmailRequest = EmailRequest.build(order);
        EmailRemoteManager.sendEmail(EmailRequest);
    }
}

事件源

业务系统,比如订单系统中,会在不同的订单状态时,发送相应的订单事件.

@Slf4j
@Component
public class OrderManager {
    @Resource
    private EventHub<OrderStatus, Order> orderEventHub;

    public void payOrder(String orderNo) {
        //TODO 订单付款
        Order order = getOne(orderNo);
        orderEventHub.publishEvent(OrderStatus.PAYED, order);
    }

    public void orderArrival(String orderNo) {
        //TODO 订单到货
        Order order = getOne(orderNo);
        orderEventHub.publishEvent(OrderStatus.ARRIVAL, order);
    }
    //TODO 还有其他事件
}

相关文章

  • 事件中心 - 解耦核心业务与辅助功能

    随着业务的不断发展,原来融入在业务系统主流程中的辅助功能越来越多,每次增加新的逻辑,都要修改主干流程,比如发送微信...

  • 在业务代码中植入异步通知功能

    对异步通知的定位,是作为核心业务的一种补充,应该尽量与核心业务解耦。 采用的解耦方式为“事件+监听器”。一些主流的...

  • spring 配置、注解理解

    声明业务对象 声明日志切面 指定右边的核心业务功能 指定左边的辅助功能然后通过aop:config把业务对象与...

  • 基于领域事件实现微服务解耦

    基于领域事件实现微服务解耦 领域事件是解耦微服务的关键。 什么是领域事件 除了命令和操作等业务行为,还有一种非常重...

  • RocketMQ-入门

    一、MQ应用场景 业务解耦解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。而需...

  • Ioc

    1.AOP的实现:1.1配置文件的形式实现的。 实现核心业务和辅助功能的交织: 目的是为了将核心业务与...

  • AOP-AspectJ 实现防重复点击

    一、概念介绍 我们要实现项目中点击事件的防重复点击功能,AOP思想可以很好的实践。业务代码和其它代码可以完全解耦。...

  • 架构设计原则

    架构坚持组件化,持续重构,小而美。架构设计十大原则: 1.全面解耦原则:对业务进行抽象建模,业务数据与业务逻辑解耦...

  • 项目笔记之 重构

    业务逻辑梳理功能模块梳理模块抽象解耦封装->模块组件化模块内部深度优化

  • 消息中心

    应用场景: 1.存在并发访问的业务 2.耗时比较久的业务 3.需要解耦出来的业务 消息中心简介: ...

网友评论

    本文标题:事件中心 - 解耦核心业务与辅助功能

    本文链接:https://www.haomeiwen.com/subject/lflxnctx.html