美文网首页
Pulsar-安装部署

Pulsar-安装部署

作者: 大哥你先走 | 来源:发表于2018-11-05 21:37 被阅读0次

    Pulsar的起源

    Pulsar是由雅虎创建的开源的、分布式pub-sub系统,现在是Apache基金会的一个孵化项目。

    Pulsar的关键特性

    Pulsar的关键特性如下表所示:

    关键特性 描述
    Pulsar函数 使用对开发人员友好的API,可以轻松部署轻量级计算逻辑,无需运行自己的流处理引擎。
    生产环境已证明 Pulsar已经在雅虎规模的生产环境中运行了3年多,每秒有数百万条消息涉及数百万个主题。
    水平扩展 Pulsar集群支持无缝水平扩展到数百个节点。
    低延迟、支持持久存储 Pulsar设计用于大规模的低延迟发布(<5ms),具有强大的可用性保证。
    跨域复制 专为跨多个地理区域的数据中心之间的配置数据复制而设计。
    多租户 原生支持多租户,支持租户间的隔离,身份验证,授权和配额管理。
    持久存储 基于Apache BookKeeper的持久消息存储。支持读写之间的IO隔离。
    丰富的客户端 Pulsar使用灵活的消息传递模型,支持Java,C ++,Python和Go。
    可操作性 提供用于配置,管理,工具和监视的管理API,支持部署在裸机或Kubernetes上。

    Pulsar集群搭建

    温馨提示

    1. 单集群的Pulsar实例可以满足绝大多数学习、开发、验证需求,如果没有特殊的需要,建议使用单集群Pulsar实例,多集群的安装部署参考多集群部署
    2. 在部署Pulsar过程中,如需要使用所有内置的Pulsar IO 连接器,需要下载apache-pulsar-io-connectors,并安装到Pulsar的connectors目录下。

    Pulsar集群的部署步骤如下:

    • 部署ZooKeeper集群(可选)
    • 初始化集群元信息
    • 部署BookKeeper集群
    • 部署一个或多个Pulsar broker

    安装前准备

    如果你已经有一个ZooKeeper集群,并且愿意重用该集群,则无需准备安装ZooKeeper集群的资源,也无需部署ZooKeeper集群

    1. 至少6台Linux机器或虚拟机
    • 1-1 三台机器用于部署ZooKeeper集群,Pulsar仅会定期使用ZooKeeper进行协调和配置任务,业务操作不依赖ZooKeeper集群,部署时可以使用性能规格较低的机器。
    • 1-2 三台部署BookKeeper集群和Pulsar broker。Puslar集群实际承载业务,建议使用性能规格更高的机器,比如计算能力更强的CPU、10Gbps NIC、SSD硬盘或高性能存储。
    1. 覆盖所有节点的DNS名称,如果没有DNS服务器可以通过hosts文件实现。
    2. 所有的机器需要安装Java 8或更高版本


      pulsar基本配置

    本文安装部署Pulsar集群的节点信息

    节点 规格 部署组件 主机名/地址
    Pulsar-zk-01 4vCPU,8G内存 ZooKeeper集群 kwe1000854790.novalocal/205.20.107.25
    Pulsar-zk-02 4vCPU,8G内存 ZooKeeper集群 kwe1000853508.novalocal/205.20.107.21
    Pulsar-zk-03 4vCPU,8G内存 ZooKeeper集群 kwe1000853507.novalocal/205.20.107.22
    Pulsar-bk-01 8vCPU,16G内存 BookKeeper集群 kwe1000853505.novalocal/205.20.107.20
    Pulsar-bk-02 8vCPU,16G内存 BookKeeper集群 kwe1000853504.novalocal/205.20.107.19
    Pulsar-bk-03 8vCPU,16G内存 BookKeeper集群 kwe1000853503.novalocal/205.20.107.24

    安装Pulsar

    集群中的每个节点都需要安装Pulsar二进制包,包括ZooKeeper和BookKeeper节点。

    • 获取安装包
      Pulsar

    • 将软件包拷贝到/opt目录下解压,并将解压的目录重命名为pulsarCluster

      tar -zxvf apache-pulsar-2.1.1-incubating-bin.tar.gz
      mv apache-pulsar-2.1.1-incubating pulsarCluster· 
      

    确保/opt目录有足够的磁盘空间,或使用其他目录安装

    Pulsar的目录结构如下表:

    目录 内容
    bin Pulsar的命令行工具
    conf Pulsar的配置文件
    data 存储ZooKeeper和BookKeeper数据
    lib Pulsar使用的第三方库
    logs 日志存储路径
    examples Pulsar提供的样例

    安装Pulsar 连接器(可选)

    从2.1.0-inclubating版本开始,Pulsar单独发布了包含所有内置连接器的二进制包,如果想使用这些内置的连接器,可以参考下面的步骤安装,如果不需要可以直接跳过。

    • 获取软件包
      Pulsar IO Connectors

    • 解压软件包,并将connector目录拷贝到Pulsar安装目录(/opt/puslarCluster)

      tar -zxvf apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz
      cd apache-pulsar-io-connectors-2.1.1-incubating
      cp -r connectors/ /opt/pulsarCluster
      

    部署ZooKeeper集群

    • 获取ZooKeeper安装包
      ZooKeeper

    • 将安装包拷贝到三个节点的/opt目录解压

      tar -zxvf zookeeper-3.4.12.tar.gz
      
    • 修改 conf/zookeeper.conf配置,增加ZooKeeper集群节点信息。

      server.1=205.20.107.25:2888:3888
      server.2=205.20.107.21:2888:3888
      server.3=205.20.107.22:2888:3888
      
      cd /opt/pulsarCluster
      echo “server.1=205.20.107.25:2888:3888” >> conf/zookeeper.conf
      echo “server.1=205.20.107.21:2888:3888” >> conf/zookeeper.conf
      echo “server.1=205.20.107.22:2888:3888” >> conf/zookeeper.conf
      
    • 配置ZooKeeper myid信息

      cd /opt/pulsarCluster
      mkdir -p data/zookeeper
      echo 1 > data/zookeeper/myid
      
    • 以守护进程启动ZooKeeper

      cd /opt/pulsarCluster/bin
      ./pulsar-daemon start zookeeper
      

    配置集群信息

    部署完ZooKeeper集群后,需要将一些Pulsar集群的元信息写入ZooKeeper集群的每个节点,由于数据在ZooKeeper集群内部会互相同步,因此只需要将元信息写入ZooKeeper的一个节点。可以在ZooKeeper集群的任意节点通过pulsar工具的 initialize-cluster-metadata方法配置数据,配置命令只需执行一次,否则ZooKeeper会报节点已经存在的错误。命令的一个简单样例如下:

    $ ./pulsar initialize-cluster-metadata \
      --cluster pulsar-cluster-zk-1 \
      --zookeeper 205.20.107.25:2181 \
      --configuration-store 205.20.107.25:2181 \
      --web-service-url http://pulsar.cluster.com:8080 \
      --web-service-url-tls https://pulsar.cluster.com:8443 \
      --broker-service-url pulsar://pulsar.cluster.com:6650 \
      --broker-service-url-tls pulsar+ssl://pulsar.cluster.com:6651
    

    在本文的安装部署过程中,Pulsar集群的名称为pulsar-cluster,统一域名pulsar.cluster.com。命令参数的具体含义如下:

    Flag Description
    --cluster 集群名称
    --zookeeper ZooKeeper集群连接参数,仅需要包含集群中的一个节点即可
    --configuration-store Pulsar实例的配置存储集群(ZooKeeper),和-zookeeper参数一样只需要包含集群中的一个节点即可
    --web-service-url 集群Web服务的URL+端口,URL必须是一个i标准的DNS名称,默认端口8080,不建议修改。
    --web-service-url-tls 集群Web提供TLS服务的URL+端口,端口默认8443,不建议修改。
    --broker-service-url 集群brokers服务URL,URL中DNS的名称和Web服务保持一致,URL使用pulsar替代http/http,端口默认6650,不建议修改。
    --broker-service-url-tls 集群brokers提供TLS服务的URL,默认端口6551,不建议修改。

    部署BookKeeper集群

    • 配置BookKeeper集群
      Pulsar集群中所有持久数据的存储都由BookKeeper负责,因此如果想使用Pulsar需要部署一个BookKeeper集群,建议部署一个包含3个bookie节点的BookKeeper集群。BookKeeper集群的配置使用conf/bookkeeper.conf文件,BookKeeper最终的配置是配置ZooKeeper集群的地址,一个具体的配置例子如下:
      zkServers=205.20.107.25:2181,205.20.107.21:2181,205.20.107.22:2181
      
      Pulsar从2.1.0版本开始引入了有状态函数,如果想使用该功能,还需要在conf/bookkeeper.conf文件中增加如下配置:
      extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
      
    • 启动BooKeeper集群
      • 后台进程启动
        cd /opt/pulsarCluster/bin
        ./pulsar-daemon start bookie
        
      • 前台进程启动
        cd /opt/pulsarCluster/bin
        ./bookkeeper bookie
        
    • 检查BookKeeper集群状态
       cd /opt/pulsarCluster/bin
       ./bookkeeper shell bookiesanity
      
      如果BookKeeper成功运行,输出的最后一行结果为:
      org.apache.bookkeeper.bookie.BookieShell - Bookie sanity test succeeded
      

    部署Pulsar brokers

    • 配置pulsar brokers:在conf/broker.conf中配置ZooKeeper集群、配置存储集群、集群名称、开始函数功能
       zookeeperServers=205.20.107.25:2181,205.20.107.21:2181,205.20.107.22:2181
       configurationStoreServers=205.20.107.25:2181,205.20.107.21:2181,205.20.107.22:2181
       clusterName=pulsar-cluster
       functionsWorkerEnabled=true
      
    • 配置function worker 集群名(conf/functions_worker.yml)
      ulsarFunctionsCluster=pulsar-cluster
      
    • 启动 brokers
      • 后台进程启动
        $ cd /opt/pulsarCluster/bin
        $ ./pulsar-daemon start broker
        
      • 前台进程启动
        $ cd /opt/pulsarCluster/bin
        $ ./bookkeeper broker
        

    使用Pulsar发布、订阅消息(Java)

    安装Pulsar Java客户端

    如果使用Maven,在工程的pom文件增加如下配置:

    <!-- 在<properties> 块中增加版本号信息 -->
    <pulsar.version>2.1.1-incubating</pulsar.version>
    
    <!--增加依赖 -->
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>${pulsar.version}</version>
    </dependency>
    

    构造Client

    • Pular连接 URLs
      • 本地Pulsar
        pulsar://localhost:6650
        
      • 生成环境Pulsar集群(使用域名)
        pulsar://pulsar.cluster.com:6650
        
      • 开启TLS的条件下,Pulsar集群连接URLs
        pulsar+ssl://pulsar.cluster.com:6651
        
    • 配置Pulsar客户端
      客户端的配置主要包括:Pulsar集群信息配置、鉴权信息配置、TLS配置、线程数连接数配置等。具体的配置可以参考Pulsar Client配置
      基于本文搭建的集群,只保留最简单的配置信息,Pulsar Client的构造如下:
            private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
            PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
      

    构造生产者

    一个完整的生产者样例如下:

    
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    public class ProducerDemm {
        private static final Logger log = LoggerFactory.getLogger(ProducerDemm.class);
        private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
        public static void main(String[] args) throws Exception {
            // 构造Pulsar Client
            PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
            // 构造生产者
            Producer<String> producer = client.newProducer(Schema.STRING)
                .producerName("my-producer")
                .topic("persistent://public/default/my-topic")
                .batchingMaxMessages(1024)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .enableBatching(true)
                .blockIfQueueFull(true)
                .maxPendingMessages(512)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
            // 同步发送消息
            MessageId messageId = producer.send("Hello World");
            log.info("message id is {}",messageId);
            CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");
            // 阻塞线程,直到返回结果
            log.info("async message id is {}",asyncMessageId.get());
    
            // 配置发送的消息元信息,同步发送
            producer.newMessage()
                .key("my-message-key")
                .value("my-message")
                .property("my-key", "my-value")
                .property("my-other-key", "my-other-value")
                .send();
            producer.newMessage()
                .key("my-async-message-key")
                .value("my-async-message")
                .property("my-async-key", "my-async-value")
                .property("my-async-other-key", "my-async-other-value")
                .sendAsync();
            
            // 关闭producer的方式有两种:同步和异步
            // producer.closeAsync();
            producer.close();
            
            // 关闭licent的方式有两种,同步和异步
            // client.close();
            client.closeAsync();
            
        }
    }
    

    构造消费者

    • 单订阅
    
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionType;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.TimeUnit;
    
    public class MyConsumer {
       private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
       private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
    
       public static void main(String[] args) throws Exception {
           // 构造Pulsar Client
           PulsarClient client = PulsarClient.builder()
               .serviceUrl(SERVER_URL)
               .enableTcpNoDelay(true)
               .build();
           Consumer consumer = client.newConsumer()
               .consumerName("my-consumer")
               .topic("persistent://public/default/my-topic")
               .subscriptionName("my-subscription")
               .ackTimeout(10, TimeUnit.SECONDS)
               .maxTotalReceiverQueueSizeAcrossPartitions(10)
               .subscriptionType(SubscriptionType.Exclusive)
               .subscribe();
           do {
               // 接收消息有两种方式:异步和同步
               // CompletableFuture<Message<String>> message = consumer.receiveAsync();
               Message message = consumer.receive();
               log.info("get message from pulsar cluster,{}", message);
           } while (true);
       }
    }
    
    • 多订阅
    
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.ConsumerBuilder;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    
    public class MultiConsumer {
        private static final Logger log = LoggerFactory.getLogger(MultiConsumer.class);
        private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
        private static final String DEFAULT_NS_TOPICS = "persistent://public/default/.*";
        private static final String DEFATULT_NS_REG_TOPICS= "persistent://public/default/my.*";
        private static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
            ConsumerBuilder consumerBuilder = client.newConsumer()
                .subscriptionName("multi-sub");
    
            // 订阅namespace下所有的topic
            Pattern allTopicsInNamespace = Pattern.compile(DEFAULT_NS_TOPICS);
            consumerBuilder.topicsPattern("").subscribe();
    
            // 订阅namespace下满足正则匹配的topic
            Pattern someTopicsInNamespace = Pattern.compile(DEFATULT_NS_REG_TOPICS);
            Consumer allTopicsConsumer = consumerBuilder
                .topicsPattern(someTopicsInNamespace)
                .subscribe();
    
            List<String> topics = Arrays.asList(
                "topic-1",
                "topic-2",
                "topic-3"
            );
    
            Consumer multiTopicConsumer = consumerBuilder
                .topics(topics)
                .subscribe();
            
        }
    }
    

    Pulsar美中不足

    • 不支持Windows平台安装部署,这对多数使用Windows的开发者来说调试成本较高。
    • SDK支持不足,Go,C++版本的SDK对Windows平台的支持还不够完美。

    相关文章

      网友评论

          本文标题:Pulsar-安装部署

          本文链接:https://www.haomeiwen.com/subject/rdkpzftx.html