美文网首页Java web
事件中心 - 解耦核心业务与辅助功能

事件中心 - 解耦核心业务与辅助功能

作者: 十毛tenmao | 来源:发表于2019-12-13 23:00 被阅读0次

    随着业务的不断发展,原来融入在业务系统主流程中的辅助功能越来越多,每次增加新的逻辑,都要修改主干流程,比如发送微信服务号消息,发送邮件提醒等。 这种做法,让主干流程和辅助功能耦合太紧密,很容易在修改辅助功能的时候,导致主干流程的bug。本文介绍利用事件中心,让主干流程专注于业务核心,其他辅助功能会通过监听事件中心来实现,大大解耦了核心业务和辅助逻辑。

    实现

    • 事件中心EventHub

    事件中心,一般用来监听收集各种事件并分发给监听者列表.

    /**
     * 事件中心,一般用来监听收集各种事件并分发给监听者列表.
     *
     * @author tenmao
     * @since 2019/12/12
     */
    @Slf4j
    public class EventHub<Event, Entity> {
        private final MdcThreadPoolExecutor mdcThreadPoolExecutor = MdcThreadPoolExecutor.newWithInheritedMdc(4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));
    
        /**
         * 同步监听(一般可以用在事务一致性场景).
         */
        private Map<Event, Set<Consumer<Entity>>> syncListenerMap = new HashMap<>();
    
        /**
         * 异步监听.
         */
        private Map<Event, Set<Consumer<Entity>>> asyncListenerMap = new HashMap<>();
    
    
        /**
         * 注册同步监听器.
         *
         * @param event        事件
         * @param syncListener 同步监听器
         */
        public void subscribeSync(Event event, Consumer<Entity> syncListener) {
            Preconditions.checkNotNull(event);
            Preconditions.checkNotNull(syncListener);
            log.info("subscribeSync: event[{}], listener[{}]", event, syncListener);
            addListener(syncListenerMap, event, syncListener);
        }
    
        /**
         * 注册异步监听器.
         *
         * @param event         事件
         * @param asyncListener 异步监听器
         */
        public void subscribeAsyncListener(Event event, Consumer<Entity> asyncListener) {
            Preconditions.checkNotNull(event);
            Preconditions.checkNotNull(asyncListener);
            log.info("subscribeAsyncListener: event[{}], listener[{}]", event, asyncListener);
            addListener(asyncListenerMap, event, asyncListener);
        }
    
        private synchronized void addListener(Map<Event, Set<Consumer<Entity>>> map, Event event, Consumer<Entity> listener) {
            Set<Consumer<Entity>> consumers = map.get(event);
            if (consumers == null) {
                consumers = new HashSet<>();
                map.putIfAbsent(event, consumers);
            }
            consumers.add(listener);
        }
    
    
        /**
         * 发布事件.
         *
         * @param event  事件
         * @param entity 事件实体
         */
        public void publishEvent(Event event, Entity entity) {
            //订单状态变更
            log.info("publishEvent: event[{}], entity[{}]", event, entity);
            for (Consumer<Entity> consumer : syncListenerMap.get(event)) {
                consumer.accept(entity);
            }
    
            for (Consumer<Entity> asyncObserver : asyncListenerMap.get(event)) {
                mdcThreadPoolExecutor.execute(() -> {
                    try {
                        asyncObserver.accept(entity);
                    } catch (Exception e) {
                        log.warn("fail to finish listener for event: event[{}], entity[{}]", event, entity);
                    }
                });
            }
        }
    }
    
    • 为不同类型事件配置事件中心
    /**
     * @author tenmao
     * @since 2019/12/12
     */
    @Configuration
    public class EventHubConfiguration {
        /**
         * 订单状态事件中心.
         */
        @Bean
        public EventHub<OrderStatus, Order> orderEventHub() {
            return new EventHub<>();
        }
    
        /**
         * 退款状态事件中心.
         */
        @Bean
        public EventHub<RefundStatus, Refund> refundEventHub() {
            return new EventHub<>();
        }
    }
    

    事件监听者

    /**
     * 发送提醒消息.
     *
     * @author tenmao
     * @since 2019/12/12
     */
    @Slf4j
    @Component
    public class EmailListener {
        @Resource
        private EmailRemoteManager EmailRemoteManager;
    
        @Resource
        private EventHub<OrderStatus, Order> orderEventHub;
    
        @PostConstruct
        private void init() {
            //监听多个不同的事件,监听到后会发送提醒消息
            orderEventHub.subscribeAsyncListener(OrderStatus.PAYED, this::handleOrderPayed);
            orderEventHub.subscribeAsyncListener(OrderStatus.ARRIVAL, this::handleOrderArrival);
        }
    
        private void handleOrderPayed(Order order) {
            //推送 Email
            EmailRequest EmailRequest = EmailRequest.build(order);
            EmailRemoteManager.sendEmail(EmailRequest);
        }
    
        private void handleOrderArrival(Order order) {
            //推送 Email
            EmailRequest EmailRequest = EmailRequest.build(order);
            EmailRemoteManager.sendEmail(EmailRequest);
        }
    }
    

    事件源

    业务系统,比如订单系统中,会在不同的订单状态时,发送相应的订单事件.

    @Slf4j
    @Component
    public class OrderManager {
        @Resource
        private EventHub<OrderStatus, Order> orderEventHub;
    
        public void payOrder(String orderNo) {
            //TODO 订单付款
            Order order = getOne(orderNo);
            orderEventHub.publishEvent(OrderStatus.PAYED, order);
        }
    
        public void orderArrival(String orderNo) {
            //TODO 订单到货
            Order order = getOne(orderNo);
            orderEventHub.publishEvent(OrderStatus.ARRIVAL, order);
        }
        //TODO 还有其他事件
    }

    相关文章

      网友评论

        本文标题:事件中心 - 解耦核心业务与辅助功能

        本文链接:https://www.haomeiwen.com/subject/lflxnctx.html