美文网首页大数据玩转大数据大数据 爬虫Python AI Sql
Librdkafka的基础数据结构 3 -- Buffer相关

Librdkafka的基础数据结构 3 -- Buffer相关

作者: 扫帚的影子 | 来源:发表于2018-01-12 11:59 被阅读97次
    • 我们在前一篇《Librdkafka的基础数据结构 3 -- Buffer相关 1》中介绍了Buffer及其组成部分segment、只读映射slice的相关定义和操作;
    • 这一篇我们来介绍librdkafka中rd_buf_t的实际使用, 实际上是通过rd_kafka_buf_s的封装来实现的;
    • 包括:
      1. struct rd_kafka_buf_s
      2. rd_kafka_bufq_t (即 struct rd_kafka_bufq_s)

    struct rd_kafka_buf_s
    • 所在文件: src/rdkafka_buf.h
    • 这个结构涉及的操作很多, 我会在后面随着代码的深入了解来作补充和更正
    • 定义:
    /**
     * rd_kafka_buf_t: a Kafka request/response buffer.
     *
     * Wraps an rd_buf_t byte buffer (rkbuf_buf) and a read-only slice over it
     * (rkbuf_reader), together with the request/response protocol headers,
     * reply-queue routing and response callback, retry/timing bookkeeping,
     * a reference count and per-request-type extra state (rkbuf_u).
     */
    struct rd_kafka_buf_s { /* rd_kafka_buf_t */
        TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link; // Link that lets this buffer be an element of a TAILQ (e.g. rd_kafka_bufq_t.rkbq_bufs)
    
        int32_t rkbuf_corrid;  // Corresponds to CorrelationId in the Kafka protocol request header
    
        rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */ // Absolute time at which sending the request is retried
    
        int     rkbuf_flags; /* RD_KAFKA_OP_F */ // e.g. RD_KAFKA_OP_F_CRC: accumulate rkbuf_crc over written bytes
    
            rd_buf_t rkbuf_buf;        /**< Send/Recv byte buffer */  // The rd_buf_t holding the bytes sent or received
            rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf */ // Read-only slice view over rkbuf_buf above
    
        int     rkbuf_connid;      /* broker connection id (used when buffer
                        * was partially sent). */
            size_t  rkbuf_totlen;      /* recv: total expected length,
                                        * send: not used */   // When receiving a response, the first four bytes of the Kafka protocol frame give the payload length; this holds that length, i.e. how much data still has to be received
    
        rd_crc32_t rkbuf_crc;      /* Current CRC calculation */ // Running CRC32 checksum
    
        struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
                                                     * These fields are encoded
                                                     * and written to output buffer
                                                     * on buffer finalization. */
        struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
                                                     * Decoded fields are copied
                                                     * here from the buffer
                                                     * to provide an ease-of-use
                                                     * interface to the header */
    
        int32_t rkbuf_expected_size;  /* expected size of message */
    
            // Reply queue on which the response is enqueued
            rd_kafka_replyq_t   rkbuf_replyq;       /* Enqueue response on replyq */
            rd_kafka_replyq_t   rkbuf_orig_replyq;  /* Original replyq to be used
                                                     * for retries from inside
                                                     * the rkbuf_cb() callback
                                                     * since rkbuf_replyq will
                                                     * have been reset. */
            rd_kafka_resp_cb_t *rkbuf_cb;           /* Response callback */
            struct rd_kafka_buf_s *rkbuf_response;  /* Response buffer */
    
            struct rd_kafka_broker_s *rkbuf_rkb;  // The broker this buffer is associated with
    
        rd_refcnt_t rkbuf_refcnt; // Reference count
        void   *rkbuf_opaque;
    
           // Number of retries performed
        int     rkbuf_retries;            /* Retries so far. */
    #define RD_KAFKA_BUF_NO_RETRIES  1000000  /* Do not retry */
    
            int     rkbuf_features;   /* Required feature(s) that must be
                                       * supported by broker. */
    
        rd_ts_t rkbuf_ts_enq;
        rd_ts_t rkbuf_ts_sent;    /* Initially: Absolute time of transmission,
                       * after response: RTT. */
        rd_ts_t rkbuf_ts_timeout;
    
            int64_t rkbuf_offset;     /* Used by OffsetCommit */  // The offset to be committed
    
        rd_list_t *rkbuf_rktp_vers;    /* Toppar + Op Version map.
                        * Used by FetchRequest. */
    
        rd_kafka_msgq_t rkbuf_msgq;
    
            rd_kafka_resp_err_t rkbuf_err;      /* Buffer parsing error code */
    
            union {
                    struct {
                            rd_list_t *topics;  /* Requested topics (char *) */
                            char *reason;       /* Textual reason */
                            rd_kafka_op_t *rko; /* Originating rko with replyq
                                                 * (if any) */
                            int all_topics;     /* Full/All topics requested */
    
                            int *decr;          /* Decrement this integer by one
                                                 * when request is complete:
                                                 * typically points to metadata
                                                 * cache's full_.._sent.
                                                 * Will be performed with
                                                 * decr_lock held. */
                            mtx_t *decr_lock;
    
                    } Metadata;
            } rkbuf_u;
    };
    
    • 为kafka request创建rd_kafka_buf_s
      Kafka request的header在这个函数中被自动附加上
    /**
     * Create a new rd_kafka_buf_t for a Kafka request with API key \p ApiKey
     * to be sent to broker \p rkb.
     *
     * The common request header is written into the buffer up front:
     *   Length (i32, placeholder 0), ApiKey (i16), ApiVersion (i16,
     *   placeholder 0), CorrelationId (i32, placeholder 0), ClientId (kstr).
     * The placeholder fields are updated later; per rkbuf_reqhdr's
     * documentation this presumably happens on buffer finalization.
     *
     * @param rkb     Broker the request is for. A reference is taken with
     *                rd_kafka_broker_keep() and stored in rkbuf_rkb.
     * @param ApiKey  Kafka protocol API key; also stored in
     *                rkbuf_reqhdr.ApiKey.
     * @param segcnt  Segment count needed by the caller's payload; one extra
     *                segment is added for the headers.
     * @param size    Payload size needed by the caller; room for the request
     *                header and the client id string is added on top.
     *
     * @returns the newly created buffer with the request header written.
     */
    rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
                                              int segcnt, size_t size) {
            rd_kafka_buf_t *rkbuf;

            /* Make room for common protocol request headers:
             * the request header itself (RD_KAFKAP_REQHDR_SIZE) plus the
             * encoded client id string. */
            size += RD_KAFKAP_REQHDR_SIZE +
                    RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);

            /* One extra segment for the headers. */
            segcnt += 1; /* headers */

            /* Create the buffer with the adjusted segment count and size. */
            rkbuf = rd_kafka_buf_new0(segcnt, size, 0);

            rkbuf->rkbuf_rkb = rkb;
            rd_kafka_broker_keep(rkb);

            rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;

            /* Write request header, will be updated later. */
            /* Length: updated later */
            rd_kafka_buf_write_i32(rkbuf, 0);
            /* ApiKey */
            rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
            /* ApiVersion: updated later */
            rd_kafka_buf_write_i16(rkbuf, 0);
            /* CorrId: updated later */
            rd_kafka_buf_write_i32(rkbuf, 0);

            /* ClientId */
            rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);

            return rkbuf;
    }
    
    • 写操作 rd_kafka_buf_write:
      调用rd_buf_write来实现
    /**
     * Append \p len bytes from \p data to the buffer's rd_buf_t (rkbuf_buf)
     * via rd_buf_write().
     *
     * When RD_KAFKA_OP_F_CRC is set in rkbuf_flags, the running checksum in
     * rkbuf_crc is extended over the same bytes.
     *
     * @returns whatever rd_buf_write() returned.
     */
    static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
                                            const void *data, size_t len) {
            const size_t written = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
            const int want_crc = (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC) != 0;

            if (want_crc)
                    rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc,
                                                       data, len);

            return written;
    }
    
    • 更新buffer中的局部数据rd_kafka_buf_update:
      调用rd_buf_write_update来实现
    /**
     * Overwrite \p len bytes at offset \p of in the buffer's rd_buf_t with
     * \p data, via rd_buf_write_update().
     *
     * Asserts that RD_KAFKA_OP_F_CRC is not set. Presumably this is because
     * rewriting already-written bytes would invalidate the running rkbuf_crc.
     */
    static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
                                              const void *data, size_t len) {
            rd_buf_t *target = &rkbuf->rkbuf_buf;

            rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));

            rd_buf_write_update(target, of, data, len);
    }
    
    • push操作 rd_kafka_buf_push0
      调用rd_buf_push来实现, buf的数据不会被copy, 只作简单的指针赋值。rd_buf_push会先检查当前正在写入的segment是否还有剩余空间: 如果有, 就把剩余空间单独拆分出来, 生成一个新的segment; 然后再生成一个新的segment来存放需要写入的buf, 并append到rd_kafka_buf_t上; 最后再把之前拆分出来的segment append回去。
    /**
     * Push \p len bytes at \p buf onto the buffer's rd_buf_t via
     * rd_buf_push(). \p free_cb is forwarded to rd_buf_push(); presumably
     * it is used to release \p buf later, since the data is referenced
     * rather than copied.
     *
     * The running checksum rkbuf_crc is extended over the pushed bytes only
     * when \p allow_crc_calc is non-zero AND RD_KAFKA_OP_F_CRC is set in
     * rkbuf_flags.
     */
    void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
                             int allow_crc_calc, void (*free_cb) (void *)) {
            int update_crc;

            rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);

            update_crc = allow_crc_calc &&
                         (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC);
            if (!update_crc)
                    return;

            rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
    }
    
    • 针对不同数据类型的写入操作: rd_kafka_buf_write_*系列函数(例如上面rd_kafka_buf_new_request中用到的rd_kafka_buf_write_i16/i32/kstr)
    • buffer的retry操作, request发送失败可能会重试
    /**
     * Try to schedule \p rkbuf for another send attempt on broker \p rkb.
     *
     * No retry is made, and 0 is returned, when any of these holds:
     *  - there is no broker (\p rkb is NULL),
     *  - the broker's source is RD_KAFKA_INTERNAL,
     *  - the client instance is terminating,
     *  - one more attempt would exceed rk_conf.max_retries.
     *
     * Otherwise the send timestamp is reset to 0 and the retry counter is
     * incremented. An extra reference is taken on \p rkbuf, presumably held
     * by the retry path. The buffer is then handed to
     * rd_kafka_broker_buf_retry(), and 1 is returned.
     */
    int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
            int give_up;

            /* Evaluated left to right with short-circuiting, so rkb is never
             * dereferenced when it is NULL. */
            give_up = !rkb ||
                      rkb->rkb_source == RD_KAFKA_INTERNAL ||
                      rd_kafka_terminating(rkb->rkb_rk) ||
                      rkbuf->rkbuf_retries + 1 >
                      rkb->rkb_rk->rk_conf.max_retries;

            if (unlikely(give_up))
                    return 0;

            /* Try again */
            rkbuf->rkbuf_ts_sent = 0;
            rkbuf->rkbuf_retries += 1;
            rd_kafka_buf_keep(rkbuf);

            /* Hand the buffer to the broker's retry handling. */
            rd_kafka_broker_buf_retry(rkb, rkbuf);

            return 1;
    }
    
    • 处理RD_KAFKA_OP_RECV_BUF类型的op中携带的buffer(即rd_kafka_op_t中的rko_u.xbuf.rkbuf)
    /**
     * Handle the request buffer carried by op \p rko (rko_u.xbuf.rkbuf).
     *
     * Detaches the request from the op. If the request has a reply queue, it
     * is moved to rkbuf_orig_replyq so it can be reused by retries, the live
     * replyq is cleared, and only its version is restored. If the request has
     * no response callback, it is destroyed. Otherwise the (possibly NULL)
     * response is detached from the request and both are passed, with
     * \p err, to rd_kafka_buf_callback(), which is left to do the destroys.
     *
     * NOTE(review): request is dereferenced without a NULL check; this assumes
     * rko_u.xbuf.rkbuf is always set when this is called. Confirm with callers.
     */
    void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
            rd_kafka_buf_t *request, *response;
    
            request = rko->rko_u.xbuf.rkbuf;
            rko->rko_u.xbuf.rkbuf = NULL;
    
            /* NULL on op_destroy() */
        if (request->rkbuf_replyq.q) {
            int32_t version = request->rkbuf_replyq.version;
                    /* Current queue usage is done, but retain original replyq for
                     * future retries, stealing
                     * the current reference. */
                    request->rkbuf_orig_replyq = request->rkbuf_replyq;
                    rd_kafka_replyq_clear(&request->rkbuf_replyq);
            /* Callback might need to version check so we retain the
             * version across the clear() call which clears it. */
            request->rkbuf_replyq.version = version;
        }
    
        if (!request->rkbuf_cb) {
            rd_kafka_buf_destroy(request);
            return;
        }
    
            /* Let buf_callback() do destroy()s */
            response = request->rkbuf_response; /* May be NULL */
            request->rkbuf_response = NULL;
    
           // With both request and response in hand, invoke the callback
            rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
                      request->rkbuf_rkb, err,
                                  response, request);
    }
    
    rd_kafka_bufq_t (即 struct rd_kafka_bufq_s)
    • 所在文件: src/rdkafka_buf.h
    • 将上面的rd_kafka_buf_s封装成队列
    • 定义:
    /**
     * rd_kafka_bufq_t: a queue of rd_kafka_buf_t, built on a TAILQ linked
     * through each buffer's rkbuf_link.
     */
    typedef struct rd_kafka_bufq_s {
        TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;  /* The queued buffers. */
        rd_atomic32_t  rkbq_cnt;      /* NOTE(review): presumably the number of
                                       * buffers in rkbq_bufs — confirm. */
        rd_atomic32_t  rkbq_msg_cnt;  /* NOTE(review): presumably the total
                                       * message count across queued buffers
                                       * — confirm. */
    } rd_kafka_bufq_t;
    
    • 提供入队、出队、删除等操作

    Librdkafka源码分析-Content Table

    相关文章

      网友评论

        本文标题:Librdkafka的基础数据结构 3 -- Buffer相关

        本文链接:https://www.haomeiwen.com/subject/mkwxoxtx.html