美文网首页
Pulsar-Function

Pulsar-Function

作者: LaxChan | 来源:发表于2019-12-16 18:33 被阅读0次

    参考链接

    笔记

    • 概述
      轻量级的计算方法,依赖于Pulsar消息系统,处理逻辑如下:
    1. 消费一个或多个Topic
    2. 每个消息都执行用户提供的处理逻辑
    3. 发布计算后的结果到另一个Topic
    

    目标:

    1. 开发人员开发效率
    2. 排查故障方便
    3. 操作简单(无需外部系统)
    

    灵感:

    1. 流式处理引擎(Apache Storm,Apache Heron,Apache Flink)
    2. "Serverless", "Function as a Service"(Amazon Web Services Lambda,Google Cloud Functions,Azure Cloud Functions)
    Function可如下描述:
    a. Lambda样式的函数
    b. 使用Pulsar为消息总线而设计
    

    程序设计模型:

    提供了广泛的功能,核心编程模型简单.
    Functions可以从一个或多个输入Topic中获取数据,并且对于每个收到的消息可以执行如下任务:
    1. 对输入消息做逻辑处理,并输出数据至指定的Topic或Bookkeeper
    2. 往日志Topic写入处理日志(用于调试)
    3. 递增计数器
    可以将多个Function组合起来,实现一个消息处理链,如:
    输入Message -> 关键字过滤白名单Function->白名单结果Topic->消息统计Function->统计结果Topic->结果入库Function
    

    Functions处理消息类型:

    Pulsar Functions默认是以字节数组的形式来接收消息及发送消息;也可以绑定消息的类型(有两种方式)
    1. Schema Registry
      pulsar内置的schema注册中心
    2. SerDe
    自定义Serialization,Deserialization函数(必须跟function打在同一个包里),并在发布Function时指定处理类,使用参数--output-serde-classname,如: --output-serde-classname com.example.serde.TweetSerde 
    

    FQFN(完全限定函数名)

    包含三元素:function tenant, namespace, function name
    故相同的函数名可以部署在不同的命名空间下。
    

    支持的开发语言

    Java
    Python
    Go
    

    消息处理三种模式保证

    At-most-once delivery: 最多一次投递,发送到函数的每个消息都可能被处理,或者不被处理;处理过程失败后将没有重试机制;
    At-least-once delivery:最少一次投递,发送到函数的每个消息都可以被多次处理,Function默认的模式;处理过程失败后消息将会被重新投递处理,故存在消息被多次投递、处理的情况;
    Effectively-once delivery:只有一次有效投递,发送到函数的每条消息都将有一个与之关联的输出;
    

    Exactly-once delivery:只投递一次
    基本实现的方式有两种:

    1. 分布式快照\状态检查点 (Chandy-Lamport分布式快照算法)
      定期检查,出现失败情况,回滚至最近的一次全局一致性快照/检查点,重新消费处理对应offset的消息
      优点: 较少的性能和资源开销
      缺点:故障中恢复时对性能的影响更大;计算网络拓扑越大,对性能的影响越大

    2. 基于“最少投递一次”增加消息去重插件
      消息处理前,先进行去重处理;对于失败的消息直接重新投递,若有计算节点已经处理过,则不处理该消息;需要更多的资源,尤其是存储。
      优点:故障对性能的影响是局部的;计算网络拓扑增大,并不会提高故障对性能的影响;
      缺点:需要大量存储和基础架构来支持;每个事件/消息都需要额外的性能开销;

    Pulsar Broker消息去重:

    Broker的重复数据消除逻辑基于 record-keeping系统。每个Broker都会跟踪每个消息生产者最后一个发布“successfully”的消息ID。这些信息存储于内存(以producer -> last sequence ID形式),Pulsar会定期存储快照,并持久化同时复制多份。快照与快照消息日志相结合,可确保Broker崩溃后正确状态的重建。
    同时要实现Exactly-once delivery,应用也需要做相应支持,简单样例:
    Producer producer = client.createProducer(TOPIC_NAME, conf);
    此方式创建的producer,系统将为其分配一个唯一的名称,并且发布消息的序列ID是从0开始;;若应用重启则名称将会变化,故需要做下改进:
    设置producer名称,并从最后的序列ID开始发送消息;
    并且应用系统需要将发送的消息和序列ID关联起来;
    消费端需要将处理结果同序列ID关联起来(如写入DB),故障恢复时可以通过Reader API读取指定ID的消息;
    
    • Functions工作原理
      1.) 运行模式
      【functions】
      线程:function以线程方式运行于Functions worker,使用同一个JVM;只支持java function。
      进程:由Functions worker fork出新进程运行function,在相同的宿主机上;支持Java, Python, and Go functions。
      Kubernetes:以docker容器化方式运行functions。
      【Functions worker】
      模式一: 运行于Broker中,可以理解为Broker的一部分;简单方便;
      模式二:独立的Functions worker集群模式运行;更好的资源隔离;
      2.) functions开发
      提供接口类型:
      Language-native interface:无需pulsar指定的库,故也没有对应的context;
    import java.util.function.Function;
    public class JavaNativeExclamationFunction implements Function<String, String> {
       @Override
       public String apply(String input) {
           return String.format("%s!", input);
       }
    }
    

    Pulsar Function SDK:引入pulsar指定的库,实现的函数带有对应的context;

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;
    
    public class ExclamationFunction implements Function<String, String> {
       @Override
       public String process(String input, Context context) {
           return String.format("%s!", input);
       }
    }
    

    消息序列化:
    a.) Pulsar内置的schema注册中心
    b.) 自定义的序列化、反序列化函数(SerDe)
    注意:需要将SerDe打包到function的包中
    3.) 调试
    a.) unit Test

    @Test
    public void testJavaNativeExclamationFunction() {
       JavaNativeExclamationFunction exclamation = new JavaNativeExclamationFunction();
       String output = exclamation.apply("foo");
       Assert.assertEquals(output, "foo!");
    }
    @Test
    public void testExclamationFunction() {
       ExclamationFunction exclamation = new ExclamationFunction();
       String output = exclamation.process("foo", mock(Context.class));
       Assert.assertEquals(output, "foo!");
    }
    

    b.) localrun mode

    public class ExclamationFunction implements Function<String, String> {
    
       @Override
       public String process(String s, Context context) throws Exception {
           return s + "!";
       }
    
    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setName("exclamation");
        functionConfig.setInputs(Collections.singleton("input"));   functionConfig.setClassName(ExclamationFunction.class.getName());
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setOutput("output");
        LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build();
        localRunner.start(false);
    }
    

    4.) 部署
    使用pulsar-admin functions命令,主要参数如下:

    Parameter Default 备注
    classname The function's class name
    jar Path to the jar file for the function
    name The function's name
    tenant The function's tenant
    namespace The function's namespace
    output The function's output topic
    inputs The function's input topic or topics
    subs-name Pulsar source subscription name
    processing-guarantees The processing guarantees (aka delivery semantics) applied to the function [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE
    parallelism The function's parallelism factor
    cpu The cpu in cores that need to be allocated per function instance
    log-topic The topic to which the function's logs are produced
    schema-type The builtin schema type or custom schema class name to be used for messages output by the function

    特别说明
    【Subscription type】
    For at-least-once and at-most-once [processing guarantees], the [SHARED]mode is applied by default; for effectively-once guarantees, the [FAILOVER] mode is applied.

    支持本地运行(localrun,运行于连接的broker)和集群运行(create,伴随broker集群运行)
    并行度设置(Parallelism):
    localrun,多次调用命令,命令亦提供参数项parallelism
    create,设置参数项parallelism

    相关文章

      网友评论

          本文标题:Pulsar-Function

          本文链接:https://www.haomeiwen.com/subject/pcjtnctx.html