美文网首页spring statemachine
SpringStateMachine之七-外部调用&事务

SpringStateMachine之七-外部调用&事务

作者: AlanKim | 来源:发表于2019-01-10 10:01 被阅读0次

    说明

    前面基本上涵盖了一个项目中配置简单状态机的相关实现,不过还有一个关键点,就是外部代码如何调用状态机,以及如何让状态机的持久化与业务逻辑代码在同一个事务中,避免状态机中状态与实际订单中不一致,造成脏数据。

    调用方式

    外部调用状态机引擎,需要以下三步:

    1. 通过创建/读取的方式获取当前订单对应的状态机引擎实例
    2. 构造message。
    3. 发送message。

    需要注意以下几点:

    1. 在状态机发送完message之后,spring statemachine会通过ActionListener来监听,同时判断需要走到哪个Action中
    2. 只有在sendMessage完成之后,状态机的当前状态才会更新为target状态

    所以对于调用状态机,做了以下代码封装:

    接口:
    /**
     * 存在状态机做串联时,统一的事务处理,将状态机实例持久化也囊括在统一的事务中
     */
    public interface StateMachineSendEventManager {
    
        /**
         * 发送状态机event,调用bizManagerImpl中具体实现,同时处理状态机持久化
         * <p>
         * 用于订单的状态变更
         *
         * @param request
         * @param operationTypeEnum
         * @param eventEnum
         * @return
         * @throws BusinessException
         */
        OrderBaseResponse sendStatusChangeEvent(BizOrderStatusRequest request,
                                                BizOrderOperationTypeEnum operationTypeEnum,
                                                BizOrderStatusChangeEventEnum eventEnum) throws Exception;
    
    
        /**
         * 同上,不过是用于订单创建场景
         *
         * @param request
         * @param operationTypeEnum
         * @param eventEnum
         * @return
         * @throws Exception
         */
        BizOrderCreateResponse sendOrderCreateEvent(BizOrderCreateRequest request,
                                                    BizOrderOperationTypeEnum operationTypeEnum,
                                                    BizOrderStatusChangeEventEnum eventEnum) throws Exception;
    
    }
    
    对应实现
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.statemachine.StateMachine;
    import org.springframework.statemachine.persist.StateMachinePersister;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.transaction.support.TransactionSynchronizationAdapter;
    import org.springframework.transaction.support.TransactionSynchronizationManager;
    
    @Slf4j
    @Component("stateMachineSendEventManager")
    public class StateMachineSendEventManagerImpl implements StateMachineSendEventManager {
    
        @Autowired
        private BizOrderRepository bizOrderRepository;
    
        @Autowired
        private BizOrderStateMachineBuildFactory bizOrderStateMachineBuildFactory;
    
        @Autowired
        @Qualifier("bizOrderRedisStateMachinePersister")
        private StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister;
    
        /**
         * 发送状态机event,调用bizManagerImpl中具体实现,同时处理状态机持久化
         * <p>
         * 这里会send stateMachine event,从而跳转到对应的action --> bizManagerImpl,出现事务嵌套的情况
         * <p>
         * 不过事务传播默认是TransactionDefinition.PROPAGATION_REQUIRED,所以还是同一个事务中,
         * 只是事务范围扩大至stateMachine的持久化场景了,不要修改默认的传播机制
         *
         * @param request
         * @return
         * @throws BusinessException
         */
        @Override
        @Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
        public OrderBaseResponse sendStatusChangeEvent(BizOrderStatusRequest request,
                                                       BizOrderOperationTypeEnum operationTypeEnum,
                                                       BizOrderStatusChangeEventEnum eventEnum) throws Exception {
    
            // 获取状态机信息
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine =
                    getStateMachineFromStatusReq(request, operationTypeEnum);
    
    
            boolean result = statusChangeCommonOps(stateMachine, request, eventEnum);
    
            OrderBaseResponse resp = new OrderBaseResponse();
            if (!result) {
                resp.setResultCode(BizOrderErrorCode.ORDER_STATE_MACHINE_EXECUTE_ERR.name());
                resp.setMsg("订单状态操作异常");
            }
    
            log.info("order status change resp is {}", resp);
    
            // 更新redis中数据
            // 发送event写log的动作还是放在业务里面,这里无法囊括所有业务数据
            if (result) {
                TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                    @Override
                    public void afterCommit() {
                        // 将数据持久化到redis中,以bizOrderId作为对应Key
                        try {
                            bizOrderRedisStateMachinePersister.persist(stateMachine, request.getBizCode());
                        } catch (Exception e) {
                            log.error("Persist bizOrderStateMachine error", e);
                        }
                    }
                });
            }
    
            return resp;
        }
    
        /**
         * 同上,不过是用于订单创建场景,请求为BizOrderCreateRequest
         *
         * @param bizOrderCreateRequest
         * @param operationTypeEnum
         * @param eventEnum
         * @return
         * @throws Exception
         */
        @Override
        @Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
        public BizOrderCreateResponse sendOrderCreateEvent(BizOrderCreateRequest bizOrderCreateRequest,
                                                           BizOrderOperationTypeEnum operationTypeEnum,
                                                           BizOrderStatusChangeEventEnum eventEnum) throws Exception {
    
    
            // 获取对应的stateMachine
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine =
                    getStateMachineFromCreateReq(bizOrderCreateRequest, operationTypeEnum);
    
            Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder.withPayload(eventEnum)
                    // key 与 status change 时不同,对应的model也不同
                    .setHeader(BizOrderConstants.BIZORDER_CONTEXT_CREATE_KEY, bizOrderCreateRequest)
                    // 根据传递过来的订单状态决定后续choice跳转
                    .setHeader(BizOrderConstants.FINAL_STATUS_KEY, bizOrderCreateRequest.getBizOrderCreateModel().getOrderStatus())
                    .build();
    
            BizOrderCreateResponse createResponse = new BizOrderCreateResponse();
    
            boolean sendResult = false;
    
            if (BizOrderStateMachineUtils.acceptEvent(stateMachine, eventMsg)) {
                sendResult = stateMachine.sendEvent(eventMsg);
                log.info("order statemachine send event={},result={}", eventMsg, sendResult);
            } else {
                createResponse.setResultCode(BizOrderErrorCode.NO_ORDER_STATE_MACHINE_TRANSTION_ERR.name());
                createResponse.setMsg("当前订单无法执行请求动作");
            }
    
            if (sendResult) {
                createResponse.setBizOrderId(bizOrderCreateRequest.getBizOrderCreateModel().getBizOrderId());
                TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                    @Override
                    public void afterCommit() {
                        // 将数据持久化到redis中,以bizOrderId作为对应Key
                        try {
                            bizOrderRedisStateMachinePersister.persist(stateMachine,
                                    createResponse.getBizOrderId());
                        } catch (Exception e) {
                            throw new BusinessException(BizOrderErrorCode.ORDER_STATE_MACHINE_EXECUTE_ERR, "状态机持久化失败");
                        }
                    }
                });
            }
    
    
            return createResponse;
        }
    
        /**
         * 状态处理的通用操作抽取
         *
         * @param stateMachine  状态机
         * @param statusRequest 状态变更请求
         * @return 执行结果
         * @throws Exception 异常
         */
        private boolean statusChangeCommonOps(
                StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine,
                BizOrderStatusRequest statusRequest,
                BizOrderStatusChangeEventEnum eventEnum) {
    
            log.info("order statemachine send event={}", eventEnum);
    
    
            // 执行引擎,sendEvent,result为执行结果,通过actionListener跳转到对应的Action
            Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder.
                    withPayload(eventEnum)
                    .setHeader(BizOrderConstants.BIZORDER_CONTEXT_KEY, statusRequest)
                    // 只有在需要判断(choice)的场景才用得到,guard实现中使用
                    .setHeader(BizOrderConstants.FINAL_STATUS_KEY, statusRequest.getBizOrderStatusModel().getTargetOrderStatus())
                    .build();
    
            // 取到对应的状态机,判断是否可以执行
            boolean result = false;
    
            // 状态机的当前状态,只有在执行结束后才会变化,也就是节点对应的action执行完才会变更
            // 所以在result=true的情况下,更新状态机的持久化状态才有效
            if (BizOrderStateMachineUtils.acceptEvent(stateMachine, eventMsg)) {
                result = stateMachine.sendEvent(eventMsg);
                log.info("order statemachine send event={},result={}", eventMsg, result);
            } else {
                throw new BusinessException(BizOrderErrorCode.NO_ORDER_STATE_MACHINE_TRANSTION_ERR, "当前订单无法执行请求动作");
            }
            return result;
    
        }
    
        /**
         * 从statusRequest中获取statemachine实例
         *
         * @param statusRequest     状态请求
         * @param operationTypeEnum 操作类型
         * @return 状态机实例
         * @throws Exception 异常
         */
        private StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>
        getStateMachineFromStatusReq(BizOrderStatusRequest statusRequest,
                                     BizOrderOperationTypeEnum operationTypeEnum) throws Exception {
            log.info("Order status change request={},operationType={}", statusRequest, operationTypeEnum);
    
            if (!StringUtils.equals(statusRequest.getBizCode(), statusRequest.getBizOrderStatusModel().getBizOrderId())) {
                throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "请求数据异常");
            }
    
            // 查询订单,判断请求数据是否合法
            BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(statusRequest.getBizCode());
            if (null == bizOrder
                    || !StringUtils.equals(bizOrder.getBizType(), statusRequest.getBizOrderStatusModel().getBizType())
                    || !StringUtils.equals(bizOrder.getOrderStatus(), statusRequest.getBizOrderStatusModel().getCurrentOrderStatus())
                    ) {
                throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "请求数据与订单实际数据不符");
            }
    
            // 构造状态机模板
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> srcStateMachine =
                    bizOrderStateMachineBuildFactory.createStateMachine(statusRequest.getBizOrderStatusModel().getBizType(),
                            statusRequest.getBizOrderStatusModel().getSubBizType());
    
            // 从redis中获取对应的statemachine,并判断当前节点是否可以满足,如果无法从redis中获取对应的的statemachine,则取自DB
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
                    = bizOrderRedisStateMachinePersister.restore(srcStateMachine, statusRequest.getBizCode());
    
            // 由于DB中已持久化,基本上不太可能出现null的情况,目前唯一能想到会出现的情况就是缓存击穿,先抛错
            if (null == stateMachine) {
                throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在订单对应的状态机实例");
            }
            log.info("order stateMachine info is {}", srcStateMachine);
    
            return stateMachine;
        }
    
        /**
         * 获取statemachine实例
         *
         * @param createRequest     状态请求
         * @param operationTypeEnum 操作类型
         * @return 状态机实例
         * @throws Exception 异常
         */
        private StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> getStateMachineFromCreateReq(BizOrderCreateRequest createRequest,
                                                                                                             BizOrderOperationTypeEnum operationTypeEnum) throws Exception {
            log.info("Order create request={},operationType={}", createRequest, operationTypeEnum);
    
            // 构造状态机模板
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> srcStateMachine =
                    bizOrderStateMachineBuildFactory.createStateMachine(createRequest.getBizOrderCreateModel().getBizType(),
                            createRequest.getBizOrderCreateModel().getSubBizType());
    
            if (null == srcStateMachine) {
                throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在订单对应的状态机实例");
            }
    
            // 如果是sign,表示订单已存在,需要额外判断并restore状态机;如果是直接create,则不需要处理这些判断
            if (StringUtils.equalsIgnoreCase(BizOrderOperationTypeEnum.SIGN.getOperationType(),
                    createRequest.getOperationType())) {
                if (!StringUtils.equals(createRequest.getBizCode(), createRequest.getBizOrderCreateModel().getBizOrderId())) {
                    throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "请求数据异常");
                }
    
                // 查询订单,判断请求数据是否合法
                BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(createRequest.getBizCode());
                if (null == bizOrder
                        || !StringUtils.equals(bizOrder.getBizType(), createRequest.getBizOrderCreateModel().getBizType())
                        ) {
                    throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "请求数据与订单实际数据不符");
                }
    
                // 从redis中获取对应的statemachine,并判断当前节点是否可以满足,如果无法从redis中获取对应的的statemachine,则取自DB
                StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
                        = bizOrderRedisStateMachinePersister.restore(srcStateMachine, createRequest.getBizOrderCreateModel().getBizOrderId());
    
                // 由于DB中已持久化,基本上不太可能出现null的情况,目前唯一能想到会出现的情况就是缓存击穿,先抛错
                if (null == stateMachine) {
                    throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在订单对应的状态机实例");
                }
    
                return stateMachine;
            }
    
            log.info("order stateMachine info is {}", srcStateMachine);
    
            return srcStateMachine;
        }
    }
    

    这里需要关注BizOrderStateMachineUtils.acceptEvent,相当于在执行之前判断是否可以执行,其实sendEvent中存在一样判断是否可执行的代码,不过这里抽取出来做了一个事前判断,实现如下:

    import org.springframework.messaging.Message;
    import org.springframework.statemachine.StateMachine;
    import org.springframework.statemachine.state.State;
    import org.springframework.statemachine.support.StateMachineUtils;
    import org.springframework.statemachine.transition.Transition;
    import org.springframework.statemachine.trigger.DefaultTriggerContext;
    import org.springframework.statemachine.trigger.Trigger;
    
    public class BizOrderStateMachineUtils {
    
        /**
         * 判断是否可以执行对应的event
         *
         * @param stateMachine
         * @param eventMsg
         * @return
         */
        public static boolean acceptEvent(StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine,
                                          Message<BizOrderStatusChangeEventEnum> eventMsg) {
            State<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> cs = stateMachine.getState();
    
            for (Transition<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> transition : stateMachine.getTransitions()) {
                State<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> source = transition.getSource();
                Trigger<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> trigger = transition.getTrigger();
    
                if (cs != null && StateMachineUtils.containsAtleastOne(source.getIds(), cs.getIds())) {
                    if (trigger != null && trigger.evaluate(new DefaultTriggerContext<>(eventMsg.getPayload()))) {
                        return true;
                    }
                }
            }
            return false;
        }
    
    }
    
    

    其他说明:

    1. 关注getStateMachineFromXXX,这里其实就是调用了builderFactory创建了对应的实例。
    2. 还有个遗留问题,即每次从redis/db中restore时,都需要生成一个最初状态的状态机实例,然后传入进去,转化成当前状态,这种方式可能会造成比较大的资源消耗。
    3. 在sendXXXEvent方法上,都加上了@Transactional注解,这里代码中的注释也说的很清楚,会跟AbstractBizManager中的@Tranactional形成嵌套,所以事务传播方式只能是默认的TransactionDefinition.PROPAGATION_REQUIRED,不能修改,否则会问题。
    4. 至于再外部调用,直接调用StateMachineSendEventManager即可,与状态机无关了。
    5. 至此,完结。

    相关文章

      网友评论

        本文标题:SpringStateMachine之七-外部调用&事务

        本文链接:https://www.haomeiwen.com/subject/flyorqtx.html