美文网首页spring statemachine
SpringStateMachine之七-外部调用&事务

SpringStateMachine之七-外部调用&事务

作者: AlanKim | 来源:发表于2019-01-10 10:01 被阅读0次

说明

前面基本上涵盖了一个项目中配置简单状态机的相关实现,不过还有一个关键点,就是外部代码如何调用状态机,以及如何让状态机的持久化与业务逻辑代码在同一个事务中,避免状态机中状态与实际订单中不一致,造成脏数据。

调用方式

外部调用状态机引擎,需要以下三步:

  1. 通过创建/读取的方式获取当前订单对应的状态机引擎实例
  2. 构造message。
  3. 发送message。

需要注意以下几点:

  1. 在状态机发送完message之后,spring statemachine会通过ActionListener来监听,同时判断需要走到哪个Action中
  2. 只有在sendMessage完成之后,状态机的当前状态才会更新为target状态

所以对于调用状态机,做了以下代码封装:

接口:
/**
 * 存在状态机做串联时,统一的事务处理,将状态机实例持久化也囊括在统一的事务中
 */
public interface StateMachineSendEventManager {

    /**
     * 发送状态机event,调用bizManagerImpl中具体实现,同时处理状态机持久化
     * <p>
     * 用于订单的状态变更
     *
     * @param request
     * @param operationTypeEnum
     * @param eventEnum
     * @return
     * @throws BusinessException
     */
    OrderBaseResponse sendStatusChangeEvent(BizOrderStatusRequest request,
                                            BizOrderOperationTypeEnum operationTypeEnum,
                                            BizOrderStatusChangeEventEnum eventEnum) throws Exception;


    /**
     * 同上,不过是用于订单创建场景
     *
     * @param request
     * @param operationTypeEnum
     * @param eventEnum
     * @return
     * @throws Exception
     */
    BizOrderCreateResponse sendOrderCreateEvent(BizOrderCreateRequest request,
                                                BizOrderOperationTypeEnum operationTypeEnum,
                                                BizOrderStatusChangeEventEnum eventEnum) throws Exception;

}
对应实现
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Slf4j
@Component("stateMachineSendEventManager")
public class StateMachineSendEventManagerImpl implements StateMachineSendEventManager {

    @Autowired
    private BizOrderRepository bizOrderRepository;

    @Autowired
    private BizOrderStateMachineBuildFactory bizOrderStateMachineBuildFactory;

    @Autowired
    @Qualifier("bizOrderRedisStateMachinePersister")
    private StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister;

