Knative Eventing 之 Sequence 介绍

作者: 阿里云云栖号 | 来源:发表于2019-08-05 13:37 被阅读22次

    在处理数据时,往往会涉及到一个数据需要进行多次加工,这时候我们一般是通过Pipeline的方式进行处理。那么在Knative Eventing中是否也能支持对一个事件进行分步骤多次处理? 这个还真有。从 0.7 版本开始,Knative Eventing中提供了一个 Sequence 资源模型,可用于事件Pipeline处理。

    Sequence 定义

    首先我们看一下Sequence Spec定义:

    apiVersion: messaging.knative.dev/v1alpha1
    kind: Sequence
    metadata:
      name: test
    spec:
      channelTemplate:
        apiVersion: messaging.knative.dev/v1alpha1
        kind: InMemoryChannel
      steps:
        - ref:
            apiVersion: serving.knative.dev/v1alpha1
            kind: Service
            name: test
      reply:
        kind: Broker
        apiVersion: eventing.knative.dev/v1alpha1
        name: test
    

    Sequence Spec包括3个部分:

    1. steps: 在step中定义了按照顺序执行的服务,每个服务会对应创建Subscription。
    2. channelTemplate:指定了使用具体的那个Channel
    3. reply:(可选)定义了最后一个step返回结果的响应目标

    在 Broker/Trigger 模型中使用 Sequence

    我们将创建以下逻辑配置。创建一个 cronjobsource,向 Broker 提供事件,然后创建一个 filter,将这些事件连接到由3个 step 组成的 Sequence 中。然后,我们获取最后的step返回结果事件发送给给Broker,并创建另一个 Trigger,该 Trigger 随后将显示事件结果。
    对于这个例子,这里设置一个 Broker 程序、一个 InMemoryChannel 以及一个 Knative Service(用于显示事件结果)。示例使用 default namespace。
    如果要使用不同类型的Channel,则需要修改sequence.spec.channeltemplate以创建对应的 Channel 资源。

    创建 Knative Service

    首先创建3个Knative Service,用于 Sequence 中服务处理

    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    metadata:
      name: first
    spec:
      template:
        spec:
          containers:
          - image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
                env:
                - name: STEP
                  value: "0"
    
    ---
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    metadata:
      name: second
    spec:
      template:
        spec:
          containers:
          - image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
                env:
                - name: STEP
                  value: "1"
    ---
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    metadata:
      name: third
    spec:
      template:
        spec:
          containers:
          - image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
                env:
                - name: STEP
                  value: "2"
    
    ---
    

    执行创建命令:

    kubectl -n default create -f ./steps.yaml
    

    创建 Sequence

    创建Sequence,这里依次顺序执行[first->second->third]这3个服务。将最终处理的结果发送到broker-test中。

    apiVersion: messaging.knative.dev/v1alpha1
    kind: Sequence
    metadata:
      name: sequence
    spec:
      channelTemplate:
        apiVersion: messaging.knative.dev/v1alpha1
        kind: InMemoryChannel
      steps:
        - ref:
            apiVersion: serving.knative.dev/v1alpha1
            kind: Service
            name: first
        - ref:
            apiVersion: serving.knative.dev/v1alpha1
            kind: Service
            name: second
        - ref:
            apiVersion: serving.knative.dev/v1alpha1
            kind: Service
            name: third
      reply:
        kind: Broker
        apiVersion: eventing.knative.dev/v1alpha1
        name: broker-test
    

    执行如下命令:

    kubectl -n default create -f ./sequence.yaml
    

    创建CronJobSource指向Broker

    这里将创建一个 cronjobsource,它将每2分钟发送一个{"message": "Hello world!"} 信息到 broker-test 中。

    apiVersion: sources.eventing.knative.dev/v1alpha1
    kind: CronJobSource
    metadata:
      name: cronjob-source
    spec:
      schedule: "*/2 * * * *"
      data: '{"message": "Hello world!"}'
      sink:
        apiVersion: eventing.knative.dev/v1alpha1
        kind: Broker
        name: broker-test
    

    执行命令如下:

    kubectl -n default create -f ./cron-source.yaml
    

    为Sequence创建Trigger

    创建订阅事件类型为:dev.knative.cronjob.event 的 Trigger, 用于Sequence 进行消费处理。

    apiVersion: eventing.knative.dev/v1alpha1
    kind: Trigger
    metadata:
      name: sequence-trigger
    spec:
      filter:
        sourceAndType:
          type: dev.knative.cronjob.event
      subscriber:
        ref:
          apiVersion: messaging.knative.dev/v1alpha1
          kind: Sequence
          name: sequence
    

    执行如下命令:

    kubectl -n default create -f ./trigger.yaml
    

    创建结果订阅 Trigger

    创建结果订阅 Trigger,订阅samples.http.mod3 的事件类型,对 sequence 执行的结果进行显示

    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    metadata:
      name: sequence-display
    spec:
      template:
        spec:
          containers:
            - image: gcr.io/knative-releases/github.com/knative/eventing-sources/cmd/event_display
    ---
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Trigger
    metadata:
      name: sequence-trigger
    spec:
      filter:
        sourceAndType:
          type: samples.http.mod3
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: sequence-display
    ---
    

    结论

    通过 Sequence 资源模型,我们很容易在 Knative Eventing 中实现事件处理的 Pipeline。对于需要多步骤处理的服务尤为适合。



    本文作者:元毅

    阅读原文

    本文为云栖社区原创内容,未经允许不得转载。

    相关文章

      网友评论

        本文标题:Knative Eventing 之 Sequence 介绍

        本文链接:https://www.haomeiwen.com/subject/lqppdctx.html