美文网首页
kafka客户端发送批量消息流程分析

kafka客户端发送批量消息流程分析

作者: 飞奔的白牛 | 来源:发表于2020-10-30 23:33 被阅读0次

    1、根据分区申请内存块

    buffer = free.allocate(size, maxTimeToBlock);

    2、构造batch对象加入Dequeue

    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);

    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);

    3、将key、value记录到buffer中

    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);

    if (result.batchIsFull || result.newBatchCreated) {

    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);

      this.sender.wakeup();  //满足条件后唤醒线程发送batch

    }

    4、sender线程发送批次

    sendProduceRequests(batches, now);

    MemoryRecords records = batch.records();  //此处将buffer中的record读取出来

    client.send(clientRequest, now);  

    相关文章

      网友评论

          本文标题:kafka客户端发送批量消息流程分析

          本文链接:https://www.haomeiwen.com/subject/zroryktx.html