    /**
     * 发送状态机event,调用bizManagerImpl中具体实现,同时处理状态机持久化
     * <p>
     * 这里会send stateMachine event,从而跳转到对应的action --> bizManagerImpl,出现事务嵌套的情况
     * <p>
     * 不过事务传播默认是TransactionDefinition.PROPAGATION_REQUIRED,所以还是同一个事务中,
     * 只是事务范围扩大至stateMachine的持久化场景了,不要修改默认的传播机制
     *
     * @param request
     * @return
     * @throws BusinessException
     */
    @Override
    @Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
    public OrderBaseResponse sendStatusChangeEvent(BizOrderStatusRequest request,
                                                   BizOrderOperationTypeEnum operationTypeEnum,
                                                   BizOrderStatusChangeEventEnum eventEnum) throws Exception {

        // 获取状态机信息
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine =
                getStateMachineFromStatusReq(request, operationTypeEnum);


        boolean result = statusChangeCommonOps(stateMachine, request, eventEnum);

        OrderBaseResponse resp = new OrderBaseResponse();
        if (!result) {
            resp.setResultCode(BizOrderErrorCode.ORDER_STATE_MACHINE_EXECUTE_ERR.name());
            resp.setMsg("订单状态操作异常");
        }

        log.info("order status change resp is {}", resp);

        // 更新redis中数据
        // 发送event写log的动作还是放在业务里面,这里无法囊括所有业务数据
        if (result) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    // 将数据持久化到redis中,以bizOrderId作为对应Key
                    try {
                        bizOrderRedisStateMachinePersister.persist(stateMachine, request.getBizCode());
                    } catch (Exception e) {
                        log.error("Persist bizOrderStateMachine error", e);
                    }
                }
            });
        }

        return resp;
    }

    /**
     * 同上,不过是用于订单创建场景,请求为BizOrderCreateRequest
     *
     * @param bizOrderCreateRequest
     * @param operationTypeEnum
     * @param eventEnum
     * @return
     * @throws Exception
     */
    @Override
    @Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
    public BizOrderCreateResponse sendOrderCreateEvent(BizOrderCreateRequest bizOrderCreateRequest,
                                                       BizOrderOperationTypeEnum operationTypeEnum,
                                                       BizOrderStatusChangeEventEnum eventEnum) throws Exception {


        // 获取对应的stateMachine
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine =
                getStateMachineFromCreateReq(bizOrderCreateRequest, operationTypeEnum);

        Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder.withPayload(eventEnum)
                // key 与 status change 时不同,对应的model也不同
                .setHeader(BizOrderConstants.BIZORDER_CONTEXT_CREATE_KEY, bizOrderCreateRequest)
                // 根据传递过来的订单状态决定后续choice跳转
                .setHeader(BizOrderConstants.FINAL_STATUS_KEY, bizOrderCreateRequest.getBizOrderCreateModel().getOrderStatus())
                .build();

        BizOrderCreateResponse createResponse = new BizOrderCreateResponse();

        boolean sendResult = false;

        if (BizOrderStateMachineUtils.acceptEvent(stateMachine, eventMsg)) {
            sendResult = stateMachine.sendEvent(eventMsg);
            log.info("order statemachine send event={},result={}", eventMsg, sendResult);
        } else {
            createResponse.setResultCode(BizOrderErrorCode.NO_ORDER_STATE_MACHINE_TRANSTION_ERR.name());
            createResponse.setMsg("当前订单无法执行请求动作");
        }

        if (sendResult) {
            createResponse.setBizOrderId(bizOrderCreateRequest.getBizOrderCreateModel().getBizOrderId());
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    // 将数据持久化到redis中,以bizOrderId作为对应Key
                    try {
                        bizOrderRedisStateMachinePersister.persist(stateMachine,
                                createResponse.getBizOrderId());
                    } catch (Exception e) {
                        throw new BusinessException(BizOrderErrorCode.ORDER_STATE_MACHINE_EXECUTE_ERR, "状态机持久化失败");
                    }
                }
            });
        }


        return createResponse;
    }

    /**
     * 状态处理的通用操作抽取
     *
     * @param stateMachine  状态机
     * @param statusRequest 状态变更请求
     * @return 执行结果
     * @throws Exception 异常
     */
    private boolean statusChangeCommonOps(
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine,
            BizOrderStatusRequest statusRequest,
            BizOrderStatusChangeEventEnum eventEnum) {

        log.info("order statemachine send event={}", eventEnum);


        // 执行引擎,sendEvent,result为执行结果,通过actionListener跳转到对应的Action
        Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder.
                withPayload(eventEnum)
                .setHeader(BizOrderConstants.BIZORDER_CONTEXT_KEY, statusRequest)
                // 只有在需要判断(choice)的场景才用得到,guard实现中使用
                .setHeader(BizOrderConstants.FINAL_STATUS_KEY, statusRequest.getBizOrderStatusModel().getTargetOrderStatus())
                .build();

        // 取到对应的状态机,判断是否可以执行
        boolean result = false;

        // 状态机的当前状态,只有在执行结束后才会变化,也就是节点对应的action执行完才会变更
        // 所以在result=true的情况下,更新状态机的持久化状态才有效
        if (BizOrderStateMachineUtils.acceptEvent(stateMachine, eventMsg)) {
            result = stateMachine.sendEvent(eventMsg);
            log.info("order statemachine send event={},result={}", eventMsg, result);
        } else {
            throw new BusinessException(BizOrderErrorCode.NO_ORDER_STATE_MACHINE_TRANSTION_ERR, "当前订单无法执行请求动作");
        }
        return result;

    }

    /**
     * 从statusRequest中获取statemachine实例
     *
     * @param statusRequest     状态请求
     * @param operationTypeEnum 操作类型
     * @return 状态机实例
     * @throws Exception 异常
     */
    private StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>
    getStateMachineFromStatusReq(BizOrderStatusRequest statusRequest,
                                 BizOrderOperationTypeEnum operationTypeEnum) throws Exception {
        log.info("Order status change request={},operationType={}", statusRequest, operationTypeEnum);

        if (!StringUtils.equals(statusRequest.getBizCode(), statusRequest.getBizOrderStatusModel().getBizOrderId())) {
            throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "请求数据异常");
        }

        // 查询订单,判断请求数据是否合法
        BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(statusRequest.getBizCode());
        if (null == bizOrder
                || !StringUtils.equals(bizOrder.getBizType(), statusRequest.getBizOrderStatusModel().getBizType())
                || !StringUtils.equals(bizOrder.getOrderStatus(), statusRequest.getBizOrderStatusModel().getCurrentOrderStatus())
                ) {
            throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "请求数据与订单实际数据不符");
        }

        // 构造状态机模板
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> srcStateMachine =
                bizOrderStateMachineBuildFactory.createStateMachine(statusRequest.getBizOrderStatusModel().getBizType(),
                        statusRequest.getBizOrderStatusModel().getSubBizType());

        // 从redis中获取对应的statemachine,并判断当前节点是否可以满足,如果无法从redis中获取对应的的statemachine,则取自DB
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
                = bizOrderRedisStateMachinePersister.restore(srcStateMachine, statusRequest.getBizCode());

        // 由于DB中已持久化,基本上不太可能出现null的情况,目前唯一能想到会出现的情况就是缓存击穿,先抛错
        if (null == stateMachine) {
            throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在订单对应的状态机实例");
        }
        log.info("order stateMachine info is {}", srcStateMachine);

        return stateMachine;
    }

    /**
     * 获取statemachine实例
     *
     * @param createRequest     状态请求
     * @param operationTypeEnum 操作类型
     * @return 状态机实例
     * @throws Exception 异常
     */
    private StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> getStateMachineFromCreateReq(BizOrderCreateRequest createRequest,
                                                                                                         BizOrderOperationTypeEnum operationTypeEnum) throws Exception {
        log.info("Order create request={},operationType={}", createRequest, operationTypeEnum);

        // 构造状态机模板
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> srcStateMachine =
                bizOrderStateMachineBuildFactory.createStateMachine(createRequest.getBizOrderCreateModel().getBizType(),
                        createRequest.getBizOrderCreateModel().getSubBizType());

        if (null == srcStateMachine) {
            throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在订单对应的状态机实例");
        }

        // 如果是sign,表示订单已存在,需要额外判断并restore状态机;如果是直接create,则不需要处理这些判断
        if (StringUtils.equalsIgnoreCase(BizOrderOperationTypeEnum.SIGN.getOperationType(),
                createRequest.getOperationType())) {
            if (!StringUtils.equals(createRequest.getBizCode(), createRequest.getBizOrderCreateModel().getBizOrderId())) {
                throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "请求数据异常");
            }

            // 查询订单,判断请求数据是否合法
            BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(createRequest.getBizCode());
            if (null == bizOrder
                    || !StringUtils.equals(bizOrder.getBizType(), createRequest.getBizOrderCreateModel().getBizType())
                    ) {
                throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "请求数据与订单实际数据不符");
            }

            // 从redis中获取对应的statemachine,并判断当前节点是否可以满足,如果无法从redis中获取对应的的statemachine,则取自DB
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
                    = bizOrderRedisStateMachinePersister.restore(srcStateMachine, createRequest.getBizOrderCreateModel().getBizOrderId());

            // 由于DB中已持久化,基本上不太可能出现null的情况,目前唯一能想到会出现的情况就是缓存击穿,先抛错
            if (null == stateMachine) {
                throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在订单对应的状态机实例");
            }

            return stateMachine;
        }

        log.info("order stateMachine info is {}", srcStateMachine);

        return srcStateMachine;
    }
}

