美文网首页
Pulsar Sink 入门指南

Pulsar Sink 入门指南

作者: StreamNative | 来源:发表于2019-08-01 18:26 被阅读0次
    Pulsar Connector.png

    阅读本文需要约 20 分钟。

    1. 概览

    Apache Pulsar 是一个分布式的发布-订阅消息系统,sink 是 Pulsar 的一个组件,用于将数据导入至其他系统。

    本文介绍了 sink 的功能,并演示了如何创建与使用 JDBC sink 与 MySQL 进行连接

    2. Sink 命令

    2.1 Create

    2.1.1 命令

    创建 sink。

    $ bin/pulsar-admin sink create <options>
    

    2.1.2 常用参数

    参数 解释
    -a, --archive 指定 sink 的 NAR 包。
    --classname 指定 sink 的类名称。
    -i, --inputs 指定 sink 的 topic,多个 topic 用逗号隔开。
    --name 指定 sink 的名称。
    --namespace 指定 sink 的命名空间。
    --parallelism 指定 sink 的并发数。
    --sink-config-file 指定 sink 的 yaml 配置文件。
    --tenant 指定 sink 的租户。

    2.2 Update

    2.2.1 命令

    更新 sink。

    $ bin/pulsar-admin sink update <options>
    

    2.2.2 常用参数

    参数 解释
    -a, --archive 指定 sink 的 NAR 包。
    --classname 指定 sink 的类名称。
    -i, --inputs 指定 sink 的 topic,多个 topic 用逗号隔开。
    --name 指定 sink 的名称。
    --namespace 指定 sink 的 命名空间。
    --parallelism 指定 sink 的并发数。
    --sink-config-file 指定 sink 的 yaml 配置文件。
    --tenant 指定 sink 的租户。

    2.3 Delete

    2.3.1 命令

    删除 sink。

    $ bin/pulsar-admin sink delete <options>
    

    2.3.2 常用参数

    参数 解释
    --name 指定 sink 的名称。
    --namespace 指定 sink 的命名空间。
    --tenant 指定 sink 的租户。

    2.4 List

    2.4.1 命令

    显示所有 sink。

    $ bin/pulsar-admin sink list <options>
    

    2.4.2 常用参数

    参数 解释
    --namespace 指定 sink 的命名空间。
    --tenant 指定 sink 的租户。

    2.5 Get

    2.5.1 命令

    显示 sink 的信息。

    $ bin/pulsar-admin sink get <options>
    

    2.5.2 常用参数

    参数 解释
    --name 指定 sink 的名称。
    --namespace 指定 sink 的命名空间。
    --tenant 指定 sink 的租户。

    2.6 Status

    2.6.1 命令

    显示 sink 的状态。

    $ bin/pulsar-admin sink status <options>
    

    2.6.2 常用参数

    参数 解释
    --instance-id 指定 sink 的实例 ID。
    如果未指定,则获取所有实例的状态。
    --name 指定 sink 的名称。
    --namespace 指定 sink 的命名空间。
    --tenant 指定 sink 的租户。

    2.7 Stop

    2.7.1 命令

    停止 sink。

    $ bin/pulsar-admin sink stop <options>
    

    2.7.2 常用参数

    参数 解释
    --instance-id 指定 sink 的实例 ID。
    如果未指定,则停止所有实例。
    --name 指定 sink 的名称。
    --namespace 指定 sink 的命名空间。
    --tenant 指定 sink 的租户。

    2.8 Start

    2.8.1 命令

    启动 sink。

    $ bin/pulsar-admin sink start <options>
    

    2.8.2 常用参数

    参数 解释
    --instance-id 指定 sink 的实例 ID。
    如果未指定,则启动所有实例。
    --name 指定 sink 的名称。
    --namespace 指定 sink 的命名空间。
    --tenant 指定 sink 的租户。

    2.9 Restart

    2.9.1 命令

    重启 sink。

    $ bin/pulsar-admin sink restart <options>
    

    2.9.2 常用参数

    参数 解释
    --instance-id 指定 sink 的实例 ID。
    如果未指定,则重启所有实例。
    --name 指定 sink 的名称。
    --namespace 指定 sink 的命名空间。
    --tenant 指定 sink 的租户。

    2.10 Localrun

    2.10.1 命令

    在本地运行一个 Pulsar IO sink connector,方便调试。

    $ bin/pulsar-admin sink localrun <options>
    

    2.10.2 常用参数

    参数 解释
    -a, --archive 指定 sink 的 NAR 包。
    --classname 指定 sink 的类名称。
    -i, --inputs 指定 sink 的 topic,多个 topic 用逗号隔开。
    --name 指定 sink 的名称。
    --namespace 指定 sink 的命名空间。
    --parallelism 指定 sink 的并发数。
    --sink-config-file 指定 sink 的 yaml 配置文件。
    --tenant 指定 sink 的租户。

    3. 搭建环境

    本示例创建JDBC sink,并使用 JDBC sink 与 MySQL 进行连接。

    3.1 准备工作

    本示例在 Mac 系统上进行。在开始之前,需要安装以下依赖:

    3.2 开始搭建

    搭建步骤总计 6 步。

    3.2.1 安装与启动 MySQL

    (1) 拉取 MySQL 镜像。

    $ docker pull mysql:5.7
    

    (2) 启动 MySQL。

    $ docker run -d -it --rm \
    --name pulsar-mysql \
    -p 3306:3306 \
    -e MYSQL_ROOT_PASSWORD=jdbc \
    -e MYSQL_USER=mysqluser \
    -e MYSQL_PASSWORD=mysqlpw \
    mysql:5.7
    

    提示

    参数 解释 备注
    -d 以后台模式运行。 /
    -it 以交互模式运行,并为 docker 分配一个伪输入终端。 /
    --rm docker 停止后,自动删除 docker。 /
    --name 指定 docker 名称。 本示例指定 docker 名称为 pulsar-mysql。
    -p 指定端口。 本示例指定对外暴露 3306 端口。
    -e 指定环境变量。 本示例为 MySQL 指定以下信息:
    - root 用户的密码为 jdbc
    - 普通用户的名称为 mysqluser
    - 普通用户的密码为mysqlpw

    (3)验证是否成功启动。

    $ docker logs -f pulsar-mysql
    

    如果出现以下信息,则说明成功启动。

    2019-07-29T01:50:05.116660Z 0 [Note] InnoDB: Waiting for purge to start
    2019-07-29T01:50:05.168247Z 0 [Note] InnoDB: 5.7.26 started; log sequence number 12363846
    2019-07-29T01:50:05.168596Z 0 [Note] InnoDB: Loading buffer pool(s) from /var/lib/mysql/ib_buffer_pool
    2019-07-29T01:50:05.168855Z 0 [Note] Plugin 'FEDERATED' is disabled.
    2019-07-29T01:50:05.173901Z 0 [Note] InnoDB: Buffer pool(s) load completed at 190729  1:50:05
    2019-07-29T01:50:05.174778Z 0 [Note] Found ca.pem, server-cert.pem and server-key.pem in data directory. Trying to enable SSL support using them.
    2019-07-29T01:50:05.175045Z 0 [Warning] CA certificate ca.pem is self signed.
    2019-07-29T01:50:05.176942Z 0 [Note] Server hostname (bind-address): '*'; port: 3306
    2019-07-29T01:50:05.177017Z 0 [Note] IPv6 is available.
    2019-07-29T01:50:05.178937Z 0 [Note]   - '::' resolves to '::';
    2019-07-29T01:50:05.178998Z 0 [Note] Server socket created on IP: '::'.
    2019-07-29T01:50:05.181545Z 0 [Warning] Insecure configuration for --pid-file: Location '/var/run/mysqld' in the path is accessible to all OS users. Consider choosing a different directory.
    2019-07-29T01:50:05.192955Z 0 [Note] Event Scheduler: Loaded 0 events
    2019-07-29T01:50:05.193401Z 0 [Note] mysqld: ready for connections.
    Version: '5.7.26'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)
    

    3.2.2 创建 MySQL 表

    为了简化操作,本示例使用 root 用户和密码进入 docker,再创建数据库和表,方便数据写入。

    (1)进入 MySQL。

    $ docker exec -it pulsar-mysql /bin/bash
    mysql -h localhost -uroot -pjdbc
    

    (2)创建数据库和表。

    $ create database test_jdbc;
    
    $ use test_jdbc;
    
    $ create table if not exists test_jdbc
    (
     id INT AUTO_INCREMENT,
     name VARCHAR(255) NOT NULL,
     primary key (id)
    )
    engine=innodb;
    

    3.2.3 安装与启动 Pulsar

    (1) 下载并安装 Pulsar。

    $ wget https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
    
    $ tar -zxvf apache-pulsar-2.4.0-bin.tar.gz
    
    $ cd apache-pulsar-2.4.0
    

    (2)启动 Pulsar。

    $ bin/pulsar standalone -nss
    

    (3)验证是否成功启动。

    如果出现以下信息,则说明启动成功。

    09:56:22.753 [pulsar-web-44-8] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
    09:56:22.761 [pulsar-web-44-8] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
    09:56:22.763 [pulsar-web-44-8] INFO  org.eclipse.jetty.server.RequestLog - 192.168.50.140 - - [29/七月/2019:09:56:22 +0800] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.4.0" 12
    09:56:22.771 [pulsar-web-44-11] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
    09:56:22.777 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x1003d74007c0003 local:/127.0.0.1:61606 remoteserver:localhost/127.0.0.1:2181 lastZxid:167 xid:42 sent:42 recv:44 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
    09:56:22.778 [pulsar-web-44-11] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default
    09:56:22.779 [pulsar-web-44-11] INFO  org.eclipse.jetty.server.RequestLog - 192.168.50.140 - - [29/七月/2019:09:56:22 +0800] "POST /admin/v2/namespaces/public/default/replication HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.4.0" 12
    

    3.2.4 增加配置文件

    (1)创建 mysql-jdbc-sink.yaml 配置文件。

    (2)复制以下内容至 mysql-jdbc-sink.yaml 文件。

    以下内容指定了 MySQL 的用户名、密码、链接和表名。

    configs:
       userName: "root"
       password: "jdbc"
       jdbcUrl: "jdbc:mysql://127.0.0.1:3306/test_jdbc"
       tableName: "test_jdbc"
    

    3.2.5 创建 schema

    数据库的表包含 schema 信息,JDBC sink 也支持 schema。

    因此,只要构建好 schema,即能直接从 topic 中读取消息,再通过 JDBC 将消息传送至数据库的表,该 schema 与数据库的表一一对应。

    以下示例创建 avro-schema 文件。

    {
       "type": "AVRO",
       "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
       "properties": {}
    }
    

    提示

    更多关于 AVRO 字段的信息,参阅 AVRO 官网

    3.2.6 上传 schema,启动 sink

    (1)上传 schema 至 test-jdbc topic。

    $ bin/pulsar-admin schemas upload test-jdbc -f avro-schema
    

    (2)验证是否上传成功。

    $ bin/pulsar-admin schemas get test-jdbc
    

    如果出现以下信息,则说明上传成功。

    {
      "name": "test-jdbc",
      "schema": {
        "type": "record",
        "name": "Test",
        "fields": [
          {
            "name": "id",
            "type": [
              "null",
              "int"
            ]
          },
          {
            "name": "name",
            "type": [
              "null",
              "string"
            ]
          }
        ]
      },
      "type": "AVRO",
      "properties": {}
    }
    

    (3)启动 sink。

    $ bin/pulsar-admin sink localrun \
    --archive connectors/pulsar-io-jdbc-2.4.0.nar \
    --inputs test-jdbc \
    --name mysql-jdbc-sink \
    --sink-config-file connectors/mysql-jdbc-sink.yaml \
    --parallelism 1
    

    (4)验证是否启动成功。

    如果出现以下信息,则说明启动成功。

    10:01:20.357 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x21a4b716, L:/127.0.0.1:61690 - R:localhost/127.0.0.1:6650] Connected through proxy to target broker at tengdeMBP:6650
    10:01:20.359 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test-jdbc][public/default/mysql-jdbc-sink] Subscribing to topic on cnx [id: 0x21a4b716, L:/127.0.0.1:61690 - R:localhost/127.0.0.1:6650]
    10:01:20.407 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test-jdbc][public/default/mysql-jdbc-sink] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
    

    4. 实践

    4.1 Create

    (1)创建 mysql-jdbc-sink,并指定 nar 文件、topic、名称、yaml 配置文件和并发数。

    $ bin/pulsar-admin sink create \
    --archive connectors/pulsar-io-jdbc-2.4.0.nar \
    --inputs test-jdbc \
    --name mysql-jdbc-sink \
    --sink-config-file connectors/mysql-jdbc-sink.yaml \
    --parallelism 1
    

    (2)如果出现以下信息,则说明创建成功。

    Created successfully
    

    4.2 List

    (1)显示所有 sink。

    $ bin/pulsar-admin sink list \
    --tenant public \
    --namespace default
    

    (2)返回结果显示前文创建的 mysql-jdbc-sink。

    [
     "mysql-jdbc-sink"
    ]
    

    4.3 Get

    (1)显示 sink 的信息。

    $ bin/pulsar-admin sink get \
    --tenant public \
    --namespace default \
    --name mysql-jdbc-sink
    

    (2)返回结果显示 mysql-jdbc-sink 的信息,包括租户、命名空间和名称等。

    {
     "tenant": "public",
     "namespace": "default",
     "name": "mysql-jdbc-sink",
     "className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
     "inputSpecs": {
       "test-jdbc": {
         "isRegexPattern": false
       }
     },
     "configs": {
       "password": "jdbc",
       "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc",
       "userName": "root",
       "tableName": "test_jdbc"
     },
     "parallelism": 1,
     "processingGuarantees": "ATLEAST_ONCE",
     "retainOrdering": false,
     "autoAck": true
    }
    

    4.4 Status

    (1)显示 mysql-jdbc-sink 的状态。

    $ bin/pulsar-admin sink status \
    --tenant public \
    --namespace default \
    --name mysql-jdbc-sink
    

    (2)返回结果显示 mysql-jdbc-sink 的状态信息,包括实例数量、是否正在运行和 worker ID等。

    {
     "numInstances" : 1,
     "numRunning" : 1,
     "instances" : [ {
       "instanceId" : 0,
       "status" : {
         "running" : true,
         "error" : "",
         "numRestarts" : 0,
         "numReadFromPulsar" : 0,
         "numSystemExceptions" : 0,
         "latestSystemExceptions" : [ ],
         "numSinkExceptions" : 0,
         "latestSinkExceptions" : [ ],
         "numWrittenToSink" : 0,
         "lastReceivedTime" : 0,
         "workerId" : "c-standalone-fw-tengdeMBP.lan-8080"
       }
     } ]
    }
    

    4.5 Stop

    (1)停止 mysql-jdbc-sink。

    $ bin/pulsar-admin sink stop \
    --tenant public \
    --namespace default \
    --name mysql-jdbc-sink \
    --instance-id 0
    

    (2)如果出现以下信息,则说明停止成功。

    Stopped successfully
    

    4.6 Start

    (1)启动 mysql-jdbc-sink。

    $ bin/pulsar-admin sink start \
    --tenant public \
    --namespace default \
    --name mysql-jdbc-sink \
    --instance-id 0
    

    (2)如果出现以下信息,则说明启动成功。

    Started successfully
    

    4.7 Restart

    (1)重启 mysql-jdbc-sink。

    $ bin/pulsar-admin sink restart \
    --tenant public \
    --namespace default \
    --name mysql-jdbc-sink \
    --instance-id 0
    

    (2)如果出现以下信息,则说明重启成功。

    Restarted successfully
    

    4.8 Update

    (1)将 parallelism 更新至2。

    $ bin/pulsar-admin sink update \
    --name mysql-jdbc-sink \
    --parallelism 2
    

    (2)如果出现以下信息,则说明更新成功。

    Updated successfully
    

    (3)查看 mysql-jdbc-sink 的信息,再次验证更新结果。

    $ bin/pulsar-admin sink get \
    --tenant public \
    --namespace default \
    --name mysql-jdbc-sink
    

    (4)Parallelism 为2,说明已更新成功。

    {
     "tenant": "public",
     "namespace": "default",
     "name": "mysql-jdbc-sink",
     "className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
     "inputSpecs": {
       "test-jdbc": {
         "isRegexPattern": false
       }
     },
     "configs": {
       "password": "jdbc",
       "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc",
       "userName": "root",
       "tableName": "test_jdbc"
     },
     "parallelism": 2,
     "processingGuarantees": "ATLEAST_ONCE",
     "retainOrdering": false,
     "autoAck": true
    }
    

    4.9 Delete

    (1)删除 mysql-jdbc-sink。

    $ bin/pulsar-admin sink delete \
    --tenant public \
    --namespace default \
    --name mysql-jdbc-sink
    

    (2)如果出现以下信息,则说明删除成功。

    Deleted successfully
    

    (3)查看 mysql-jdbc-sink 的信息,再次验证删除结果。

    $ bin/pulsar-admin sink get \
    --tenant public \
    --namespace default \
    --name mysql-jdbc-sink
    

    (4)mysql-jdbc-sink 不存在,说明已删除成功。

    HTTP 404 Not Found
    
    Reason: Sink mysql-jdbc-sink doesn't exist
    

    4.10 Localrun

    更多关于 localrun 的使用示例,参阅 3.2.6 上传 schema,启动 sink 的第 3 步。

    5. 总结

    本文介绍了 sink 的功能,并演示了如何创建与使用 JDBC sink 与 MySQL 进行连接

    6. 更多信息

    相关文章

      网友评论

          本文标题:Pulsar Sink 入门指南

          本文链接:https://www.haomeiwen.com/subject/mmicdctx.html