阅读本文需要约 20 分钟。
1. 概览
Apache Pulsar 是一个分布式的发布-订阅消息系统,sink 是 Pulsar 的一个组件,用于将数据导入至其他系统。
本文介绍了 sink 的功能,并演示了如何创建与使用 JDBC sink 与 MySQL 进行连接。
2. Sink 命令
2.1 Create
2.1.1 命令
创建 sink。
$ bin/pulsar-admin sink create <options>
2.1.2 常用参数
参数 | 解释 |
---|---|
-a , --archive
|
指定 sink 的 NAR 包。 |
--classname |
指定 sink 的类名称。 |
-i , --inputs
|
指定 sink 的 topic,多个 topic 用逗号隔开。 |
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的命名空间。 |
--parallelism |
指定 sink 的并发数。 |
--sink-config-file |
指定 sink 的 yaml 配置文件。 |
--tenant |
指定 sink 的租户。 |
2.2 Update
2.2.1 命令
更新 sink。
$ bin/pulsar-admin sink update <options>
2.2.2 常用参数
参数 | 解释 |
---|---|
-a , --archive
|
指定 sink 的 NAR 包。 |
--classname |
指定 sink 的类名称。 |
-i , --inputs
|
指定 sink 的 topic,多个 topic 用逗号隔开。 |
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的 命名空间。 |
--parallelism |
指定 sink 的并发数。 |
--sink-config-file |
指定 sink 的 yaml 配置文件。 |
--tenant |
指定 sink 的租户。 |
2.3 Delete
2.3.1 命令
删除 sink。
$ bin/pulsar-admin sink delete <options>
2.3.2 常用参数
参数 | 解释 |
---|---|
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的命名空间。 |
--tenant |
指定 sink 的租户。 |
2.4 List
2.4.1 命令
显示所有 sink。
$ bin/pulsar-admin sink list <options>
2.4.2 常用参数
参数 | 解释 |
---|---|
--namespace |
指定 sink 的命名空间。 |
--tenant |
指定 sink 的租户。 |
2.5 Get
2.5.1 命令
显示 sink 的信息。
$ bin/pulsar-admin sink get <options>
2.5.2 常用参数
参数 | 解释 |
---|---|
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的命名空间。 |
--tenant |
指定 sink 的租户。 |
2.6 Status
2.6.1 命令
显示 sink 的状态。
$ bin/pulsar-admin sink status <options>
2.6.2 常用参数
参数 | 解释 |
---|---|
--instance-id |
指定 sink 的实例 ID。 如果未指定,则获取所有实例的状态。 |
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的命名空间。 |
--tenant |
指定 sink 的租户。 |
2.7 Stop
2.7.1 命令
停止 sink。
$ bin/pulsar-admin sink stop <options>
2.7.2 常用参数
参数 | 解释 |
---|---|
--instance-id |
指定 sink 的实例 ID。 如果未指定,则停止所有实例。 |
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的命名空间。 |
--tenant |
指定 sink 的租户。 |
2.8 Start
2.8.1 命令
启动 sink。
$ bin/pulsar-admin sink start <options>
2.8.2 常用参数
参数 | 解释 |
---|---|
--instance-id |
指定 sink 的实例 ID。 如果未指定,则启动所有实例。 |
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的命名空间。 |
--tenant |
指定 sink 的租户。 |
2.9 Restart
2.9.1 命令
重启 sink。
$ bin/pulsar-admin sink restart <options>
2.9.2 常用参数
参数 | 解释 |
---|---|
--instance-id |
指定 sink 的实例 ID。 如果未指定,则重启所有实例。 |
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的命名空间。 |
--tenant |
指定 sink 的租户。 |
2.10 Localrun
2.10.1 命令
在本地运行一个 Pulsar IO sink connector,方便调试。
$ bin/pulsar-admin sink localrun <options>
2.10.2 常用参数
参数 | 解释 |
---|---|
-a , --archive
|
指定 sink 的 NAR 包。 |
--classname |
指定 sink 的类名称。 |
-i , --inputs
|
指定 sink 的 topic,多个 topic 用逗号隔开。 |
--name |
指定 sink 的名称。 |
--namespace |
指定 sink 的命名空间。 |
--parallelism |
指定 sink 的并发数。 |
--sink-config-file |
指定 sink 的 yaml 配置文件。 |
--tenant |
指定 sink 的租户。 |
3. 搭建环境
本示例创建JDBC sink,并使用 JDBC sink 与 MySQL 进行连接。
3.1 准备工作
本示例在 Mac 系统上进行。在开始之前,需要安装以下依赖:
3.2 开始搭建
搭建步骤总计 6 步。
3.2.1 安装与启动 MySQL
(1) 拉取 MySQL 镜像。
$ docker pull mysql:5.7
(2) 启动 MySQL。
$ docker run -d -it --rm \
--name pulsar-mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=jdbc \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
mysql:5.7
提示
参数 解释 备注 -d
以后台模式运行。 / -it
以交互模式运行,并为 docker 分配一个伪输入终端。 / --rm
docker 停止后,自动删除 docker。 / --name
指定 docker 名称。 本示例指定 docker 名称为 pulsar-mysql。 -p
指定端口。 本示例指定对外暴露 3306 端口。 -e
指定环境变量。 本示例为 MySQL 指定以下信息:
- root 用户的密码为 jdbc
- 普通用户的名称为 mysqluser
- 普通用户的密码为mysqlpw
(3)验证是否成功启动。
$ docker logs -f pulsar-mysql
如果出现以下信息,则说明成功启动。
2019-07-29T01:50:05.116660Z 0 [Note] InnoDB: Waiting for purge to start
2019-07-29T01:50:05.168247Z 0 [Note] InnoDB: 5.7.26 started; log sequence number 12363846
2019-07-29T01:50:05.168596Z 0 [Note] InnoDB: Loading buffer pool(s) from /var/lib/mysql/ib_buffer_pool
2019-07-29T01:50:05.168855Z 0 [Note] Plugin 'FEDERATED' is disabled.
2019-07-29T01:50:05.173901Z 0 [Note] InnoDB: Buffer pool(s) load completed at 190729 1:50:05
2019-07-29T01:50:05.174778Z 0 [Note] Found ca.pem, server-cert.pem and server-key.pem in data directory. Trying to enable SSL support using them.
2019-07-29T01:50:05.175045Z 0 [Warning] CA certificate ca.pem is self signed.
2019-07-29T01:50:05.176942Z 0 [Note] Server hostname (bind-address): '*'; port: 3306
2019-07-29T01:50:05.177017Z 0 [Note] IPv6 is available.
2019-07-29T01:50:05.178937Z 0 [Note] - '::' resolves to '::';
2019-07-29T01:50:05.178998Z 0 [Note] Server socket created on IP: '::'.
2019-07-29T01:50:05.181545Z 0 [Warning] Insecure configuration for --pid-file: Location '/var/run/mysqld' in the path is accessible to all OS users. Consider choosing a different directory.
2019-07-29T01:50:05.192955Z 0 [Note] Event Scheduler: Loaded 0 events
2019-07-29T01:50:05.193401Z 0 [Note] mysqld: ready for connections.
Version: '5.7.26' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
3.2.2 创建 MySQL 表
为了简化操作,本示例使用 root 用户和密码进入 docker,再创建数据库和表,方便数据写入。
(1)进入 MySQL。
$ docker exec -it pulsar-mysql /bin/bash
mysql -h localhost -uroot -pjdbc
(2)创建数据库和表。
$ create database test_jdbc;
$ use test_jdbc;
$ create table if not exists test_jdbc
(
id INT AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
primary key (id)
)
engine=innodb;
3.2.3 安装与启动 Pulsar
(1) 下载并安装 Pulsar。
$ wget https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
$ tar -zxvf apache-pulsar-2.4.0-bin.tar.gz
$ cd apache-pulsar-2.4.0
(2)启动 Pulsar。
$ bin/pulsar standalone -nss
(3)验证是否成功启动。
如果出现以下信息,则说明启动成功。
09:56:22.753 [pulsar-web-44-8] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
09:56:22.761 [pulsar-web-44-8] INFO org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
09:56:22.763 [pulsar-web-44-8] INFO org.eclipse.jetty.server.RequestLog - 192.168.50.140 - - [29/七月/2019:09:56:22 +0800] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.4.0" 12
09:56:22.771 [pulsar-web-44-11] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
09:56:22.777 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x1003d74007c0003 local:/127.0.0.1:61606 remoteserver:localhost/127.0.0.1:2181 lastZxid:167 xid:42 sent:42 recv:44 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
09:56:22.778 [pulsar-web-44-11] INFO org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default
09:56:22.779 [pulsar-web-44-11] INFO org.eclipse.jetty.server.RequestLog - 192.168.50.140 - - [29/七月/2019:09:56:22 +0800] "POST /admin/v2/namespaces/public/default/replication HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.4.0" 12
3.2.4 增加配置文件
(1)创建 mysql-jdbc-sink.yaml
配置文件。
(2)复制以下内容至 mysql-jdbc-sink.yaml
文件。
以下内容指定了 MySQL 的用户名、密码、链接和表名。
configs:
userName: "root"
password: "jdbc"
jdbcUrl: "jdbc:mysql://127.0.0.1:3306/test_jdbc"
tableName: "test_jdbc"
3.2.5 创建 schema
数据库的表包含 schema 信息,JDBC sink 也支持 schema。
因此,只要构建好 schema,即能直接从 topic 中读取消息,再通过 JDBC 将消息传送至数据库的表,该 schema 与数据库的表一一对应。
以下示例创建 avro-schema
文件。
{
"type": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
"properties": {}
}
提示
更多关于 AVRO 字段的信息,参阅 AVRO 官网 。
3.2.6 上传 schema,启动 sink
(1)上传 schema 至 test-jdbc topic。
$ bin/pulsar-admin schemas upload test-jdbc -f avro-schema
(2)验证是否上传成功。
$ bin/pulsar-admin schemas get test-jdbc
如果出现以下信息,则说明上传成功。
{
"name": "test-jdbc",
"schema": {
"type": "record",
"name": "Test",
"fields": [
{
"name": "id",
"type": [
"null",
"int"
]
},
{
"name": "name",
"type": [
"null",
"string"
]
}
]
},
"type": "AVRO",
"properties": {}
}
(3)启动 sink。
$ bin/pulsar-admin sink localrun \
--archive connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs test-jdbc \
--name mysql-jdbc-sink \
--sink-config-file connectors/mysql-jdbc-sink.yaml \
--parallelism 1
(4)验证是否启动成功。
如果出现以下信息,则说明启动成功。
10:01:20.357 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x21a4b716, L:/127.0.0.1:61690 - R:localhost/127.0.0.1:6650] Connected through proxy to target broker at tengdeMBP:6650
10:01:20.359 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [test-jdbc][public/default/mysql-jdbc-sink] Subscribing to topic on cnx [id: 0x21a4b716, L:/127.0.0.1:61690 - R:localhost/127.0.0.1:6650]
10:01:20.407 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [test-jdbc][public/default/mysql-jdbc-sink] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
4. 实践
4.1 Create
(1)创建 mysql-jdbc-sink,并指定 nar 文件、topic、名称、yaml 配置文件和并发数。
$ bin/pulsar-admin sink create \
--archive connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs test-jdbc \
--name mysql-jdbc-sink \
--sink-config-file connectors/mysql-jdbc-sink.yaml \
--parallelism 1
(2)如果出现以下信息,则说明创建成功。
Created successfully
4.2 List
(1)显示所有 sink。
$ bin/pulsar-admin sink list \
--tenant public \
--namespace default
(2)返回结果显示前文创建的 mysql-jdbc-sink。
[
"mysql-jdbc-sink"
]
4.3 Get
(1)显示 sink 的信息。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)返回结果显示 mysql-jdbc-sink 的信息,包括租户、命名空间和名称等。
{
"tenant": "public",
"namespace": "default",
"name": "mysql-jdbc-sink",
"className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
"inputSpecs": {
"test-jdbc": {
"isRegexPattern": false
}
},
"configs": {
"password": "jdbc",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc",
"userName": "root",
"tableName": "test_jdbc"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
4.4 Status
(1)显示 mysql-jdbc-sink 的状态。
$ bin/pulsar-admin sink status \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)返回结果显示 mysql-jdbc-sink 的状态信息,包括实例数量、是否正在运行和 worker ID等。
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReadFromPulsar" : 0,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSinkExceptions" : 0,
"latestSinkExceptions" : [ ],
"numWrittenToSink" : 0,
"lastReceivedTime" : 0,
"workerId" : "c-standalone-fw-tengdeMBP.lan-8080"
}
} ]
}
4.5 Stop
(1)停止 mysql-jdbc-sink。
$ bin/pulsar-admin sink stop \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明停止成功。
Stopped successfully
4.6 Start
(1)启动 mysql-jdbc-sink。
$ bin/pulsar-admin sink start \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明启动成功。
Started successfully
4.7 Restart
(1)重启 mysql-jdbc-sink。
$ bin/pulsar-admin sink restart \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明重启成功。
Restarted successfully
4.8 Update
(1)将 parallelism 更新至2。
$ bin/pulsar-admin sink update \
--name mysql-jdbc-sink \
--parallelism 2
(2)如果出现以下信息,则说明更新成功。
Updated successfully
(3)查看 mysql-jdbc-sink 的信息,再次验证更新结果。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(4)Parallelism 为2,说明已更新成功。
{
"tenant": "public",
"namespace": "default",
"name": "mysql-jdbc-sink",
"className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
"inputSpecs": {
"test-jdbc": {
"isRegexPattern": false
}
},
"configs": {
"password": "jdbc",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc",
"userName": "root",
"tableName": "test_jdbc"
},
"parallelism": 2,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
4.9 Delete
(1)删除 mysql-jdbc-sink。
$ bin/pulsar-admin sink delete \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)如果出现以下信息,则说明删除成功。
Deleted successfully
(3)查看 mysql-jdbc-sink 的信息,再次验证删除结果。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(4)mysql-jdbc-sink 不存在,说明已删除成功。
HTTP 404 Not Found
Reason: Sink mysql-jdbc-sink doesn't exist
4.10 Localrun
更多关于 localrun 的使用示例,参阅 3.2.6 上传 schema,启动 sink 的第 3 步。
5. 总结
本文介绍了 sink 的功能,并演示了如何创建与使用 JDBC sink 与 MySQL 进行连接。
6. 更多信息
-
更多关于 Pulsar connector 的信息,参阅 Pulsar Connector 预览。
-
更多关于 Pulsar source 的信息,参阅 Pulsar Source 入门篇。
网友评论