美文网首页
PulsarClient 解析(一)

PulsarClient 解析(一)

作者: tracy_668 | 来源:发表于2022-04-01 23:26 被阅读0次

[TOC]
[TOC]

Pulsar client初始化过程

初始化Pulsar Producer和Consumer都需要先初始化Pulsar client。示例:

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://127.0.0.1:6650")
        .build();

  • PulsarClient.builder()
    会创建一个ClientBuilderImpl一个实例, 并用一个 ClientConfigurationData实例来初始化conf对象;

  • .serviceUrl("pulsar://127.0.0.1:6650")
    为conf对象设置serviceUrl参数,并判断是否使用了TLS

  • .build()

  1. 首先简要serviceUrl参数和serviceUrlProvider,两者只能存在一个

  2. 使用conf初始化PulsarClientImpl

  • 初始化EventLoopGroup:(EpollEventLoopGroup或者NioEventLoopGroup),这个过程会new一个客户端线程工厂

  • 初始化ConnectionPool:

  • 初始化bootstrap(client) ,设置channel的一些参数,并且指定PulsarChannelInitializer, channel初始化时,会给channel设置handler,其中包括一个ClientCnx,主要用来处理broker的结果响应;最后会初始化一个DnsNameResolver

  • 初始化externalExecutorProvider,初始化LookupService, 这个过程中会初始化PulsarServiceNameResolver, 并且在 PulsarServiceNameResolver中解析、记录url的相关信息

Producer 初始化

向pulsar生产数据,需要首先初始化一个producer,

final Producer<byte[]> producer = client.newProducer()
                    .topic(topic)
                    .maxPendingMessages(5000)
                    .enableBatching(true)
                    .create();

过程比较简单,

  • 初始化一个ProducerBuilderImpl
  • 设置 topic 参数
  • 设置最大缓存的Message数量
  • 设置是否支持批量发送
  • 创建Producer

