美文网首页
Pulsar Source 入门篇

Pulsar Source 入门篇

作者: StreamNative | 来源:发表于2019-07-25 18:17 被阅读0次

    阅读本文需要约 5 分钟。

    • Apache Pulsar 是一个分布式发布订阅消息系统。
    • Source 是 Pulsar 的一个组件,用来将其他系统的数据输入至 Pulsar。

    摘要

    本文介绍 Apache Pulsar Source 的基础知识,例如,Source 的常用命令、环境搭建以及使用示例。

    Source 常用命令

    create

    创建 source。

    参数 解释
    -a, --archive 指定 source 的 NAR 包
    --classname 指定 source 的类名称
    --destination-topic-name 指定目标 Topic 名称
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --parallelism 指定 source 的并发数
    --source-config-file 指定 source 使用的配置文件
    --tenant 指定 source 所属的租户

    update

    更新 source。

    参数 解释
    -a, --archive 指定 source 的 NAR 包
    --classname 指定 source 的类名称
    --destination-topic-name 指定目标 Topic 名称
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --parallelism 指定 source 的并发数
    --source-config-file 指定 source 使用的配置文件
    --tenant 指定 source 所属的租户

    delete

    删除 source。

    参数 解释
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --tenant 指定 source 所属的租户

    start

    启动 source。

    参数 解释
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --tenant 指定 source 所属的租户
    --instance-id 指定 source 的 instance-id,如果未指定,将启动所有实例

    stop

    停止 source。

    参数 解释
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --tenant 指定 source 所属的租户
    --instance-id 指定 source 的 instance-id,如果未指定,将停止所有实例

    get

    获取 source 信息。

    参数 解释
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --tenant 指定 source 所属的租户

    status

    检查 source 状态。

    参数 解释
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --tenant 指定 source 所属的租户
    --instance-id 指定 source 的 instance-id,如果未指定,将获取所有实例状态

    list

    列出所有 source 信息。

    参数 解释
    --namespace 指定 source 的命名空间
    --tenant 指定 source 所属的租户

    restart

    重启 source。

    参数 解释
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --tenant 指定 source 所属的租户
    --instance-id 指定 source 的 instance-id,如果未指定,将重启所有实例

    localrun

    在本地运行 source,方便调试。

    参数 解释
    -a, --archive 指定 source 的 NAR 包
    --classname 指定 source 的类名称
    --destination-topic-name 指定目标 Topic 名称
    --name 指定 source 的名称
    --namespace 指定 source 的命名空间
    --parallelism 指定 source 的并发数
    --source-config-file 指定 source 使用的配置文件
    --tenant 指定 source 所属的租户

    环境搭建

    本示例以 Kafka source 为例,实践这些命令。

    1. 下载所需文件。
    wget http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
    wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
    
    1. 创建网络。
      实践发现,使用下述方式才能成功;使用 --link 的方式指定网络,会出现问题。
    docker network create kafka-pulsar
    
    1. 拉取 ZooKeeper 镜像并启动 ZooKeeper 服务。
    docker pull wurstmeister/zookeeper
    docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
    
    1. 拉取 Kafka 镜像并启动 Kafka 服务。
    docker pull wurstmeister/kafka:2.11-1.0.2
    docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
    
    1. 拉取 Pulsar 镜像并启动 Pulsar standalone 服务。
    docker pull apachepulsar/pulsar:2.4.0
    docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
    
    1. 创建 source 配置文件 kafkaSourceConfig.yaml。
    configs:
       bootstrapServers: "pulsar-kafka:9092"
       groupId: "test-pulsar-io"
       topic: "my-topic"
       sessionTimeoutMs: "10000"
       autoCommitEnabled: "false"
    
    1. 创建生产者文件 kafka-producer.py。
    kafka-producer.py
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
    future = producer.send('my-topic', b'hello world')
    future.get()
    
    1. 创建消费者文件 pulsar-client.py。
    import pulsar
    
    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('my-topic', subscription_name='my-aa')
    
    while True:
       msg = consumer.receive()
       print msg
       print dir(msg)
       print("Received message: '%s'" % msg.data())
       consumer.acknowledge(msg)
    
    client.close()
    
    1. 复制以下文件至 pulsar-kafka-standalone。
    docker cp pulsar-io-kafka-2.4.0.nar pulsar-kafka-standalone:/pulsar
    docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
    docker cp kafka-clients-0.10.2.1.jar pulsar-kafka-standalone:/pulsar/lib
    docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
    docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
    
    1. 开启新窗口,使用 localrun 运行 source。
    docker exec -it pulsar-kafka-standalone /bin/bash
    ./bin/pulsar-admin source localrun --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1
    
    1. 开启新窗口,运行消费者。
    docker exec -it pulsar-kafka-standalone /bin/bash
    python pulsar-client.py
    
    1. 开启新窗口,运行生产者。
    docker exec -it pulsar-kafka-standalone /bin/bash
    pip install kafka-python
    python3 kafka-producer.py
    
    1. 验证。
      此时消费者窗口显示以下消息,说明环境搭建成功。
    Received message: 'hello world'
    

    使用示例

    Localrun 该命令已在前文 #10 实现。前文 #9 已向 /pulsar/lib 文件夹中复制了一个 kafka 的 clients 库,因此需要首先重启 pulsar-kafka-standalone。

    docker restart pulsar-kafka-standalone
    

    create

    在租户 public 和命名空间 default 下,创建名为 kafka 的 source。

    ./bin/pulsar-admin source create --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1
    "Created successfully"
    

    如果命令行窗口显示以上信息,说明创建成功。

    list

    显示租户为 public、命名空间为 default 的 source。

    ./bin/pulsar-admin source list --tenant public --namespace default
    [
      "kafka"
    ]
    

    get

    获取名称为 kafka 的 source 的信息。

    ./bin/pulsar-admin source get --tenant public --namespace default --name kafka
    {
      "tenant": "public",
      "namespace": "default",
      "name": "kafka",
      "className": "org.apache.pulsar.io.kafka.KafkaBytesSource",
      "topicName": "my-topic",
      "configs": {
        "bootstrapServers": "pulsar-kafka:9092",
        "groupId": "test-pulsar-io1",
        "topic": "my-topic",
        "sessionTimeoutMs": "10000",
        "autoCommitEnabled": "false"
      },
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE"
    }
    

    以上显示了刚才创建的 source 信息,包括租户、 namespace 、 名称、类名称、所在机器等。

    status

    获取名称为 kafka 的 source 的运行状态。

    ./bin/pulsar-admin source status --tenant public --namespace default --name kafka
    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReceivedFromSource" : 0,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSourceExceptions" : 0,
          "latestSourceExceptions" : [ ],
          "numWritten" : 0,
          "lastReceivedTime" : 0,
          "workerId" : "c-standalone-fw-7e0cf1b3bf9d-8080"
        }
      } ]
    }
    

    以上显示了 source 的实例信息,包括是否正在运行、实例 id、workId 等。

    stop

    停止租户 public 命名空间 default 下面名称为 kafka 的 source。

    ./bin/pulsar-admin source stop --tenant public --namespace default --name kafka --instance-id 0
    Stopped successfully
    

    start

    启动租户 public 命名空间 default 下面名称为 kafka 的 source。

    ./bin/pulsar-admin source start --tenant public --namespace default --name kafka --instance-id 0
    Started successfully
    

    restart

    重启租户 public 命名空间 default 下面名称为 kafka 的 source。

    ./bin/pulsar-admin source restart --tenant public --namespace default --name kafka --instance-id 0
    Restarted successfully
    

    update

    更新租户 public 命名空间 default 下面名称为 kafka 的 source。

    ./bin/pulsar-admin source update --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1 --cpu 2
    "Updated successfully"
    ./bin/pulsar-admin source get --tenant public --namespace default --name kafka
    {
      "tenant": "public",
      "namespace": "default",
      "name": "kafka",
      "className": "org.apache.pulsar.io.kafka.KafkaBytesSource",
      "topicName": "my-topic",
      "configs": {
        "bootstrapServers": "pulsar-kafka:9092",
        "groupId": "test-pulsar-io1",
        "topic": "my-topic",
        "sessionTimeoutMs": "10000",
        "autoCommitEnabled": "false"
      },
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "resources": {
        "cpu": 2.0,
        "ram": 1073741824,
        "disk": 10737418240
      }
    }
    

    以上示例成功更新了CPU。

    delete

    删除租户 public 命名空间 default 下面名称为 kafka 的 source。

    ./bin/pulsar-admin source delete --tenant public --namespace default --name kafka
    "Delete source successfully"
    ./bin/pulsar-admin source get --tenant public --namespace default --name kafka
    HTTP 404 Not Found
    
    Reason: Source kafka doesn't exist
    

    以上示例成功删除了该 source。

    总结

    本文以 Kafka source 为例,介绍了 source 的常用命令、环境搭建和使用示例,之后会有更多文章深入介绍 source,敬请期待。

    相关文章

      网友评论

          本文标题:Pulsar Source 入门篇

          本文链接:https://www.haomeiwen.com/subject/nmlkrctx.html