美文网首页
Apache Pulsar 之 Functions Debug

Apache Pulsar 之 Functions Debug

作者: wolf4j | 来源:发表于2019-06-20 21:38 被阅读0次

    Apache Pulsar 之 Functions Debug

    在之前的文章 Java FunctionsPython FunctionsGo Functions 中,向大家详细介绍了如何基于自己的场景快速编写并部署 Pulsar Functions。这篇文章,会介绍如何利用 Pulsar 提供的 CLI 对 Functions 进行 Debug。

    使用 LogTopic

    Pulsar Functions 在设计之初,就考虑到如何对 Pulsar Functions 进行 Debug。如下图所示,Pulsar Functions 允许用户将自己在 Functions 中定义好的日志信息输出到指定的 LogTopic 中,如果有遇到相关的问题需要从日志信息中获取,用户可以编写 consumer 从指定的 LogTopic 中消费消息,即可查看相关的日志信息。

    具体使用如下:

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;
    import org.slf4j.Logger;
    
    public class LoggingFunction implements Function<String, Void> {
        @Override
        public void apply(String input, Context context) {
            Logger LOG = context.getLogger();
            String messageId = new String(context.getMessageId());
    
            if (input.contains("danger")) {
                LOG.warn("A warning was received in message {}", messageId);
            } else {
                LOG.info("Message {} received\nContent: {}", messageId, input);
            }
    
            return null;
        }
    }
    
    

    如上述代码所示,用户可以通过 context.getLogger() 获取 logger 对象并将其赋值给 slf4jLOG 字段,这样用户就可以使用 LOG 字段在 Pulsar Functions 中定义自己想要的日志信息。当你有日志信息需要输出时,在启动过程中,需要告诉 Pulsar Functions 你期望将日志信息输出到哪一个 topic 中,具体使用如下:

    $ bin/pulsar-admin functions create \
      --log-topic persistent://public/default/logging-function-logs \
      # Other function configs
    

    Functions CLI

    除了使用 LogTopic 通过自定义日志信息的方式来追踪 Functions 的问题之外,用户还以通过使用 Functions CLI 对 Pulsar Functions 进行相关的 Debug 工作。

    关于 Pulsar Functions 相关的 CLI,Pulsar 将其集成到 pulsar-admin 下,你可以使用 ./bin/pulsar-adminFunctions 告诉 Pulsar,你要执行和 Pulsar Functions 相关的操作。为了方便用户定位问题,Pulsar Functions 提供了如下几种 CLI:

    • get
    • stats
    • trigger
    • list
    • status

    get

    使用 get 命令可以获取当前 Pulsar Functions 的详细信息,它提供了如下参数列表供用户使用:

    • FQFN
    • name
    • tenant
    • namespace

    需要说明的是,FQFN(Fully Qualified Functions Name)是由 tenantnamespacename 这三个字段组成,它可以唯一标识一个 Pulsar Functions,用户在使用时,可以使用 FQFN 或者同时指定 tenantnamespacename 这三个字段。示例如下:

    $ ./bin/pulsar-admin functions get \
        --tenant public \
        --namespace default \
        --name ExclamationFunctio6
    

    执行如上命令之后,输出如下:

    {
      "tenant": "public",
      "namespace": "default",
      "name": "ExclamationFunctio6",
      "className": "org.example.test.ExclamationFunction",
      "inputSpecs": {
        "persistent://public/default/my-topic-1": {
          "isRegexPattern": false
        }
      },
      "output": "persistent://public/default/test-1",
      "processingGuarantees": "ATLEAST_ONCE",
      "retainOrdering": false,
      "userConfig": {},
      "runtime": "JAVA",
      "autoAck": true,
      "parallelism": 1
    }
    

    通过 get 命令,你可以获取到当前 Pulsar Functions 具体的输入、输出,运行的 rutime 是什么,当前 Functions 中有多少个 instance 在运行等信息。

    list

    list 命令可以获取当前 namespace 下有哪些 Functions 在运行,会返回运行的 Functions 的 name 列表,它提供了如下参数列表供用户使用:

    • tenant
    • namespace

    参考示例如下:

    $ ./bin/pulsar-admin functions list \
        --tenant public \
        --namespace default
    

    执行如上命令之后,输出如下:

    ExclamationFunctio1
    ExclamationFunctio2
    ExclamationFunctio3
    

    上述输出说明,在 default 这个 namespace 下,运行了三个 Pulsar Functions。

    trigger

    trigger 命令会模拟 Functions 执行的过程,可以用来验证当前 Functions 运行的整个链路是否正常,它提供了如下参数列表供用户使用:

    • FQFN
    • tenant
    • namespace
    • name
    • topic
    • trigger-file
    • trigger-value

    参考示例如下:

    $ ./bin/pulsar-admin functions trigger \
        --tenant public \
        --namespace default \
        --name ExclamationFunctio6 \
        --topic persistent://public/default/my-topic-1 \
        --trigger-value "hello pulsar functions"
    

    执行成功之后会输出 This is my function!

    注意:在使用 --topic 指定具体的 topic 时,需要使用 topic 的全称,否则会出现如下错误:

    Function in trigger function has unidentified topic
    
    Reason: Function in trigger function has unidentified topic
    

    status

    使用 status 命令可以查看当前 Functions 运行的状态信息,它提供了如下参数列表供用户使用:

    • FQFN
    • tenant
    • namespace
    • name
    • instance-id

    参考示例如下:

    $ ./bin/pulsar-admin functions status \
        --tenant public \
        --namespace default \
        --name ExclamationFunctio6 \
    

    执行上述命令后,输出如下:

    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReceived" : 1,
          "numSuccessfullyProcessed" : 1,
          "numUserExceptions" : 0,
          "latestUserExceptions" : [ ],
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "averageLatency" : 0.8385,
          "lastInvocationTime" : 1557734137987,
          "workerId" : "c-standalone-fw-23ccc88ef29b-8080"
        }
      } ]
    }
    

    可以看到在 status 命令中,你可以查看到当前 Functions 是否处于运行状态,总共接收了多少条消息,成功处理了多少条消息,系统发生过多少次错误,平均延迟为多少等。由于一个 Functions 可以运行多个 instance,所以可以通过 --instance-id 来具体查看某一个 instance 的详细信息。在上述示例中,在 ExclamationFunctio6 的 Functions 下只运行了一个 instance。

    stats

    stats 命令主要用来获取当前 Functions 的统计信息,它提供了如下参数列表供用户使用:

    • FQFN
    • tenant
    • namespace
    • name
    • instance-id

    参数列表的含义与 status 命令一致,参考示例如下:

    $ ./bin/pulsar-admin functions stats \
        --tenant public \
        --namespace default \
        --name ExclamationFunctio6 \
    

    执行上述命令后,输出如下:

    {
      "receivedTotal" : 1,
      "processedSuccessfullyTotal" : 1,
      "systemExceptionsTotal" : 0,
      "userExceptionsTotal" : 0,
      "avgProcessLatency" : 0.8385,
      "1min" : {
        "receivedTotal" : 0,
        "processedSuccessfullyTotal" : 0,
        "systemExceptionsTotal" : 0,
        "userExceptionsTotal" : 0,
        "avgProcessLatency" : null
      },
      "lastInvocation" : 1557734137987,
      "instances" : [ {
        "instanceId" : 0,
        "metrics" : {
          "receivedTotal" : 1,
          "processedSuccessfullyTotal" : 1,
          "systemExceptionsTotal" : 0,
          "userExceptionsTotal" : 0,
          "avgProcessLatency" : 0.8385,
          "1min" : {
            "receivedTotal" : 0,
            "processedSuccessfullyTotal" : 0,
            "systemExceptionsTotal" : 0,
            "userExceptionsTotal" : 0,
            "avgProcessLatency" : null
          },
          "lastInvocation" : 1557734137987,
          "userMetrics" : { }
        }
      } ]
    }
    

    运行 Pulsar Functions 出现问题时,你可以结合上述 Debug 命令和自定义的日志信息,对 Pulsar Functions 的问题进行很好的定位和分析。

    相关文章

      网友评论

          本文标题:Apache Pulsar 之 Functions Debug

          本文链接:https://www.haomeiwen.com/subject/trkmtctx.html