- 我们在前面的Librdkafka的基础数据结构 3 -- Buffer相关 1介绍了Buffer和其组成segment,只读映射slice的相关定义和操作;
- 这一篇我们来介绍librdkafka中
rd_buf_t
的实际使用, 实际上是通过rd_kafka_buf_s
的封装来实现的;
- 包括:
- struct rd_kafka_buf_s
- struct rd_kafka_bufq_t
struct rd_kafka_buf_s
- 所在文件: src/rdkafka_buf.h
- 这个结构涉及的操作很多, 我会在后面随着代码的深入了解来作补充和更正
- 定义:
struct rd_kafka_buf_s { /* rd_kafka_buf_t */
TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link; // 这个rd_kafka_buf_s定义为tailq的元素
int32_t rkbuf_corrid; // 对应于kafka协议中request header里的CorrelationId
rd_ts_t rkbuf_ts_retry; /* Absolute send retry time */ //发送request的重试的绝对时间
int rkbuf_flags; /* RD_KAFKA_OP_F */
rd_buf_t rkbuf_buf; /**< Send/Recv byte buffer */ // 发送或接收数据的rd_buf_t
rd_slice_t rkbuf_reader; /**< Buffer slice reader for rkbuf_buf */ // 上面rd_buf_t的只读映射
int rkbuf_connid; /* broker connection id (used when buffer
* was partially sent). */
size_t rkbuf_totlen; /* recv: total expected length,
* send: not used */ // 接收response时, kafka协议的前四个字节表示payload长度, 这个表示payload有多长, 即后面需要再接收的数据长度
rd_crc32_t rkbuf_crc; /* Current CRC calculation */ //CRC校验
struct rd_kafkap_reqhdr rkbuf_reqhdr; /* Request header.
* These fields are encoded
* and written to output buffer
* on buffer finalization. */
struct rd_kafkap_reshdr rkbuf_reshdr; /* Response header.
* Decoded fields are copied
* here from the buffer
* to provide an ease-of-use
* interface to the header */
int32_t rkbuf_expected_size; /* expected size of message */
// response的入队列
rd_kafka_replyq_t rkbuf_replyq; /* Enqueue response on replyq */
rd_kafka_replyq_t rkbuf_orig_replyq; /* Original replyq to be used
* for retries from inside
* the rkbuf_cb() callback
* since rkbuf_replyq will
* have been reset. */
rd_kafka_resp_cb_t *rkbuf_cb; /* Response callback */
struct rd_kafka_buf_s *rkbuf_response; /* Response buffer */
struct rd_kafka_broker_s *rkbuf_rkb; // 相关联的broker
rd_refcnt_t rkbuf_refcnt; // 引用计数
void *rkbuf_opaque;
// 重试次数
int rkbuf_retries; /* Retries so far. */
#define RD_KAFKA_BUF_NO_RETRIES 1000000 /* Do not retry */
int rkbuf_features; /* Required feature(s) that must be
* supported by broker. */
rd_ts_t rkbuf_ts_enq;
rd_ts_t rkbuf_ts_sent; /* Initially: Absolute time of transmission,
* after response: RTT. */
rd_ts_t rkbuf_ts_timeout;
int64_t rkbuf_offset; /* Used by OffsetCommit */ // 需要提交的offset
rd_list_t *rkbuf_rktp_vers; /* Toppar + Op Version map.
* Used by FetchRequest. */
rd_kafka_msgq_t rkbuf_msgq;
rd_kafka_resp_err_t rkbuf_err; /* Buffer parsing error code */
union {
struct {
rd_list_t *topics; /* Requested topics (char *) */
char *reason; /* Textual reason */
rd_kafka_op_t *rko; /* Originating rko with replyq
* (if any) */
int all_topics; /* Full/All topics requested */
int *decr; /* Decrement this integer by one
* when request is complete:
* typically points to metadata
* cache's full_.._sent.
* Will be performed with
* decr_lock held. */
mtx_t *decr_lock;
} Metadata;
} rkbuf_u;
};
- 为kafka request创建
rd_kafka_buf_s
Kafka request的header在这个函数中被自动附加上
rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
int segcnt, size_t size) {
rd_kafka_buf_t *rkbuf;
/* Make room for common protocol request headers */
// 计算size 更新, 加上request header大小, 包括client id
size += RD_KAFKAP_REQHDR_SIZE +
RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);
// rd_buffer_t中的segment个数加上, 包括一个header
segcnt += 1; /* headers */
按指定的segment个数和size来创建rd_kafka_buf
rkbuf = rd_kafka_buf_new0(segcnt, size, 0);
rkbuf->rkbuf_rkb = rkb;
rd_kafka_broker_keep(rkb);
rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;
// 写kafka request header
/* Write request header, will be updated later. */
/* Length: updated later */
rd_kafka_buf_write_i32(rkbuf, 0);
/* ApiKey */
rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
/* ApiVersion: updated later */
rd_kafka_buf_write_i16(rkbuf, 0);
/* CorrId: updated later */
rd_kafka_buf_write_i32(rkbuf, 0);
/* ClientId */
rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);
return rkbuf;
}
- 写操作
rd_kafka_buf_write
:
调用rd_buf_write
来实现
static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
const void *data, size_t len) {
size_t r;
r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);
return r;
}
- 更新buffer中的局部数据
rd_kafka_buf_update
:
调用rd_buf_write_update
来实现
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
const void *data, size_t len) {
rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
}
- push操作
rd_kafka_buf_push0
调用rd_buf_push
来实现, buf的数据不会被copy, 只作简单的指针赋值, rd_buf_push
会先看当前现在写入的segment还没有没剩余空间, 如果有的话将剩余空间单拆出来, 生成一个新的segment, 然后再产生一个新的segment来放需要写入的buf, append到rd_kafka_buf_t
上, 最后再把拆出来的segment再append
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
int allow_crc_calc, void (*free_cb) (void *)) {
rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);
if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
}
- 针对不同类型的写入操作
rd_kafka_buf_write_*
- buffer的retry操作, request发送失败可能会重试
int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
//先判断是否需要重试
if (unlikely(!rkb ||
rkb->rkb_source == RD_KAFKA_INTERNAL ||
rd_kafka_terminating(rkb->rkb_rk) ||
rkbuf->rkbuf_retries + 1 >
rkb->rkb_rk->rk_conf.max_retries))
return 0;
/* Try again */
rkbuf->rkbuf_ts_sent = 0;
rkbuf->rkbuf_retries++;
rd_kafka_buf_keep(rkbuf);
// 加入broker的重试队列里
rd_kafka_broker_buf_retry(rkb, rkbuf);
return 1;
}
- 处理
RD_KAFKA_OP_RECV_BUF
类型的buffer(实际上是rd_kafka_op_t中的)
void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
rd_kafka_buf_t *request, *response;
request = rko->rko_u.xbuf.rkbuf;
rko->rko_u.xbuf.rkbuf = NULL;
/* NULL on op_destroy() */
if (request->rkbuf_replyq.q) {
int32_t version = request->rkbuf_replyq.version;
/* Current queue usage is done, but retain original replyq for
* future retries, stealing
* the current reference. */
request->rkbuf_orig_replyq = request->rkbuf_replyq;
rd_kafka_replyq_clear(&request->rkbuf_replyq);
/* Callback might need to version check so we retain the
* version across the clear() call which clears it. */
request->rkbuf_replyq.version = version;
}
if (!request->rkbuf_cb) {
rd_kafka_buf_destroy(request);
return;
}
/* Let buf_callback() do destroy()s */
response = request->rkbuf_response; /* May be NULL */
request->rkbuf_response = NULL;
// 获取到reqeust和response后触发回调
rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
request->rkbuf_rkb, err,
response, request);
}
struct rd_kafka_bufq_t
- 所在文件: src/rdkafka_buf.h
- 将上面的rd_kafka_buf_s封装成队列
- 定义:
typedef struct rd_kafka_bufq_s {
TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
rd_atomic32_t rkbq_cnt;
rd_atomic32_t rkbq_msg_cnt;
} rd_kafka_bufq_t;
网友评论