美文网首页Kafka文字欲
无镜--kafka之生产者(五)

无镜--kafka之生产者(五)

作者: 绍圣 | 来源:发表于2018-08-09 10:28 被阅读10次

在看很多讲kafka的文章里面都会说:kafka只保证单个partition的有序性,那么kafka是怎么保证有序的喃?

使用RecordAccumulator的mutePartition和unmutePartition方法来配合实现有序性

//记录tp是否还有未完成的RecordBatch,保证一个tp的顺序性,当一个tp对应的RecordBatch要开始发送时,就将此tp加入到muted中,tp对应的RecordBatch发送完成后,删除muted中的tp

private final Set muted;

public void mutePartition(TopicPartition tp) { muted.add(tp); }

public void unmutePartition(TopicPartition tp) { muted.remove(tp); }

RecordAccumulator.ready方法中进行判断(伪代码)

public ReadyCheckResult ready(Cluster cluster, long nowMs) {

if (!readyNodes.contains(leader) && !muted.contains(part)) {}

}

if (!readyNodes.contains(leader) && !muted.contains(part)),如果muted中包含了这个tp,那么即使这个tp对应的leader存在,RecordBatch可以发送也不会去发送它,因为它上一个RecordBatch还没有处理完成。

RecordAccumulator.drain方法中进行判断(伪代码)

public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) {

if (!muted.contains(tp)){}

}

if (!muted.contains(tp))在对RecordAccumulator中的记录进行重新组装的时候,依旧会判断对应的tp是否在muted中。在muted中的依旧不会选择出来发送。

在Sender中的变量:guaranteeMessageOrder:是否保持单个partition的有序性

在KafkaProducer的构造中

this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);

public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder, int maxRequestSize, short acks, int retries, Metrics metrics, Time time, String clientId, int requestTimeout) { this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.guaranteeMessageOrder = guaranteeMessageOrder; this.maxRequestSize = maxRequestSize; this.running = true; this.acks = acks; this.retries = retries; this.time = time; this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; }

guaranteeMessageOrder=config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1

我们可以在使用的时候设置max.in.flight.requests.per.connection来设置guaranteeMessageOrder的值。

mutePartition和unmutePartition方法都是在Sender中进行调用

mutePartition在Sender.run中调用

if (guaranteeMessageOrder) {

// 记录将要发送的topicPartition到mute中

for (List batchList : batches.values()) {

for (RecordBatch batch : batchList)

this.accumulator.mutePartition(batch.topicPartition);

}

}

发送的时候,把将要提交的RecordBatch的tp加到muted中。下次再需要发送tp里的RecordBatch的时候,如果muted里面包含了此tp,就不会选择出来发送。

在处理服务端响应的时候,清除muted中的tp

if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition);

总结:要保证单partition的有序性,需要配置max.in.flight.requests.per.connection=1。

相关文章

  • 无镜--kafka之生产者(五)

    在看很多讲kafka的文章里面都会说:kafka只保证单个partition的有序性,那么kafka是怎么保证有序...

  • 无镜--kafka之生产者-总结

    写到这里想对kafka生产者做一个总结,算是kafka生产的完结篇,总结一下它里面到底使用了技巧,使之它这么快: ...

  • 无镜--kafka之生产者(四)

    前面三回在分析生产者时,重点在发送的主流程上:怎么生产,怎么发送,怎么调度。略过了一个重要的环节:Metadata...

  • 无镜--kafka之生产者(二)

    书接上回。在上回中,我们了解了记录收集器周围的生态类,接下来看看生产的消息是如何追加到记录收集器(RecordAc...

  • 无镜--kafka之生产者(三)

    话说上回中,KafkaProducer已经将生产的记录追加到了RecordAccumulator中。那么接下来的事...

  • 无镜--kafka之生产者(一)

    学习kafka有一段时间了。关于它里面的知识还是需要总结一下,一来是能让自己对kafka能有一个比较成型的理解,二...

  • Apache Kafka -5 生产者示例

    Apache Kafka教程 之 Apache Kafka - 生产者示例 原文地址: http://blogx...

  • 无镜--kafka之TOPIC

    前面在说kafka生产者的时候,经常会提到topic。这是kafka中核心的概念。说起topic就会涉及到part...

  • 无镜--kafka之消费者(五)

    消费者能发送拉取请求的前提条件是:1,消费者已经连接上了服务端协调者所在的节点;2,消费者必须获取到服务端协调者分...

  • Kafka之Producer篇

    Kafka之Producer篇 Kafka生产者案例 本篇我们叙述Kafka是如何将数据发送到服务端的;首先我们...

网友评论

    本文标题:无镜--kafka之生产者(五)

    本文链接:https://www.haomeiwen.com/subject/ndatbftx.html