现象:在页面上停止工作流时偶现depend类型节点无法正常结束,且后续无法停止;
一、页面操作stop工作流
api端代码
org.apache.dolphinscheduler.api.controller.ExecutorController#execute
image.png
org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl#execute,构建READY_STOP事件并调用master端netty
image.png
image.png
updateProcessInstancePrepare方法在调用master端netty前会先更新工作流实例数据表
master端代码
1、main方法类在启动时注册netty处理器,其中stateEventProcessor用于处理状态变更事件。
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
2、stateEventProcessor处理器封装StateEvent事件后调用StateEventResponseService.addResoponse方法,将事件放入eventQueue队列,并在奔雷的线程中消费队列中的事件;
image.png
image.png
image.png
3、persist方法将待处理的事件增加到工作流实例的stateEvents队列中,并在处理线程中持续处理事件;
image.png image.png
4、当事件为PROCESS_STATE_CHANGE时调用processStateChangeHandler方法
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread#processStateChangeHandler
image.png
页面停止工作流时传入的processInstance state为READY_STOP,对应执行killAllTasks()方法;
killAllTasks方法中遍历当前工作流正在运行的任务,Depenent类型的处理器为DependentTaskProcessor类,该类的killTask()方法只设置了实例的值却没有执行更新数据库动作,此处不完善,真正触发保存数据库的地方是该类的persitTask()方法,该方法被调用的链路为:
WorkflowExecuteThread.killAllTasks()构建TaskResponseEvent并添加到TaskResponseService的eventQueue队列----> TaskResponseService的线程消费队列,再将事件添加到TaskResopnsePersistThread的events队列--->TaskResopnsePersistThread的线程消费队列,执行persist方法中的ACTION_STOP分支,在该分支中调用TaskProcessor的persist方法,persist方法再调用persistTask方法。
二、造成dependent节点偶现无法停止的原因分析
StateWheelExecuteThread线程类的checkTask4Retry方法将未结束且类型为depend的任务实例构造StateEventType.TASK_STATE_CHANGE类型事件并发送给WorkflowExecuteThread,关键执行方法为taskStateChangeHandler,当判断任务实例完成时,调用的taskFinished方法会将当前任务从activeTaskProcessorMaps中删除, 但在TaskResopnsePersistThread更新任务实例表前会从activeTaskProcessorMaps获取处理器,如果获取不到则无法触发数据库更新动作;
总结:下图一和二分别运行在两个线程中, 当图一运行顺序早于图二时则会导致停止工作流时无法停止depend类型节点。
image.png
image.png
三、解决方式
DependentTaskProcessor.killTask()增加持久化数据库操作
或
任务实例页面增加手动杀死任务按钮
网友评论