设置消息路由模式,包括SinglePartition, RoundRobinPartition和CustomPartition, 使用 ``CustomPartition`时,需要实现router

异步创建producer,首先需要获取topic的元数据信息,这是需要创建连接,调用关系如下图所示:


image.png

最终会调用bootstrap.connect()方法创建netty连接。netty连接创建完成之后,为 netty channel 添加closeFuture的清理逻辑,然后创建ClientCnx对象,并设置remoteaddress等属性信息。

创建连接完成之后,会创建newPartitionMetadataRequest并且携带topic作为参数,然后发送newPartitionMetadataRequest到服务端。服务端会返回对应的分区数量信息。

获取到topic的分区信息之后,根据分区数量创建PartitionedProducerImpl或者ProducerImpl。

至此,producer创建完毕。

PulsarClient

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

让我们看一下这个类的主要方法

image.png
  • 创建producer/consumer/reader
  • 元数据信息相关
  • transaction相关
  • close方法

ClientBuilder

这里有一个builder方法用来传递一些PulsarClient的配置

支持的配置项

  1. 连接配置相关:
  • 连接地址:serviceUrl / serviceUrlProvider / listener / proxyServiceUrl

[图片上传失败...(image-fe4d5f-1648735036203)]

[图片上传失败...(image-79f60b-1648735036203)]

这里面Builder.build就直接配置参数传入了PulsarClientImpl的构造函数了

我们看下这里面做了什么操作

PulsarClientImpl

package org.apache.pulsar.client.impl;

public class PulsarClientImpl implements PulsarClient {

        // 查找服务
    private LookupService lookup;
    
    // 连接池
    private final ConnectionPool cnxPool;
    
    // netty 里面的HashedWheelTimer,用来调度一些延迟操作
    private final Timer timer;
    private final ExecutorProvider externalExecutorProvider;
    private final ExecutorProvider internalExecutorService;

        // 当前PulsarClient的状态
    private AtomicReference<State> state = new AtomicReference<>();
    
    // 所有的业务处理单元(客户端逻辑)
    private final Set<ProducerBase<?>> producers;
    private final Set<ConsumerBase<?>> consumers;

        // id发号器
    private final AtomicLong producerIdGenerator = new AtomicLong();
    private final AtomicLong consumerIdGenerator = new AtomicLong();
    private final AtomicLong requestIdGenerator = new AtomicLong();

      // 这里面的EventLoopGroup好像只被当成线程池来用了
    // 0. ConnectionPool 里面初始化作为连接的io线程池(netty客户端常规用法)
    // 1. 在Consumer里面用来定时flush PersistentAcknowledgmentsGroupingTracker
    // 2. Producer 里面用来定时生成加密的key
    // 3. 作为AsyncHttpClient的构造参数
    private final EventLoopGroup eventLoopGroup;

        // Schema 的cache
    private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache;

    // producer 用来生成PublishTime
    private final Clock clientClock;

    @Getter
    private TransactionCoordinatorClientImpl tcClient;

LookUpService根据配置参数会选择HttpLookupService 或者是BinaryProtoLookupService

ConnectionPool

package org.apache.pulsar.client.impl;

public class ConnectionPool implements Closeable {
  
    // 连接池,保存连接
    // 地址 -> 第x个连接 -> 连接
    // 如果配置maxConnectionsPerHosts=0 则把pooling关闭了
    protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
  
    // netty 相关
    // PulsarClient 传递过来的
    private final EventLoopGroup eventLoopGroup;
    private final Bootstrap bootstrap;
    private final PulsarChannelInitializer channelInitializerHandler;
    protected final DnsNameResolver dnsResolver;
  
    // 配置
    private final ClientConfigurationData clientConfig;
    private final int maxConnectionsPerHosts;
  
    // 是否是Server Name Indication 代理,TLS 相关,先忽略
    private final boolean isSniProxy;
 

构造函数主要是按照netty 网络客户端方式初始化相关成员变量

        bootstrap = new Bootstrap();
        // 绑定io线程池
        bootstrap.group(eventLoopGroup);
        // 配置了channel类型,如果支持Epoll的话会变成Epoll的channel
        bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        // 设置tcp的连接超时时间
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
        // 设置tcp no delay
        bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
        // 配置allocator
        bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        // 绑定channelInitializer
            channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
            bootstrap.handler(channelInitializerHandler);
        // 这个类是netty提供的,用来解析DNS,后面专门会说
        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
    }

这里面传入的BufferPool是一个自定义的

这个连接池的主要功能

  • 创建并cache连接
  • 归还连接
  • 按照配置的maxConnectionsPerHosts限制连接数目

具体使用方式可以参照org.apache.pulsar.client.impl.ConnectionPoolTest 这个类

ConnectionPool pool;
InetSocketAddress brokerAddress = ....;

// 获取连接,如果之前没有的话,会创建一个
CompletableFuture<ClientCnx> conn = pool.getConnection(brokerAddress);
ClientCnx cnx = conn.get();

// 使用连接做事情
...
  
// 归还给连接池
pool.releaseConnection(cnx);
          
pool.closeAllConnections();
pool.close();

我们先看一下这个类PulsarChannelInitializer用来初始化和pulsar broker 端的连接。

public void initChannel(SocketChannel ch) throws Exception {

    // tls相关
    ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);

    // 定长解码器
    ch.pipeline().addLast("frameDecoder", 
                          new LengthFieldBasedFrameDecoder(
            Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
  
    // 到这里可以拿到了RPC协议反序列化后的对象,进行客户端逻辑处理
    // 实际在这个类ClientCnx里面处理所有逻辑
    ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
创建连接逻辑 (connectToAddress)

netty 的bootstrap.connect(忽略tls)

ClientCnx

我们看一下这个类的层次结构

public class ClientCnx extends PulsarHandler;
public abstract class PulsarHandler extends PulsarDecoder;
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter;

PulsarDecoder
PulsarDecoder 这个类前面在初始化连接的时候还加入了一个LengthFieldBasedFrameDecoder.

所以到这里的channelRead就可以直接反序列化RPC就可以,之后会调用相应的RPC处理方法(handleXXXXXX)

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ...
          
        // Get a buffer that contains the full frame
        ByteBuf buffer = (ByteBuf) msg;
        BaseCommand cmd = null;
        BaseCommand.Builder cmdBuilder = null;
        try {
            // De-serialize the command
            int cmdSize = (int) buffer.readUnsignedInt();
            int writerIndex = buffer.writerIndex();
            buffer.writerIndex(buffer.readerIndex() + cmdSize);
          
            // 从对象池里拿到一个ByteBufCodedInputStream
            ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer);
            cmdBuilder = BaseCommand.newBuilder();
            // 反序列化
            cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
            buffer.writerIndex(writerIndex);

            cmdInputStream.recycle();

            ...
            // 下面按照不同的RPC类型调用不用的方法进行处理
            switch (cmd.getType()) {
            case PARTITIONED_METADATA:
                checkArgument(cmd.hasPartitionMetadata());
                try {
                    interceptCommand(cmd);
                    handlePartitionMetadataRequest(cmd.getPartitionMetadata());
                } catch (InterceptException e) {
                    ctx.writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
                            e.getMessage(), cmd.getPartitionMetadata().getRequestId()));
                } finally {
                    cmd.getPartitionMetadata().recycle();
                }
                break;
            ...
              // 省略其他RPC方法,都是正常handleXXXXX
        } finally {
               // 清理方法
            if (cmdBuilder != null) {
                cmdBuilder.recycle();
            }

            if (cmd != null) {
                cmd.recycle();
            }

            buffer.release();
        }
    }

PulsarHandler

这个类实际里面主要增加了KeepAlive逻辑的实现。

ClientCnx

这里主要负责和服务端交互的逻辑。

package org.apache.pulsar.client.impl;


public class ClientCnx extends PulsarHandler {


    // 连接状态
    enum State {
        None, SentConnectFrame, Ready, Failed, Connecting
    }
    private State state;

   //----------------------------------------------------------------------
  
    // 临时的请求队列
    // requestId -> 请求
    private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests =
        new ConcurrentLongHashMap<>(16, 1);
  
    // Lookup 请求队列
    private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;

   //----------------------------------------------------------------------
  
    // 一些业务逻辑单元
    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
  
   //----------------------------------------------------------------------
  
    // 异步新建连接的handle
    private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
  
   //----------------------------------------------------------------------
   
    // PulsarClient 构造时传递进来的线程池
    private final EventLoopGroup eventLoopGroup;

   //----------------------------------------------------------------------
  
    // 限流(和lookup有关)
    private final Semaphore pendingLookupRequestSemaphore;
    private final Semaphore maxLookupRequestSemaphore;
  
    // 连接拒绝相关的成员(和lookup有关)
    private final int maxNumberOfRejectedRequestPerConnection;
    private final int rejectedRequestResetTimeSec = 60;
    // 被拒绝的请求数目(和lookup有关)
    private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(ClientCnx.class, "numberOfRejectRequests");
    @SuppressWarnings("unused")
    private volatile int numberOfRejectRequests = 0;

    //----------------------------------------------------------------------
    // 用来检查请求是否超时的数据结构
    private static class RequestTime {
        final long creationTimeMs;
        final long requestId;
        final RequestType requestType;

        RequestTime(long creationTime, long requestId, RequestType requestType) {
            this.creationTimeMs = creationTime;
            this.requestId = requestId;
            this.requestType = requestType;
        }
    }
  
    // 超时的请求队列
    private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
    
   //----------------------------------------------------------------------
  
    // 消息的最大大小
    @Getter
    private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

    // RPC协议版本
    private final int protocolVersion;
  
    // operation超时时间
    private final long operationTimeoutMs;
    // 用来检查operation超时时间的handle
    private ScheduledFuture<?> timeoutTask;
  
   //----------------------------------------------------------------------
  
    // 一些记录是否从proxy连接的信息
    protected String proxyToTargetBrokerAddress = null;
    protected String remoteHostName = null;
  
    // TLS 相关
    private boolean isTlsHostnameVerificationEnable;
    private static final TlsHostnameVerifier HOSTNAME_VERIFIER = new TlsHostnameVerifier();
    protected final Authentication authentication;
    protected AuthenticationDataProvider authenticationDataProvider;
  
   //----------------------------------------------------------------------
  
    // 事务相关
    private TransactionBufferHandler transactionBufferHandler;
    

    private enum RequestType {
        Command,
        GetLastMessageId,
        GetTopics,
        GetSchema,
        GetOrCreateSchema;

        String getDescription() {
            if (this == Command) {
                return "request";
            } else {
                return name() + " request";
            }
        }
    }

这里临时回到ConnectionPool的逻辑中,之前创建连接的时候实际调用Bootstrap.connect这里返回的实际是一个Netty的Channel对象,但是ConnectionPool里面返回的ClientCnx对象。

ConnectionPool

private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, int connectionKey) {
     
        final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();

        // Trigger async connect to broker
        createConnection(physicalAddress).thenAccept(channel -> {
            ....
            // 这里面ClientCnx对象实际是从这个已经成功连接的Channel的pipeline里拿到的
            final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
            ....

            if (!logicalAddress.equals(physicalAddress)) {
                // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
                // it can be specified when sending the CommandConnect.
                // That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
                // this method.
                cnx.setTargetBroker(logicalAddress);
            }
            
            // 保存了远端连接的地址
            cnx.setRemoteHostName(physicalAddress.getHostName());

            cnx.connectionFuture().thenRun(() -> {
                ... 
                // 连接成功则返回
                cnxFuture.complete(cnx);
            }).exceptionally(exception -> {
               
                cnxFuture.completeExceptionally(exception);
                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                cnx.ctx().close();
                return null;
            });
              
           ...

ClientCnx的主要方法(功能)

  • 连接生命周期管理(netty Handler里面的方法)

channelActive

channelInActive

exceptionCaught

  • 发送request:主动发送RPC的方法,并按照业务逻辑处理

Lookup请求

getLastMessageId

getSchema

  • 处理response:继承自PulsarDecoder 的handleXXXXX RPC 处理逻辑

  • 主动发送RPC方法获得原始的response

CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage,long requestId,RequestType requestType)

  • 检查请求是否超时checkRequestTimeout

  • 注册/ 删除业务逻辑对象(业务逻辑对象后面单出文章说)

consumer

producer

transactionMetaStoreHandler

transactionBufferHandler

sendRequestAndHandleTimeout方法

private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
  
        // 放入到pending请求队列里面,用来等待response
        CompletableFuture<T> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
  
        // 直接发送RPC body
        ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send {} to broker: {}", ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage());
                pendingRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        });
        // 在超时队列里面增加一个数据结构用来记录超时
        requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId, requestType));
  
        return future;
    }
chanelActive 方法

这个方法逻辑比较简单

PulsarHandler.channelActive方法里面开启了KeepAlive逻辑的调度任务

ClientCnx.channelActive 方法里面开启了requestTimeout逻辑的调度任务

发送一个ConnectCommand请求给服务端(服务端处理逻辑到后面会说)

请求超时的处理

这个逻辑也比较容易。

使用了EventLoopGroup调度了一个定时任务,每次去查看requestTimeoutQueue里面的请求是否有超时的

有的话就把这个请求的response设置成TimeoutException

这里的请求超时检查时间间隔是operationTimeoutMs决定的

PulsarClient 功能回顾

这样让我们回顾一下PulsarClient的总体功能

  • 包含了一个连接池用来创建ClientCnx和服务端进行沟通

  • 保存了一些自定义业务处理单元(consumer,producer, tcClient)

  • LookupService

  • 一些周期check的动作

  • Schema 的LoadingCache

业务单元通过注册到ClientCnx上面,可以使用这个连接发送RPC,获得response,这样传递回业务逻辑单元里面

PulsarClient这个类对使用者来说提供了一个RPC层面的抽象,其他类使用RPC完成自己的逻辑

相关文章

  • PulsarClient 解析(一)

    PulsarClient 让我们看一下这个类的主要方法 创建producer/consumer/reader 元数...

  • PulsarClient 解析(一)

    [TOC] PulsarClient 让我们看一下这个类的主要方法 创建producer/consumer/rea...

  • XML解析

    Sax解析:流的方式进行解析流解析:以流的方式进行解析(一行一行解析)Dom解析:加载整个文档,以树的方式解析

  • Linux智能DNS服务搭建之Bind服务(二)

    一、DNS正向解析与反向解析 1.DNS正向解析与反向解析简介 2.DNS正向解析与反向解析配置 1)配置正向解析...

  • xml解析

    一、解析方式:DOM解析,SAX解析 1)解析工具 基于DOM解析原理的: 1)JAXP (o...

  • 五、Groovy语法(五)json、xml解析

    Groovy数据解析 一、json解析 请求网络数据并解析 二、xml解析 groovy解析xml数据 groov...

  • XML3 - XML解析编程

    JAXP解析 JAXP的DOM解析 由解析器工厂类获取解析器工厂 解析器工厂产生一个解析器 解析XML,获得一个D...

  • 2018年12月大学六级考试答案解析

    写作:点评 解析 听力:点评 解析 阅读:点评 解析(第一套) 解析(第二套) 解析(第三套) 翻译:点评 解析

  • 2018年12月大学四级考试答案解析

    写作:点评 解析 听力:点评 解析 阅读:点评 解析(第一套)解析(第二套) 解析(第三套) 翻译:点评 解析

  • JSON解析数据

    JsonObject解析 和JSonArray解析: JSON创建和JSON解析: 一。效果图: 创建: 解析: ...

网友评论

      本文标题:PulsarClient 解析(一)

      本文链接:https://www.haomeiwen.com/subject/vhqojrtx.html