Apache Pulsar 之 Java Function 实践篇
在 一篇文章了解 Pulsar Functions 中,我们介绍了什么是 Pulsar Function,它的运行机制以及原理。这篇文章具体介绍如何在 Java 中编写、部署并运行 Java Function。
部署 standalone Pulsar
前面我们说到 function 是 pulsar 的计算层,function 的运行依赖于 pulsar 的运行,为了方便演示,我们使用 docker 的方式运行一个 standalone 的 pulsar。
- 从docker hub 上获取
apachepulsar/pulsar:2.3.0
的镜像。
docker pull apachepulsar/pulsar:2.3.0
- start pulsar
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
-v $PWD/pulsardata:/pulsar/data \
apachepulsar/pulsar:2.3.0 \
bin/pulsar standalone
其中 $PWD/pulsardata
是本地目录,使用 -v
参数将 docker 镜像中的 /pulsar/data
目录映射到本地的 $PWD/pulsardata
目录。
- 进入pulsar镜像
docker ps
找到 apachepulsar/pulsar:2.3.0
的 CONTAINER ID
并执行:
docker exec -it [CONTAINER ID] /bin/bash
到目前为止,你成功的以 standalone 形式启动了 Pulsar。
编写 Java Function
下面开始编写自己的第一个Java Function:
首先,新建一个 maven 工程,pom 文件具体如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>java-function</groupId>
<artifactId>java-function</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.example.test.ExclamationFunction</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
注意:
<manifest>
<mainClass>org.example.test.ExclamationFunction</mainClass>
</manifest>
这里的 mainClass
需要修改为自己的路径。
- Java Function 代码示例:
package org.example.test;
import java.util.function.Function;
public class ExclamationFunction implements Function<String, String> {
@Override
public String apply(String s) {
return "This is my function!";
}
}
在上述示例代码中,我们引用了 Java8 提供的 Function 接口,在 Pulsar Functions 中,同样提供了 Function 接口,二者之间的主要区别是:Pulsar Functions 提供了 Context 接口,当你在编写 function 时,如果需要与 Pulsar Functions 进行交互,可以使用 Context 来获取,示例如下:
package org.example.functions;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.util.Arrays;
public class WordCountFunction implements Function<String, Void> {
// This function is invoked every time a message is published to the input topic
@Override
public Void process(String input, Context context) throws Exception {
Arrays.asList(input.split(" ")).forEach(word -> {
String counterKey = word.toLowerCase();
context.incrCounter(counterKey, 1);
});
return null;
}
}
部署 Java Function
如下图所示,Pulsar Functions 目前支持两种状态:start 和 stop。start 到处理 function 之间,是 Pulsar Functions 的初始化阶段,如:setupProducer、setupConsumer、setupLogTopic 等;当初始化工作完成之后,Pulsar Functions 会动态加载用户的 function 代码,作为 Pulsar Functions 处理逻辑的一部分,如果用户编写的 function 有输出,在上图所示的第二步 HandlerMessage 中,会有相应的 output 输出到下游来做相应的处理;处理 function 到 stop 之间是 Pulsar Functions 处理 output 的过程。如上图第三步所示,Pulsar Functions 会调用 processResult 函数,将相应的 output 处理之后,输出到 output topic 中,同时关闭用户在初始化工作时打开的系统资源。
image.png- 打包 function
mvn package
这个时候打开 target 目录,查看是否有一个类似 java-function-1.0-SNAPSHOT.jar
的jar 包。
- 执行 function
上面你已经准备好了standalone pulsar, 但是 pulsar 环境中目前还没有你打包好的 jar 文件,所以首先需要 copy 打包好的 jar 文件到 pulsar 镜像中:
docker cp $PWD/javafunction/target/java-function-1.0-SNAPSHOT.jar CONTAINER ID:/pulsar
执行如下命令,运行 function:
./bin/pulsar-admin functions localrun \
--classname org.example.test.ExclamationFunction \
--jar java-function-1.0-SNAPSHOT.jar \
--inputs persistent://public/default/my-topic-1 \
--output persistent://public/default/test-1 \
--tenant public \
--namespace default \
--name ExclamationFunctio6
下面我们详细解释一下,每一个选项具体的作用:
Options | Describe |
---|---|
functions | 通知 pulsar broker,你即将运行 function 实例 |
classname | 指定运行 function 的 ClassName, 需要将包名加上,形如:org.example.test.ExclamationFunction
|
localrun | 当你指定localrun 时,function 将以 localrun 的形式运行 |
create | 当你指定create 的时,function 将以 cluster 的形式运行 |
jar | 指定 jar 包的运行路径。 |
inputs | 指定 function 数据的来源在哪里,支持多个 topics 作为输入 |
output | 如果该 function 有输出(有些情况下,function 没有输出),指定 function 输出的 topic,只能有一个输出 |
tenant | 指定该 function 运行的租户名 |
namespace | 指定该 function 运行的命名空间 |
name | 指定该 function 运行的名称 |
上述启动选项仅是作为 Demo 演示,更多详细选项文档可以通过./bin/pulsar-admin functions
直接查看。
启动之后,如果看到下述日志,证明启动成功:
07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
07:55:34.948 [Timer-0] INFO org.apache.pulsar.functions.runtime.LocalRunner - { │
"running": true,
"instanceId": "0"
}
当你将上述启动命令的 localrun
替换为 create
,就能以集群的模式启动,启动后会输出一行日志,具体如下:
"Created successfully"
除了启动 Pulsar Functions 之外,Pulsar 提供了如下命令,用于控制 function 的状态。
停止 Java Function
停止 Pulsar Function 的命令为 stop
,它提供了如下参数列表:
Options | Describe |
---|---|
tenant | The Function's tenant |
namespace | The Function's namespace |
name | The Function's name |
fqfn | fqfn 是 tenant、namespace、name 的组合表示,用于唯一标识一个 Function,形如:tenant/namespace/name |
instance-id | The Function's instanceID(如果 instanceID 没有被提供,则默认关闭所有的 instances) |
下面以 tenant
、namespace
、name
为例,停止 ExclamationFunctio123
Function,具体如下:
root@856932ba3474:/pulsar# ./bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name ExclamationFunctio6
Stopped successfully
启动 Java Function
启动 Pulsar Function 的命令为 start
,它提供了如下参数列表:
Options | Describe |
---|---|
tenant | The Function's tenant |
namespace | The Function's namespace |
name | The Function's name |
fqfn | fqfn 是 tenant、namespace、name 的组合表示,用于唯一标识一个 Function,形如:tenant/namespace/name |
instance-id | The Function's instanceID(如果 instanceID 没有被提供,则默认启动所有的 instances) |
下面以 tenant
、namespace
、name
为例,启动 ExclamationFunctio123
Function,具体如下:
root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name ExclamationFunctio123
Started successfully
重启 Java Function
重启 Pulsar Function 的命令为 restart
,它提供了如下参数列表:
Options | Describe |
---|---|
tenant | The Function's tenant |
namespace | The Function's namespace |
name | The Function's name |
fqfn | fqfn 是 tenant、namespace、name 的组合表示,用于唯一标识一个 Function,形如:tenant/namespace/name |
instance-id | The Function's instanceID(如果 instanceID 没有被提供,则默认重启所有的 instances) |
下面以 tenant
、namespace
、name
为例,重启 ExclamationFunctio123
Function,具体如下:
root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions restart \
--tenant public \
--namespace default \
--name ExclamationFunctio123
Restarted successfully
更新 Java Function
当启动或者运行一段时间 Pulsar Functions 之后,如果想要更改 Function 的内部运行参数,可以使用 update
命令,update 提供的参数列表与创建一个 Function 基本一致,可以使用 ./bin/pulsar-admin functions
查看帮助文档。
下面示例将 ExclamationFunctio123
的 cpu
核数更新为 10,具体如下:
root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions update --tenant public --namespace default --name ExclamationFunctio123 --cpu 10
"Updated successfully"
删除 Java Function
删除 Pulsar Function 的命令为 restart
,它提供了如下参数列表:
Options | Describe |
---|---|
tenant | The Function's tenant |
namespace | The Function's namespace |
name | The Function's name |
fqfn | fqfn 是 tenant、namespace、name 的组合表示,用于唯一标识一个 Function,形如:tenant/namespace/name |
下面以 tenant
、namespace
、name
为例,删除 ExclamationFunctio123
Function,具体如下:
root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name ExclamationFunctio123
"Deleted successfully"
网友评论