这里需要关注BizOrderStateMachineUtils.acceptEvent,相当于在执行之前判断是否可以执行,其实sendEvent中存在一样判断是否可执行的代码,不过这里抽取出来做了一个事前判断,实现如下:

import org.springframework.messaging.Message;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.support.StateMachineUtils;
import org.springframework.statemachine.transition.Transition;
import org.springframework.statemachine.trigger.DefaultTriggerContext;
import org.springframework.statemachine.trigger.Trigger;

public class BizOrderStateMachineUtils {

    /**
     * 判断是否可以执行对应的event
     *
     * @param stateMachine
     * @param eventMsg
     * @return
     */
    public static boolean acceptEvent(StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine,
                                      Message<BizOrderStatusChangeEventEnum> eventMsg) {
        State<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> cs = stateMachine.getState();

        for (Transition<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> transition : stateMachine.getTransitions()) {
            State<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> source = transition.getSource();
            Trigger<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> trigger = transition.getTrigger();

            if (cs != null && StateMachineUtils.containsAtleastOne(source.getIds(), cs.getIds())) {
                if (trigger != null && trigger.evaluate(new DefaultTriggerContext<>(eventMsg.getPayload()))) {
                    return true;
                }
            }
        }
        return false;
    }

}

其他说明:

  1. 关注getStateMachineFromXXX,这里其实就是调用了builderFactory创建了对应的实例。
  2. 还有个遗留问题,即每次从redis/db中restore时,都需要生成一个最初状态的状态机实例,然后传入进去,转化成当前状态,这种方式可能会造成比较大的资源消耗。
  3. 在sendXXXEvent方法上,都加上了@Transactional注解,这里代码中的注释也说的很清楚,会跟AbstractBizManager中的@Tranactional形成嵌套,所以事务传播方式只能是默认的TransactionDefinition.PROPAGATION_REQUIRED,不能修改,否则会问题。
  4. 至于再外部调用,直接调用StateMachineSendEventManager即可,与状态机无关了。
  5. 至此,完结。

相关文章

网友评论

    本文标题:SpringStateMachine之七-外部调用&事务

    本文链接:https://www.haomeiwen.com/subject/flyorqtx.html