美文网首页Kafka程序员文字欲
无镜--kafka之生产者(一)

无镜--kafka之生产者(一)

作者: 绍圣 | 来源:发表于2018-07-27 10:42 被阅读19次

    学习kafka有一段时间了。关于它里面的知识还是需要总结一下,一来是能让自己对kafka能有一个比较成型的理解,二来是对知识的一种回顾。多总结加深理解。

    前言

    kafka作为应用最为广泛的消息中间件,其内部各个的组件是怎么来协调工作的?其内部的设计思想是怎么样的?这些都很值得我们去细细的分析和研究。此处一系列的分析以kafka-0.10.1.0版本为基础进行解读。

    由于新版本的kafka的C端是采用java实现的(S端是由Scala ),所以对于java开发人员很好上手。在使用kafka的时候,最开始接触的就是它的生产者和消费者客户端。并且kafka客户端对外提供的接口非常简洁,使用起来简单方便。当然这些得益于kafka在幕后为我们做了很多工作。

    生产者(KafkaProducer)

    使用KafkaProducer的伪代码:

    KafkaProducer producer = new KafkaProducer(Map);

    ProducerRecordrecord = new ProducerRecord(Topic, Key, Value); 

    Future fu = producer.send(record);

    producer.flush();

    RecordMetadata rm = fu.get(1, TimeUnit.MINUTES);

    可以看出,生产者的API确实非常简单。以上的伪代码是同步发送消息,kafka的生产者还提供异步提交消息的方法(伪代码):

    KafkaProducer producer = new KafkaProducer(Map);

    ProducerRecord record = new ProducerRecord(Topic, Key, Value);

    producer.send(record, new Callback(){

    @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.out.println(e.getMessage()); } if(metadata != null) { System.out.println("message send to partition " + metadata.partition() + ",topic:" + metadata.topic() + ", offset: " + metadata.offset()); } }

    });

    KafkaProducer对象代表一个客户端进程,KafkaProducer.send()方法发送到服务端的消息,并不是直接发送到服务端,而是KafkaProducer把消息存放到内存队列中。再由一个消息发送线程从队列中拉取出消息,以批量的方式发送给服务端。kafka中的记录收集器(RecordAccumulator)负责缓存生产者产生的消息。发送线程(Sender)负责读取记录收集器(RecordAccumulator)的批量消息,通过网络发送给服务端。

    ProducerRecord

    需要发送给服务器的消息,都会封装成ProducerRecord对象。ProducerRecord对象中定义了消息相关信息:

    private final String topic; // 要发送的topic名称

    private final Integer partition; // 要发送的分区ID

    private final K key; // 消息的KEY值

    private final V value; // 消息的value

    private final Long timestamp;

    生产者中的拦截器(interceptor)

    在发送消息之前,kafka允许用户对消息进行操作,拦截器(interceptor)孕育而生。并且允许用户指定多个interceptor从而形成一个拦截器链路来对用户所要发送的消息进行操作。

    在构造KafkaProducer对象的时候,可以指定自定义的拦截链:

    // 构建拦截链

     List interceptors = new ArrayList();

    interceptors.add("xx.xx....");

    interceptors.add("xx.xx....");

    HashMap config = new HashMap();

    config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    KafkaProducer producer = new KafkaProducer(config);

    在KafkaProducer构造函数中,会给KafkaProducer中的拦截器变量进行赋值(伪代码)。

    private final ProducerInterceptors interceptors;

    private KafkaProducer() {

    List interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);

    this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

    }

    在调用发送方法进行消息发送的时候,会首先使用在初始化中设置的拦截器,来对消息进行操作。

    public Future send(ProducerRecord record, Callback callback) {

    ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);

    return doSend(interceptedRecord, callback);

    }

    调用拦截器里面的方法,对消息进行处理:

    public ProducerRecord onSend(ProducerRecord record) {

    ProducerRecord interceptRecord = record;

    for (ProducerInterceptor interceptor : this.interceptors) {

    try {

    interceptRecord = interceptor.onSend(interceptRecord);

    } catch (Exception e) {

    if (record != null)

    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);

    else

    log.warn("Error executing interceptor onSend callback", e);

    }

    }

    return interceptRecord;

    }

    onSend()方法里面调用用户自定义的拦截器。用户自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口。

    调用了用户自定义的拦截器后,就进入了KafkaProducer.doSend()。

    KafkaProducer.doSend()

    private Future<RecordMetadata> doSend(ProducerRecord record, Callback callback) {

            TopicPartition tp = null;

            try {

    // 发送之前确认topic对应的metadata(元数据)可用(topic的partition的主副本可用),如果没有metadata就要获取相应的metadata,获取metadata是阻塞的。总之必须metadata可用才会发送生产信息

    ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);

                long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);

                Cluster cluster = clusterAndWaitTime.cluster; // 获取集群信息

                byte[] serializedKey;

    // 序列化ProducerRecord中的KEY和VALUE

                try {

                    serializedKey = keySerializer.serialize(record.topic(), record.key());

                } catch (ClassCastException cce) {

                }

                byte[] serializedValue;

                try {

                    serializedValue = valueSerializer.serialize(record.topic(), record.value());

                } catch (ClassCastException cce) {

                }

    // 根据ProducerRecord中的partition和KEY计算出要发送到的partition

                int partition = partition(record, serializedKey, serializedValue, cluster);

                int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

    // 根据ProducerRecord中计算出字节如果超出限制,则会抛出异常

                ensureValidRecordSize(serializedSize);

                tp = new TopicPartition(record.topic(), partition);

                long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

                log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

                Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

    // 追加数据到RecordAccumulator中(缓存)

                RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

    // 如果追加数据后,RecordAccumulator中的数据已经达到限制,或者空间不足,则会唤醒发送线程(sender) ,把消息批量的提交到服务端。

                if (result.batchIsFull || result.newBatchCreated) {

                    this.sender.wakeup();

                }

                return result.future;

            } catch (Exception e) {

                if (this.interceptors != null)

                    this.interceptors.onSendError(record, tp, e);

                throw e;

            }

        }

    发送过程分解分析

    获取topic对应的metadata(元数据)

    发送过程中,通过waitOnMetadata()来获取topic对应的metadata。因为metadata涉及的内容比较多。所以后面单独来写。总之必须metadata可用才会发送生产信息。

    序列化ProducerRecord的Key和Value

    KafkaProducer在发送消息之前需要对ProducerRecord中的Key和Value进行序列化操作,在KafkaComsumer端将对消息中的Key和Value进行反序列化操作。在kafka内部提供了序列化和反序列化默认实现类:

    我们也可以自定义序列化(实现Serializer接口)和反序列化(实现Deserializer接口)类。然后同加载拦截器实现一样,使用自定义序列化和反序列化类。

    HashMap config = new HashMap();

    config.put("key.serializer", "xx.xx....");

    config.put("value.serializer", "xx.xx....");

    KafkaProducer  producer = new KafkaProducer(new Map(config));

    接下来就应该选择把消息发送到topic的哪个分区上了

    计算partition值

    int partition = partition(record, serializedKey, serializedValue, cluster);

    private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {

    Integer partition = record.partition();

    return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

    }

    KafkaProducer中默认使用org.apache.kafka.clients.producer.internals.DefaultPartitioner来实现计算partition的值。

    上面介绍过ProducerRecord,它里面的变量会用来计算partition。具体如下:

    1,如果指定了partition,那么消息会被发送到指定的分区(partition)中。

    2,如果没有指定partition,但指定了key,那么会使用key进行hash计算,根据计算出来的值发送到对应的分区(partition)中。

    3,如果没有指定partition,没有指定key,那么会使用round-robin模式(轮询模式)发送消息到分区(partition)中。

    4,如果同时指定了partition和key。那么partition起作用(key无效),发送到partition指定的分区中。

    代码如下:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

    List partitions = cluster.partitionsForTopic(topic);

    int numPartitions = partitions.size();

    if (keyBytes == null) {

    // 没有指定 key 的情况下

    int nextValue = counter.getAndIncrement(); // counter(final类型的AtomicInteger)在计算的时候在此随机数的基础上自增;

    // 获取topic中有效的分区信息(有效的分区代表着这个分区的leader可以正常的提供读写服务)

    List availablePartitions = cluster.availablePartitionsForTopic(topic);

    if (availablePartitions.size() > 0) { // 根据可用分区数量和随机数来计算partition

    int part = Utils.toPositive(nextValue) % availablePartitions.size();

    return availablePartitions.get(part).partition();

    } else { // 没有可用的分区,就使用topic下所有的分区数量来计算partition

    return Utils.toPositive(nextValue) % numPartitions;

    }

    } else { // 有key的情况下,使用key的hash值进行计算partition

    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

    }

    }

    有了partition后就要向RecordAccumulator中追加消息了

    向RecordAccumulator(记录收集器)追加消息

    首先了解RecordAccumulator相关的生态类:

    生产者向RecordAccumulator添加记录,就像向一个车间中各个不同的传送带上的箱子添加物品一样(传送带的名称代表一个TopicPartition,传送带代表一个队列,传送带上的箱子代表RecordBatch)。生产者把属于同一个传送带的物品,放到传送带的箱子里面,当箱子存满,或者当前箱子无法装下物品时,就会放在新的一个箱子里面,那个装满的箱子里面的物品就会等待被一次性的处理。如果传送带上面的箱子已经超出了传送带所能承受的容量,生产者后面生产的物品就必须等待,等待传送带释放出新的空间能放上新的箱子来装物品,如果等待一段时间后还没有空间来装物品,就放弃此物品。

    看看记录收集器周围的生态:

    RecordBatch

    对生产的批量记录的一个封装,表示正在或者将要被发送的的一批记录(传送带上的箱子)。会拥有一个MemoryRecords的引用

    MemoryRecords

    生产者发送的记录在内存中的一个记录集。记录最终存放的地方

    Compressor

    负责执行追加写操作

    ProduceRequestResult

    ProduceRequestResult是在初始化RecordBatch时建立。属于批次级的实例。ProduceRequestResult内置了CountdownLatch并且count为1。在外围调用FutureRecordMetadata的get方法中获取到记录的元数据时,会阻塞当前线程,必须等到ProduceRequestResult完成,也就是CountdownLatch变为0(在FutureRecordMetadata.get()中会调用ProduceRequestResult.CountdownLatch.await())。在Sender线程完成RecordBatch中的全部消息的发送并且收到服务端的响应后,会把CountdownLatch中的count变为0,这样外围阻塞的线程就会继续往下走获取到记录的元数据信息。

    baseOffset变量:记录服务端响应客户端中的RecordBatch中第一个消息分配的的offset。

    ReadyCheckResult

    在发送线程(Sender)读取RecordAccumulator的消息进行发送的时候,首先会进行就绪检查:遍历所有的RecordAccumulator.ConcurrentMap>中各个tp的Deque中的批记录集(RecordBatch),把可以发送的RecordBatch对应的节点(leader)取出来,封装成了ReadyCheckResult对象。ReadyCheckResult里面就是保持的可以发送的RecordBatch对应的节点(leader)信息,下一次就绪检查点的时间,分区的leader未知的topic信息。

    FutureRecordMetadata

    保存生产记录的元数据,包括:ProduceRequestResult实例,RecordBatch中保存的记录个数,产生出记录时候的时间,key的大小,value的大小,checksum(暂时没有搞明白)等。每次追加完消息后返回FutureRecordMetadata实例,属于消息级的实例。提供get方法来让外围程序获取记录元数据信息,只是必须等到Sender线程完成RecordBatch的发送并且收到服务端的响应后,才返回元数据信息-RecordMetadata。

    RecordMetadata

    对批次元数据信息的封装。

    RecordAppendResult

    持有FutureRecordMetadata对象实例,RecordBatch是否满的标识(batchIsFull变量),需不需要重新创建新的RecordBatch(newBatchCreated变量)。batchIsFull和newBatchCreated在调用RecordAccumulator.append()方法后来判断是否需要唤醒Sender线程进行发送消息。如果batchIsFull为true:代表双向队列里面有RecordBatch满了,可以唤醒发送线程发送消息了。如果newBatchCreated为true:代表旧的RecordBatch满了或者是装不下新的消息了,可以唤醒发送线程发送消息了。

    BufferPool

    每实例化一个KafkaProducer,对应一个RecordAccumulator实例,每一个RecordAccumulator实例对应一个BufferPool实例,BufferPool提供分配内存存放消息空间的方法:allocate和释放消息空间的方法:deallocate。每个BufferPool实例中包括:

    totalMemory(池总的内存的总量),

    poolableSize(控制Deque队列中每个ByteBuffer的大小),

    lock:ReentrantLock(保证每次分配和释放空间线程安全),

    free<ByteBuffer>:Deque(已经申请未使用的空间),

    waiters:Deque(记录申请不到足够空间而阻塞的线程,队列中记录的是阻塞线程对应的Condition对象),

    availableMemory(未申请未使用的空间)

    分配空间的流程:

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {

    if (size > this.totalMemory) // 申请空间的大小已经超过了BufferPool总的字节大小抛出异常 throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");

    this.lock.lock();

    try {

    // 申请的大小和Deque队列中ByteBuffer的大小一样,并且队列又存在已经申请未使用的内存,就直接使用队列中的ByteBuffer实例

    if (size == poolableSize && !this.free.isEmpty())

    return this.free.pollFirst();

    // 计算出已经申请未使用的内存大小

    int freeListSize = this.free.size() * this.poolableSize;

    if (this.availableMemory + freeListSize >= size) { // 可用空间大于或等于要申请的空间 freeUp(size);

    this.availableMemory -= size; // 减少size大小的未申请未使用的空间

    lock.unlock();

    return ByteBuffer.allocate(size); // 返回size大小的ByteBuffer

    else { // 没有空间来分配申请的空间

    int accumulated = 0;

    ByteBuffer buffer = null;

    Condition moreMemory = this.lock.newCondition();

    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory);

    while (accumulated < size) {

    long startWaitNs = time.nanoseconds();

    long timeNs;

    boolean waitingTimeElapsed;

    try { // 如果累计的空间大小小于申请的空间大小,释放当前线程占有的锁,阻塞当前线程 // 阻塞remainingTimeToBlockNs时间后,没有被唤醒(unlock或者是await),返回false, waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);

    } catch (InterruptedException e) {

    this.waiters.remove(moreMemory); throw e;

    } finally {

    long endWaitNs = time.nanoseconds();

    timeNs = Math.max(0L, endWaitNs - startWaitNs); t

    his.waitTime.record(timeNs, time.milliseconds());

    }

    if (waitingTimeElapsed) {

    this.waiters.remove(moreMemory);

    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");

    }

    remainingTimeToBlockNs -= timeNs;

    // 累计空间大小是0并且申请大小等于Deque队列中ByteBuffer的大小并且已经申请未使用的空间队列不为空

    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {

    buffer = this.free.pollFirst(); // 使用Deque队列中的第一个ByteBuffer

    accumulated = size; // accumulated=size 会跳出循环

    } else {

    freeUp(size - accumulated); // 释放空间

    int got = (int) Math.min(size - accumulated, this.availableMemory);

    this.availableMemory -= got; // 减少未申请未使用的空间的值

    accumulated += got; // 增加累计大小的值,直到累计的大小大于申请的大小

    }

    }

    Condition removed = this.waiters.removeFirst();

    if (removed != moreMemory)

    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

    if (this.availableMemory > 0 || !this.free.isEmpty()) {

    if (!this.waiters.isEmpty())

    this.waiters.peekFirst().signal(); // 修改Condition队列中节点的状态,让其中的节点可以被唤醒

    }

    lock.unlock();

    if (buffer == null)

    return ByteBuffer.allocate(size);

    else

    return buffer;

    }

    } finally {

    if (lock.isHeldByCurrentThread())

    lock.unlock();

    }

    }

    IncompleteRecordBatches

    在记录收集器添加发送数据的方法中(RecordAccumulator.append()),把那些发送尚未完成的RecordBatch保存到此集合中,作用是对所有的RecordBatch的监控和管理。在Sender线程把生产请求提交到服务端后(实际就是提交RecordBatch中的数据),服务端正常响应给客户端,客户端在处理响应的回调函数中,如果是成功提交到服务端,就从IncompleteRecordBatches集合中删除。删除还有以下情况:Sender强制退出时,超时。此类操作的所有RecordBatch都加锁,保证线程安全。

    以上分析了记录收集器的生态圈,在KafkaProducer.doSend方法中把信息追加到记录收集器后,根据RecordAccumulator.append方法返回的RecordAccumulator.RecordAppendResult实例的batchIsFull和newBatchCreated变量来判断是否唤醒发送线程,进行信息发送。客户端可以从doSend方法返回的FutureRecordMetadata实例中获取到请求完成(收到服务器响应)后的元数据信息。

    小结:以上就是客户端使用KafkaProducer发送消息,最后保存在记录收集器的过程。接下来将重点看看消息怎么进入记录收集器的。

    相关文章

      网友评论

        本文标题:无镜--kafka之生产者(一)

        本文链接:https://www.haomeiwen.com/subject/pgewmftx.html