美文网首页Netflix Conductor源码分析
Netflix Conductor 入门Example

Netflix Conductor 入门Example

作者: 吉祥噜噜 | 来源:发表于2021-12-14 11:48 被阅读0次

    通过命令行将Netflix Conductor Sever端启动之后( https://netflix.github.io/conductor/intro/#installing-and-running 介绍了如何安装Conductor),访问localhost:8080/swagger-ui.html地址显示如下页面:

    swagger.png
    访问localhost:5000地址显示的是ui页面
    ui.png

    我们假设有一个流程,该流程根据输入的城市名去查询该城市的天气,如果气温大于37度则发送一条短信通知指定的人。

    定义任务

    首先我们要定义一个查询天气的任务和一个发送短信的任务。通过postman或者swagger向conductor server提交这两个任务。代码如下

    curl --location --request POST 'http://localhost:8080/api/metadata/taskdefs' \
    --header 'Content-Type: application/json' \
    --data-raw '[
    {
      "name": "queryWeather",
      "inputKeys": [
        "city"
      ],
      "outputKeys": [
        "temperature"
      ],
      "retryCount": 3,
      "retryLogic": "FIXED",
      "retryDelaySeconds": 10,
      "timeoutSeconds": 300,
      "timeoutPolicy": "TIME_OUT_WF",
      "responseTimeoutSeconds": 180,
      "ownerEmail": "zengxc@do1.com.cn"  
    }
    ,
    {
      "name": "sendMessage",
      "inputKeys": [
        "receiver",
        "content"
      ],  
      "retryCount": 3,
      "retryLogic": "FIXED",
      "retryDelaySeconds": 10,
      "timeoutSeconds": 300,
      "timeoutPolicy": "TIME_OUT_WF",
      "responseTimeoutSeconds": 180,
      "ownerEmail": "zengxc@do1.com.cn"  
    }
    ]'
    

    提交成功后,可以在localhost:5000/taskDef页面看到


    taskDef.png

    编排任务

    定义完任务后,就可以进行任务编排了,代码如下

    curl --location --request POST 'http://localhost:8080/api/metadata/workflow' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "name": "weather_warning",
        "description": "send weather warning message",
        "version": 4,
        "schemaVersion": 2,
        "ownerEmail": "zengxc@do1.com.cn",
        "tasks": [
            {
                "name": "queryWeather",
                "taskReferenceName": "weather",
                "inputParameters": {
                    "city": "${workflow.input.city}"
                },
                "type": "SIMPLE"
            },
            {
                "name": "switch_task",
                "taskReferenceName": "is_warning",
                "inputParameters": {
                    "temperature": "${weather.output.temperature}"
                },
                "type": "SWITCH",
                "evaluatorType": "javascript",
                "expression": "$.temperature > 37 ? 'Warning' : ''",
                "decisionCases": {
                    "Warning": [
                        {
                            "name": "sendMessage",
                            "taskReferenceName": "message",
                            "inputParameters": {
                                "receiver": "${workflow.input.receiver}",
                                "content": "${workflow.input.city}气温为${weather.output.temperature}度,请注意防暑!"
                            },
                            "type": "SIMPLE"
                        }
                    ]
                }
            }
        ]
    }'
    

    提交成功后,可以在localhost:5000/workflowDef页面看到


    workflowDef.png workflowUi.png

    启动工作流

    curl --location --request POST 'http://localhost:8080/api/workflow' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "name": "weather_warning",
        "version": 4,
        "correlationId": "my_weather_warning_workflows",
        "input": {
            "receiver": "张三",
            "city": "广州"
        }
    }'
    

    启动成功后可以在localhost:5000看到在执行的任务,点击进入可以看到执行情况


    execute.png

    任务实现

    前面为了叙事流畅,没有介绍任务的实现。如果没有对应任务实现,上面启动流程后,流程是不会往下执行,它会等待第一个任务的响应。下面用Spring Boot实现 conductor client(worker)

    在pom.xml引入

    <!-- https://mvnrepository.com/artifact/com.netflix.conductor/conductor-client-spring -->
    <dependency>
        <groupId>com.netflix.conductor</groupId>
        <artifactId>conductor-client-spring</artifactId>
        <version>3.3.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.netflix.conductor/conductor-common -->
    <dependency>
        <groupId>com.netflix.conductor</groupId>
        <artifactId>conductor-common</artifactId>
        <version>3.3.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.netflix.conductor/conductor-client -->
    <dependency>
        <groupId>com.netflix.conductor</groupId>
        <artifactId>conductor-client</artifactId>
        <version>3.3.6</version>
    </dependency>
    

    在配置文件application.properties加上配置

    conductor.worker.pollingInterval=2
    conductor.client.rootURI=http://localhost:8080/api/
    conductor.client.threadCount=2
    

    QueryWeatherWorker

    import com.netflix.conductor.client.worker.Worker;
    import com.netflix.conductor.common.metadata.tasks.Task;
    import com.netflix.conductor.common.metadata.tasks.TaskResult;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author zengxc
     */
    @Component
    public class QueryWeatherWorker implements Worker {
        Map<String, Integer> cityTemp = new HashMap<>();
        private final String taskDefName = "queryWeather";
    
        public QueryWeatherWorker() {
            cityTemp.put("广州", 38);
            cityTemp.put("湖南", 18);
        }
    
        @Override
        public String getTaskDefName() {
            return taskDefName;
        }
    
        @Override
        public TaskResult execute(Task task) {
            System.out.printf("Executing %s%n", taskDefName);
            String city = (String) task.getInputData().get("city");
            System.out.println(city + " 气温:" + cityTemp.get(city));
    
            TaskResult result = new TaskResult(task);
            result.setStatus(TaskResult.Status.COMPLETED);
            //Register the output of the task
            result.getOutputData().put("temperature", cityTemp.get(city));
            result.log(city + " 气温:" + cityTemp.get(city));
            return result;
        }
    
    }
    
    

    SendMessageWorker

    import com.netflix.conductor.client.worker.Worker;
    import com.netflix.conductor.common.metadata.tasks.Task;
    import com.netflix.conductor.common.metadata.tasks.TaskResult;
    import org.springframework.stereotype.Component;
    
    /**
     * @author zengxc
     */
    @Component
    public class SendMessageWorker implements Worker {
    
        private final String taskDefName="sendMessage";
    
        @Override
        public String getTaskDefName() {
            return taskDefName;
        }
    
        @Override
        public TaskResult execute(Task task) {
            System.out.printf("Executing %s\n", taskDefName);
            System.out.println("接收人:" + task.getInputData().get("receiver")+" "+task.getInputData().get("content"));
    
            TaskResult result = new TaskResult(task);
            result.setStatus(TaskResult.Status.COMPLETED);
            result.log("接收人:" + task.getInputData().get("receiver")+" "+task.getInputData().get("content"));
            return result;
        }
    }
    

    相关文章

      网友评论

        本文标题:Netflix Conductor 入门Example

        本文链接:https://www.haomeiwen.com/subject/qouefrtx.html