美文网首页Apache Pulsar 指南
Apache Pulsar 源码走读(一)启动

Apache Pulsar 源码走读(一)启动

作者: WJL3333 | 来源:发表于2020-06-06 03:30 被阅读0次

1.启动入口

PulsarStandaloneStarter
在standalone模式下,主要启动了以下几个服务

  1. PulsarService
  2. PulsarAdmin
  3. LocalBookeeperEnsemble
  4. WorkerService

PulsarBrokerStarter.BrokerStarter
在普通模式下,启动了以下几个服务

  1. PulsarService
  2. BookieServer
  3. AutoRecoveryMain
  4. StatsProvider
  5. WorkerService

简单说一些这几个服务

  • WorkerService: Pulsar function 相关,可以不启动
  • PulsarService: 主要的PulsarBroker相关
  • BookieServer: Bookeeper相关
  • AutoRecoveryMain: Bookeeper autorecovery相关
  • StatsProvider: Metric Exporter类似的功能

2. PulsarService

PulsarService.start

  1. ProtocolHandlers
    支持不同protocol处理(kafka协议等)

  2. localZookeeperConnectionProvider
    维护zk session 和zk连接

  3. startZkCacheService

    • LocalZooKeeperCache => LocalZooKeeperCacheService
    • GlobalZooKeeperCache => ConfigurationCacheService
  4. BookkeeperClientFactory
    创建配置Bookkeeper 客户端

  5. managedLedgerClientFactory
    维护一个ManagedLedger的客户端,借用BookkeeperClient

  6. BrokerService
    这个是服务器的主要逻辑了,这个放在后面说

  7. loadManager
    收集集群机器负载,并根据负载情况均衡负载

  8. startNamespaceService
    NameSpaceService,管理放置的ResourceBundle,和LoadManager相关

  9. schemaStorage

  10. schemaRegistryService
    上面2个都是和Schema相关的

  11. defaultOffloader
    LedgerOffloader,用来将Ledger(Bookkeeper)中的冷数据放到其他存储当中

  1. WebService

  2. webSocketService
    http,websocket相关

  3. LeaderElectionService
    和LoadManager有关,如果是集中方式的话需要选出一个Leader定期根据集群情况进行均衡负载

  4. transactionMetadataStoreService
    事务相关

  5. metricGenerator
    metric相关

  6. WorkerService
    pulsar function 相关

3. BrokerService

public void start() throws Exception {
        // producer id 分布式生成器
        this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
                pulsar.getConfiguration().getClusterName());

        // 网络层配置
        ServerBootstrap bootstrap = defaultServerBootstrap.clone();

        ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
        ...
        // 绑定端口
        listenChannel = bootstrap.bind(addr).sync().channel();
        ...

       // metric
        this.startStatsUpdater(
                serviceConfig.getStatsUpdateInitialDelayInSecs(),
                serviceConfig.getStatsUpdateFrequencyInSecs());

       // 启动了一堆需要定期执行的任务
        this.startInactivityMonitor();
       // 启动3个schedule任务分别检测
       // 1. 长时间无效的topic
       // 2. 长时间无效的producer(和message去重相关)
       // 3. 长时间无效的subscription
        this.startMessageExpiryMonitor();
        this.startCompactionMonitor();
        this.startMessagePublishBufferMonitor();
        this.startConsumedLedgersMonitor();
        this.startBacklogQuotaChecker();
        this.updateBrokerPublisherThrottlingMaxRate();
        this.startCheckReplicationPolicies();

        // register listener to capture zk-latency
        ClientCnxnAspect.addListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(pulsar.getExecutor());

4. PulsarChannelInitializer

顺着netty的初始化方式我们直接看ChannelInitializer,这里应该和Kafka类似进行处理请求的操作。


protected void initChannel(SocketChannel ch) throws Exception {
        
        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
     
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
            brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));

        ch.pipeline().addLast("flowController", new FlowControlHandler());
        ServerCnx cnx = new ServerCnx(pulsar);
        ch.pipeline().addLast("handler", cnx);

        connections.put(ch.remoteAddress(), cnx);
    }

5. ServerCnx

这个类的作用可以对标KafkaApis,处理各种Api请求
这个类实际上是一个ChannelHandler
继承了PulsarHandler(主要负责一些连接的keepalive逻辑)
PulsarHandler继承了 PulsarDecoder ( 主要负责序列化,反序列化Api请求)
PulsarDecoder实际上是一个 ChannelInboundHandlerAdapter

而PulsarAPi实际上是通过Pulsar.proto 生成的,这里编写了各种Api的定义

相关文章

网友评论

    本文标题:Apache Pulsar 源码走读(一)启动

    本文链接:https://www.haomeiwen.com/subject/docttktx.html