学习kafka有一段时间了。关于它里面的知识还是需要总结一下,一来是能让自己对kafka能有一个比较成型的理解,二来是对知识的一种回顾。多总结加深理解。
前言
kafka作为应用最为广泛的消息中间件,其内部各个的组件是怎么来协调工作的?其内部的设计思想是怎么样的?这些都很值得我们去细细的分析和研究。此处一系列的分析以kafka-0.10.1.0版本为基础进行解读。
由于新版本的kafka的C端是采用java实现的(S端是由Scala ),所以对于java开发人员很好上手。在使用kafka的时候,最开始接触的就是它的生产者和消费者客户端。并且kafka客户端对外提供的接口非常简洁,使用起来简单方便。当然这些得益于kafka在幕后为我们做了很多工作。
生产者(KafkaProducer)
使用KafkaProducer的伪代码:
KafkaProducer producer = new KafkaProducer(Map);
ProducerRecordrecord = new ProducerRecord(Topic, Key, Value);
Future fu = producer.send(record);
producer.flush();
RecordMetadata rm = fu.get(1, TimeUnit.MINUTES);
可以看出,生产者的API确实非常简单。以上的伪代码是同步发送消息,kafka的生产者还提供异步提交消息的方法(伪代码):
KafkaProducer producer = new KafkaProducer(Map);
ProducerRecord record = new ProducerRecord(Topic, Key, Value);
producer.send(record, new Callback(){
@Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.out.println(e.getMessage()); } if(metadata != null) { System.out.println("message send to partition " + metadata.partition() + ",topic:" + metadata.topic() + ", offset: " + metadata.offset()); } }
});
KafkaProducer对象代表一个客户端进程,KafkaProducer.send()方法发送到服务端的消息,并不是直接发送到服务端,而是KafkaProducer把消息存放到内存队列中。再由一个消息发送线程从队列中拉取出消息,以批量的方式发送给服务端。kafka中的记录收集器(RecordAccumulator)负责缓存生产者产生的消息。发送线程(Sender)负责读取记录收集器(RecordAccumulator)的批量消息,通过网络发送给服务端。
ProducerRecord
需要发送给服务器的消息,都会封装成ProducerRecord对象。ProducerRecord对象中定义了消息相关信息:
private final String topic; // 要发送的topic名称
private final Integer partition; // 要发送的分区ID
private final K key; // 消息的KEY值
private final V value; // 消息的value
private final Long timestamp;
生产者中的拦截器(interceptor)
在发送消息之前,kafka允许用户对消息进行操作,拦截器(interceptor)孕育而生。并且允许用户指定多个interceptor从而形成一个拦截器链路来对用户所要发送的消息进行操作。
在构造KafkaProducer对象的时候,可以指定自定义的拦截链:
// 构建拦截链
List interceptors = new ArrayList();
interceptors.add("xx.xx....");
interceptors.add("xx.xx....");
HashMap config = new HashMap();
config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
KafkaProducer producer = new KafkaProducer(config);
在KafkaProducer构造函数中,会给KafkaProducer中的拦截器变量进行赋值(伪代码)。
private final ProducerInterceptors interceptors;
private KafkaProducer() {
List interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
}
在调用发送方法进行消息发送的时候,会首先使用在初始化中设置的拦截器,来对消息进行操作。
public Future send(ProducerRecord record, Callback callback) {
ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
调用拦截器里面的方法,对消息进行处理:
public ProducerRecord onSend(ProducerRecord record) {
ProducerRecord interceptRecord = record;
for (ProducerInterceptor interceptor : this.interceptors) {
try {
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}
onSend()方法里面调用用户自定义的拦截器。用户自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
调用了用户自定义的拦截器后,就进入了KafkaProducer.doSend()。
KafkaProducer.doSend()
private Future<RecordMetadata> doSend(ProducerRecord record, Callback callback) {
TopicPartition tp = null;
try {
// 发送之前确认topic对应的metadata(元数据)可用(topic的partition的主副本可用),如果没有metadata就要获取相应的metadata,获取metadata是阻塞的。总之必须metadata可用才会发送生产信息
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster; // 获取集群信息
byte[] serializedKey;
// 序列化ProducerRecord中的KEY和VALUE
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
}
// 根据ProducerRecord中的partition和KEY计算出要发送到的partition
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
// 根据ProducerRecord中计算出字节如果超出限制,则会抛出异常
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
// 追加数据到RecordAccumulator中(缓存)
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
// 如果追加数据后,RecordAccumulator中的数据已经达到限制,或者空间不足,则会唤醒发送线程(sender) ,把消息批量的提交到服务端。
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;
} catch (Exception e) {
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
发送过程分解分析
获取topic对应的metadata(元数据)
发送过程中,通过waitOnMetadata()来获取topic对应的metadata。因为metadata涉及的内容比较多。所以后面单独来写。总之必须metadata可用才会发送生产信息。
序列化ProducerRecord的Key和Value
KafkaProducer在发送消息之前需要对ProducerRecord中的Key和Value进行序列化操作,在KafkaComsumer端将对消息中的Key和Value进行反序列化操作。在kafka内部提供了序列化和反序列化默认实现类:
我们也可以自定义序列化(实现Serializer接口)和反序列化(实现Deserializer接口)类。然后同加载拦截器实现一样,使用自定义序列化和反序列化类。
HashMap config = new HashMap();
config.put("key.serializer", "xx.xx....");
config.put("value.serializer", "xx.xx....");
KafkaProducer producer = new KafkaProducer(new Map(config));
接下来就应该选择把消息发送到topic的哪个分区上了
计算partition值
int partition = partition(record, serializedKey, serializedValue, cluster);
private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
KafkaProducer中默认使用org.apache.kafka.clients.producer.internals.DefaultPartitioner来实现计算partition的值。
上面介绍过ProducerRecord,它里面的变量会用来计算partition。具体如下:
1,如果指定了partition,那么消息会被发送到指定的分区(partition)中。
2,如果没有指定partition,但指定了key,那么会使用key进行hash计算,根据计算出来的值发送到对应的分区(partition)中。
3,如果没有指定partition,没有指定key,那么会使用round-robin模式(轮询模式)发送消息到分区(partition)中。
4,如果同时指定了partition和key。那么partition起作用(key无效),发送到partition指定的分区中。
代码如下:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 没有指定 key 的情况下
int nextValue = counter.getAndIncrement(); // counter(final类型的AtomicInteger)在计算的时候在此随机数的基础上自增;
// 获取topic中有效的分区信息(有效的分区代表着这个分区的leader可以正常的提供读写服务)
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) { // 根据可用分区数量和随机数来计算partition
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else { // 没有可用的分区,就使用topic下所有的分区数量来计算partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else { // 有key的情况下,使用key的hash值进行计算partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
有了partition后就要向RecordAccumulator中追加消息了
向RecordAccumulator(记录收集器)追加消息
首先了解RecordAccumulator相关的生态类:
生产者向RecordAccumulator添加记录,就像向一个车间中各个不同的传送带上的箱子添加物品一样(传送带的名称代表一个TopicPartition,传送带代表一个队列,传送带上的箱子代表RecordBatch)。生产者把属于同一个传送带的物品,放到传送带的箱子里面,当箱子存满,或者当前箱子无法装下物品时,就会放在新的一个箱子里面,那个装满的箱子里面的物品就会等待被一次性的处理。如果传送带上面的箱子已经超出了传送带所能承受的容量,生产者后面生产的物品就必须等待,等待传送带释放出新的空间能放上新的箱子来装物品,如果等待一段时间后还没有空间来装物品,就放弃此物品。
看看记录收集器周围的生态:
RecordBatch
对生产的批量记录的一个封装,表示正在或者将要被发送的的一批记录(传送带上的箱子)。会拥有一个MemoryRecords的引用
MemoryRecords
生产者发送的记录在内存中的一个记录集。记录最终存放的地方
Compressor
负责执行追加写操作
ProduceRequestResult
ProduceRequestResult是在初始化RecordBatch时建立。属于批次级的实例。ProduceRequestResult内置了CountdownLatch并且count为1。在外围调用FutureRecordMetadata的get方法中获取到记录的元数据时,会阻塞当前线程,必须等到ProduceRequestResult完成,也就是CountdownLatch变为0(在FutureRecordMetadata.get()中会调用ProduceRequestResult.CountdownLatch.await())。在Sender线程完成RecordBatch中的全部消息的发送并且收到服务端的响应后,会把CountdownLatch中的count变为0,这样外围阻塞的线程就会继续往下走获取到记录的元数据信息。
baseOffset变量:记录服务端响应客户端中的RecordBatch中第一个消息分配的的offset。
ReadyCheckResult
在发送线程(Sender)读取RecordAccumulator的消息进行发送的时候,首先会进行就绪检查:遍历所有的RecordAccumulator.ConcurrentMap>中各个tp的Deque中的批记录集(RecordBatch),把可以发送的RecordBatch对应的节点(leader)取出来,封装成了ReadyCheckResult对象。ReadyCheckResult里面就是保持的可以发送的RecordBatch对应的节点(leader)信息,下一次就绪检查点的时间,分区的leader未知的topic信息。
FutureRecordMetadata
保存生产记录的元数据,包括:ProduceRequestResult实例,RecordBatch中保存的记录个数,产生出记录时候的时间,key的大小,value的大小,checksum(暂时没有搞明白)等。每次追加完消息后返回FutureRecordMetadata实例,属于消息级的实例。提供get方法来让外围程序获取记录元数据信息,只是必须等到Sender线程完成RecordBatch的发送并且收到服务端的响应后,才返回元数据信息-RecordMetadata。
RecordMetadata
对批次元数据信息的封装。
RecordAppendResult
持有FutureRecordMetadata对象实例,RecordBatch是否满的标识(batchIsFull变量),需不需要重新创建新的RecordBatch(newBatchCreated变量)。batchIsFull和newBatchCreated在调用RecordAccumulator.append()方法后来判断是否需要唤醒Sender线程进行发送消息。如果batchIsFull为true:代表双向队列里面有RecordBatch满了,可以唤醒发送线程发送消息了。如果newBatchCreated为true:代表旧的RecordBatch满了或者是装不下新的消息了,可以唤醒发送线程发送消息了。
BufferPool
每实例化一个KafkaProducer,对应一个RecordAccumulator实例,每一个RecordAccumulator实例对应一个BufferPool实例,BufferPool提供分配内存存放消息空间的方法:allocate和释放消息空间的方法:deallocate。每个BufferPool实例中包括:
totalMemory(池总的内存的总量),
poolableSize(控制Deque队列中每个ByteBuffer的大小),
lock:ReentrantLock(保证每次分配和释放空间线程安全),
free<ByteBuffer>:Deque(已经申请未使用的空间),
waiters:Deque(记录申请不到足够空间而阻塞的线程,队列中记录的是阻塞线程对应的Condition对象),
availableMemory(未申请未使用的空间)
分配空间的流程:
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory) // 申请空间的大小已经超过了BufferPool总的字节大小抛出异常 throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");
this.lock.lock();
try {
// 申请的大小和Deque队列中ByteBuffer的大小一样,并且队列又存在已经申请未使用的内存,就直接使用队列中的ByteBuffer实例
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// 计算出已经申请未使用的内存大小
int freeListSize = this.free.size() * this.poolableSize;
if (this.availableMemory + freeListSize >= size) { // 可用空间大于或等于要申请的空间 freeUp(size);
this.availableMemory -= size; // 减少size大小的未申请未使用的空间
lock.unlock();
return ByteBuffer.allocate(size); // 返回size大小的ByteBuffer
else { // 没有空间来分配申请的空间
int accumulated = 0;
ByteBuffer buffer = null;
Condition moreMemory = this.lock.newCondition();
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory);
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try { // 如果累计的空间大小小于申请的空间大小,释放当前线程占有的锁,阻塞当前线程 // 阻塞remainingTimeToBlockNs时间后,没有被唤醒(unlock或者是await),返回false, waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
this.waiters.remove(moreMemory); throw e;
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs); t
his.waitTime.record(timeNs, time.milliseconds());
}
if (waitingTimeElapsed) {
this.waiters.remove(moreMemory);
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
remainingTimeToBlockNs -= timeNs;
// 累计空间大小是0并且申请大小等于Deque队列中ByteBuffer的大小并且已经申请未使用的空间队列不为空
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
buffer = this.free.pollFirst(); // 使用Deque队列中的第一个ByteBuffer
accumulated = size; // accumulated=size 会跳出循环
} else {
freeUp(size - accumulated); // 释放空间
int got = (int) Math.min(size - accumulated, this.availableMemory);
this.availableMemory -= got; // 减少未申请未使用的空间的值
accumulated += got; // 增加累计大小的值,直到累计的大小大于申请的大小
}
}
Condition removed = this.waiters.removeFirst();
if (removed != moreMemory)
throw new IllegalStateException("Wrong condition: this shouldn't happen.");
if (this.availableMemory > 0 || !this.free.isEmpty()) {
if (!this.waiters.isEmpty())
this.waiters.peekFirst().signal(); // 修改Condition队列中节点的状态,让其中的节点可以被唤醒
}
lock.unlock();
if (buffer == null)
return ByteBuffer.allocate(size);
else
return buffer;
}
} finally {
if (lock.isHeldByCurrentThread())
lock.unlock();
}
}
IncompleteRecordBatches
在记录收集器添加发送数据的方法中(RecordAccumulator.append()),把那些发送尚未完成的RecordBatch保存到此集合中,作用是对所有的RecordBatch的监控和管理。在Sender线程把生产请求提交到服务端后(实际就是提交RecordBatch中的数据),服务端正常响应给客户端,客户端在处理响应的回调函数中,如果是成功提交到服务端,就从IncompleteRecordBatches集合中删除。删除还有以下情况:Sender强制退出时,超时。此类操作的所有RecordBatch都加锁,保证线程安全。
以上分析了记录收集器的生态圈,在KafkaProducer.doSend方法中把信息追加到记录收集器后,根据RecordAccumulator.append方法返回的RecordAccumulator.RecordAppendResult实例的batchIsFull和newBatchCreated变量来判断是否唤醒发送线程,进行信息发送。客户端可以从doSend方法返回的FutureRecordMetadata实例中获取到请求完成(收到服务器响应)后的元数据信息。
小结:以上就是客户端使用KafkaProducer发送消息,最后保存在记录收集器的过程。接下来将重点看看消息怎么进入记录收集器的。
网友评论