Librdkafka's Encapsulation of the Kafka Message and Related Operations


By 扫帚的影子 | Published 2018-01-09 18:13
    • struct rd_kafka_message_t
    • struct rd_kafka_msg_t
    • struct rd_kafka_msgq_t
    • For the Kafka message wire-protocol format, see the official Kafka documentation

    struct rd_kafka_message_s
    • Defined in: src/rdkafka.h
    • Data produced by the application is ultimately wrapped in this structure after the produce API is called; data consumed from the broker is likewise wrapped in this structure when it is handed back to the application via callbacks.
    • Definition:
    typedef struct rd_kafka_message_s {
        rd_kafka_resp_err_t err;   /**< Non-zero for error signaling. */
        rd_kafka_topic_t *rkt;     /**< Topic */
        int32_t partition;         /**< Partition */
        void   *payload;           /**< Producer: original message payload.
                        * Consumer: Depends on the value of \c err :
                        * - \c err==0: Message payload.
                        * - \c err!=0: Error string */
        size_t  len;               /**< Depends on the value of \c err :
                        * - \c err==0: Message payload length
                        * - \c err!=0: Error string length */
        void   *key;               /**< Depends on the value of \c err :
                        * - \c err==0: Optional message key */
        size_t  key_len;           /**< Depends on the value of \c err :
                        * - \c err==0: Optional message key length*/
        int64_t offset;            /**< Consume:
                                        * - Message offset (or offset for error
                        *   if \c err!=0 if applicable).
                                        * - dr_msg_cb:
                                        *   Message offset assigned by broker.
                                        *   If \c produce.offset.report is set then
                                        *   each message will have this field set,
                                        *   otherwise only the last message in
                                        *   each produced internal batch will
                                        *   have this field set, otherwise 0. */
        void  *_private;           /**< Consume:
                        *  - rdkafka private pointer: DO NOT MODIFY
                        *  - dr_msg_cb:
                                        *    msg_opaque from produce() call */
    } rd_kafka_message_t;
    
    struct rd_kafka_msg_t
    • Defined in: src/rdkafka_msg.h
    • Wraps the struct rd_kafka_message_s described above
    • Definition:
    typedef struct rd_kafka_msg_s {
        rd_kafka_message_t rkm_rkmessage;  /* MUST be first field */
    
            // Make this struct an element of a TAILQ
        TAILQ_ENTRY(rd_kafka_msg_s)  rkm_link;
    
        int        rkm_flags;
    
            // Timestamp; two kinds: the client-side create time and the broker-side log-append time
        int64_t    rkm_timestamp;  
        rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */
    
            union {
                    struct {
                            rd_ts_t ts_timeout; /* Message timeout */
                            rd_ts_t ts_enq;     /* Enqueue/Produce time */
                    } producer;
            } rkm_u;
    } rd_kafka_msg_t;
    
    • Converting an rd_kafka_message_t to an rd_kafka_msg_t:
      Because the rd_kafka_message_t rkm_rkmessage member must be the first field of struct rd_kafka_msg_s, a plain cast is enough (the reverse direction is sketched right after this function):
    rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
        return (rd_kafka_msg_t *)rkmessage;
    }
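    
      Going the other direction is simply taking the address of that first field. A minimal sketch (my_msg2message is a hypothetical helper used only for illustration; librdkafka itself usually just accesses rkm_rkmessage directly):
    static rd_kafka_message_t *my_msg2message (rd_kafka_msg_t *rkm) {
        return &rkm->rkm_rkmessage;   /* valid because rkm_rkmessage is the first field */
    }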
    
    • Accounting before sending a Kafka message
      librdkafka produces asynchronously and buffers messages locally, so before a message is accepted it checks whether the local queue is full. If blocking produce is requested (RD_KAFKA_MSG_F_BLOCK) and the queue is full, the call waits on a condition variable until signaled; otherwise it fails immediately (see the application-side sketch after this function).
    static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
    rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
                int block) {
    
        if (rk->rk_type != RD_KAFKA_PRODUCER)
            return RD_KAFKA_RESP_ERR_NO_ERROR;
    
        mtx_lock(&rk->rk_curr_msgs.lock);
        while (unlikely(rk->rk_curr_msgs.cnt + cnt >
                rk->rk_curr_msgs.max_cnt ||
                (unsigned long long)(rk->rk_curr_msgs.size + size) >
                (unsigned long long)rk->rk_curr_msgs.max_size)) {
            if (!block) {
                mtx_unlock(&rk->rk_curr_msgs.lock);
                return RD_KAFKA_RESP_ERR__QUEUE_FULL;
            }
    
            cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
        }
    
        rk->rk_curr_msgs.cnt  += cnt;
        rk->rk_curr_msgs.size += size;
        mtx_unlock(&rk->rk_curr_msgs.lock);
    
        return RD_KAFKA_RESP_ERR_NO_ERROR;
    }
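    
      At the public API level this accounting is what makes a produce call fail with RD_KAFKA_RESP_ERR__QUEUE_FULL once queue.buffering.max.messages / queue.buffering.max.kbytes is reached. A hedged application-side sketch (my_produce_with_retry is a hypothetical helper; rd_kafka_produce(), rd_kafka_last_error() and rd_kafka_poll() are the standard public API):
    #include <librdkafka/rdkafka.h>
    
    /* Hypothetical helper: instead of RD_KAFKA_MSG_F_BLOCK, retry on a full
     * local queue while serving delivery reports so the queue can drain. */
    static int my_produce_with_retry (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                                      const void *payload, size_t len) {
        while (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                                (void *)payload, len,
                                NULL, 0, NULL) == -1) {
            if (rd_kafka_last_error() != RD_KAFKA_RESP_ERR__QUEUE_FULL)
                return -1;           /* some other error: give up */
            rd_kafka_poll(rk, 100);  /* serve callbacks; queue drains as messages complete */
        }
        return 0;
    }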
    
    • Creating an rd_kafka_msg_t: the internal interface rd_kafka_msg_new00 (the resulting memory layout is sketched after the function)
    rd_kafka_msg_t *rd_kafka_msg_new00 (rd_kafka_itopic_t *rkt,
                        int32_t partition,
                        int msgflags,
                        char *payload, size_t len,
                        const void *key, size_t keylen,
                        void *msg_opaque) {
        rd_kafka_msg_t *rkm;
        size_t mlen = sizeof(*rkm);
        char *p;
    
        /* If we are to make a copy of the payload, allocate space for it too */
            // If RD_KAFKA_MSG_F_COPY is set, allocate room for the payload copy too, right after the rd_kafka_msg_t
        if (msgflags & RD_KAFKA_MSG_F_COPY) {
            msgflags &= ~RD_KAFKA_MSG_F_FREE;
            mlen += len;
        }
    
        mlen += keylen;
    
        /* Note: using rd_malloc here, not rd_calloc, so make sure all fields
         *       are properly set up. */
        rkm                 = rd_malloc(mlen);
        rkm->rkm_err        = 0;
        rkm->rkm_flags      = RD_KAFKA_MSG_F_FREE_RKM | msgflags;
        rkm->rkm_len        = len;
        rkm->rkm_opaque     = msg_opaque;
        rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep_a(rkt);
    
        rkm->rkm_partition  = partition;
            rkm->rkm_offset     = RD_KAFKA_OFFSET_INVALID;
        rkm->rkm_timestamp  = 0;
        rkm->rkm_tstype     = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
    
        p = (char *)(rkm+1);
    
            // Copy the payload
        if (payload && msgflags & RD_KAFKA_MSG_F_COPY) {
            /* Copy payload to space following the ..msg_t */
            rkm->rkm_payload = p;
            memcpy(rkm->rkm_payload, payload, len);
            p += len;
    
        } else {
            /* Just point to the provided payload. */
            rkm->rkm_payload = payload;
        }
    
        if (key) {
            rkm->rkm_key     = p;
            rkm->rkm_key_len = keylen;
            memcpy(rkm->rkm_key, key, keylen);
        } else {
            rkm->rkm_key = NULL;
            rkm->rkm_key_len = 0;
        }
    
            return rkm;
    }
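    
      The single rd_malloc() block implied by the mlen computation above therefore has roughly this layout (the payload region only exists when RD_KAFKA_MSG_F_COPY is set; otherwise rkm_payload points at the caller's buffer and the key copy follows the struct directly):
    +------------------+---------------------------+----------------+
    | rd_kafka_msg_t   | payload (F_COPY only)     | key (optional) |
    +------------------+---------------------------+----------------+
    ^ rkm              ^ p = (char *)(rkm + 1)     ^ p += len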
    
    • Creating an rd_kafka_msg_t with pre-checks: the internal interface rd_kafka_msg_new0
    static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
                                              int32_t force_partition,
                                              int msgflags,
                                              char *payload, size_t len,
                                              const void *key, size_t keylen,
                                              void *msg_opaque,
                                              rd_kafka_resp_err_t *errp,
                                              int *errnop,
                                              int64_t timestamp,
                                              rd_ts_t now) {
        rd_kafka_msg_t *rkm;
    
        if (unlikely(!payload))
            len = 0;
        if (!key)
            keylen = 0;
    
            // Check whether the message size (payload + key) exceeds the configured maximum message size
        if (unlikely(len + keylen >
                 (size_t)rkt->rkt_rk->rk_conf.max_msg_size ||
                 keylen > INT32_MAX)) {
            *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
            if (errnop)
                *errnop = EMSGSIZE;
            return NULL;
        }
    
            // Check whether the local queue is full; with RD_KAFKA_MSG_F_BLOCK set, block and wait until signaled
        *errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len,
                           msgflags & RD_KAFKA_MSG_F_BLOCK);
        if (unlikely(*errp)) {
            if (errnop)
                *errnop = ENOBUFS;
            return NULL;
        }
    
            // Create the rd_kafka_msg_t
        rkm = rd_kafka_msg_new00(rkt, force_partition,
                     msgflags|RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */,
                     payload, len, key, keylen, msg_opaque);
    
            if (timestamp)
                    rkm->rkm_timestamp  = timestamp;
            else
                    rkm->rkm_timestamp = rd_uclock()/1000;
            rkm->rkm_tstype     = RD_KAFKA_TIMESTAMP_CREATE_TIME;
    
            rkm->rkm_ts_enq = now;
    
        if (rkt->rkt_conf.message_timeout_ms == 0) {
            rkm->rkm_ts_timeout = INT64_MAX;
        } else {
            rkm->rkm_ts_timeout = now +
                rkt->rkt_conf.message_timeout_ms * 1000;
        }
    
            /* Call interceptor chain for on_send */
            // The on_send interceptor chain lets plugins inspect or customize rkm->rkm_rkmessage (a registration sketch follows this function)
            rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);
    
            return rkm;
    }
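    
      The on_send hook called above runs whatever interceptors were registered through the public interceptor API. A hedged sketch, assuming the rd_kafka_conf_interceptor_add_on_send() registration function and on_send callback signature declared in rdkafka.h (my_on_send, conf and sent_cnt are hypothetical and assumed to be in scope):
    /* Hypothetical on_send callback: invoked once per message, before partitioning. */
    static rd_kafka_resp_err_t my_on_send (rd_kafka_t *rk,
                                           rd_kafka_message_t *rkmessage,
                                           void *ic_opaque) {
        int *sent_cnt = ic_opaque;
        (*sent_cnt)++;               /* e.g. count, tag or validate outgoing messages */
        return RD_KAFKA_RESP_ERR_NO_ERROR;
    }
    
    /* Registration on an rd_kafka_conf_t (assumed API), e.g. before rd_kafka_new(): */
    rd_kafka_conf_interceptor_add_on_send(conf, "my_interceptor", my_on_send, &sent_cnt);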
    
    • Creating an rd_kafka_msg_t and enqueueing it on the selected topic-partition's queue (a note on payload ownership follows the function)
    int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
                  int msgflags,
                  char *payload, size_t len,
                  const void *key, size_t keylen,
                  void *msg_opaque) {
        rd_kafka_msg_t *rkm;
        rd_kafka_resp_err_t err;
        int errnox;
    
            /* Create message */
            // Create the rd_kafka_msg_t
            rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, 
                                    payload, len, key, keylen, msg_opaque,
                                    &err, &errnox, 0, rd_clock());
            if (unlikely(!rkm)) {
                    /* errno is already set by msg_new() */
            rd_kafka_set_last_error(err, errnox);
                    return -1;
            }
    
    
            /* Partition the message */
           // Pick a topic-partition and enqueue the message; this important function is covered separately below
        err = rd_kafka_msg_partitioner(rkt, rkm, 1);
        if (likely(!err)) {
            rd_kafka_set_last_error(0, 0);
            return 0;
        }
    
           // On failure: clean up and set the error
            /* Interceptor: unroll failing messages by triggering on_ack.. */
            rkm->rkm_err = err;
            rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
                                                     &rkm->rkm_rkmessage);
    
        /* Handle partitioner failures: it only fails when the application
         * attempts to force a destination partition that does not exist
         * in the cluster.  Note we must clear the RD_KAFKA_MSG_F_FREE
         * flag since our contract says we don't free the payload on
         * failure. */
    
        rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
        rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
    
        /* Translate error codes to errnos. */
        if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
            rd_kafka_set_last_error(err, ESRCH);
        else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
            rd_kafka_set_last_error(err, ENOENT);
        else
            rd_kafka_set_last_error(err, EINVAL); /* NOTREACHED */
    
        return -1;
    }
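    
      Note the RD_KAFKA_MSG_F_FREE handling above: the contract is that librdkafka does not free the payload when produce fails, so ownership stays with the caller. A hedged application-side sketch (buf, len and rkt are placeholders; rd_kafka_produce() with RD_KAFKA_MSG_F_FREE is the standard API):
    char *buf = malloc(len);                 /* caller-allocated payload */
    /* ... fill buf ... */
    if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE,
                         buf, len, NULL, 0, NULL) == -1) {
        free(buf);   /* produce failed: librdkafka did not take ownership */
    }
    /* on success, librdkafka frees buf once the message is delivered or finally fails */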
    
    struct rd_kafka_msgq_t
    • Defined in: src/rdkafka_msg.h
    • Essentially a simple queue of rd_kafka_msg_t, with message and byte counters
    • Definition:
    typedef struct rd_kafka_msgq_s {
        TAILQ_HEAD(, rd_kafka_msg_s) rkmq_msgs;
    
            // number of kafka messages in the queue
        rd_atomic32_t rkmq_msg_cnt;
    
            // total bytes of kafka messages (payload + key)
        rd_atomic64_t rkmq_msg_bytes;
    } rd_kafka_msgq_t;
    
    • Concatenate two rd_kafka_msgq_t (append all of src to the tail of dst):
    void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
                               rd_kafka_msgq_t *src)
    
    • Move one rd_kafka_msgq_t onto another (dst is overwritten with src, and src is cleared)
    void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,
                             rd_kafka_msgq_t *src)
    
    • Remove a given rd_kafka_msg_t from the queue
    rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
                       rd_kafka_msg_t *rkm,
                       int do_count)
    
    • Pop a message from the head of the queue
    rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
        rd_kafka_msg_t *rkm;
    
        if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
            rd_kafka_msgq_deq(rkmq, rkm, 1);
    
        return rkm;
    }
    
    • Enqueue: insert at the tail of the queue
    static RD_INLINE RD_UNUSED void rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
                            rd_kafka_msg_t *rkm) {
        TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
        rd_atomic32_add(&rkmq->rkmq_msg_cnt, 1);
        rd_atomic64_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len+rkm->rkm_key_len);
    }
    
    • Scan the queue and move timed-out messages into the timedout queue; returns the number of messages moved
    int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
                    rd_kafka_msgq_t *timedout,
                    rd_ts_t now) {
        rd_kafka_msg_t *rkm, *tmp;
        int cnt = rd_atomic32_get(&timedout->rkmq_msg_cnt);
    
        /* Assume messages are added in time sequential order */
        TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
            if (likely(rkm->rkm_ts_timeout > now))
                break;
    
            rd_kafka_msgq_deq(rkmq, rkm, 1);
            rd_kafka_msgq_enq(timedout, rkm);
        }
    
        return rd_atomic32_get(&timedout->rkmq_msg_cnt) - cnt;
    }
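    
      A hedged sketch of how these queue helpers compose (my_expire_messages and my_fail_message are hypothetical; rd_kafka_msgq_init() is assumed to exist in rdkafka_msg.h, otherwise TAILQ_INIT() on rkmq_msgs plus zeroed counters is equivalent; the real caller is librdkafka's periodic topic scan, which turns expired messages into delivery-report errors):
    static void my_expire_messages (rd_kafka_msgq_t *rkmq, rd_ts_t now) {
        rd_kafka_msgq_t timedout;
        rd_kafka_msg_t *rkm;
    
        rd_kafka_msgq_init(&timedout);                      /* assumed helper */
        if (rd_kafka_msgq_age_scan(rkmq, &timedout, now) == 0)
            return;
    
        /* Hand each expired message back, e.g. as a delivery-report error. */
        while ((rkm = rd_kafka_msgq_pop(&timedout)))
            my_fail_message(rkm, RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
    }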
    
    • rd_kafka_msg_partitioner is an important function that does two things: it picks a topic-partition for the message, then puts the rd_kafka_msg_t onto that topic-partition's queue (a custom-partitioner sketch follows the function)
    int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
                      int do_lock) {
        int32_t partition;
        rd_kafka_toppar_t *rktp_new;
            shptr_rd_kafka_toppar_t *s_rktp_new;
        rd_kafka_resp_err_t err;
    
        if (do_lock)
            rd_kafka_topic_rdlock(rkt);
    
           // Handle according to the topic's current state
            switch (rkt->rkt_state)
            {
            case RD_KAFKA_TOPIC_S_UNKNOWN:
                    /* No metadata received from cluster yet.
                     * Put message in UA partition and re-run partitioner when
                     * cluster comes up. */
            partition = RD_KAFKA_PARTITION_UA;
                    break;
    
            case RD_KAFKA_TOPIC_S_NOTEXISTS:
                    /* Topic not found in cluster.
                     * Fail message immediately. */
                    err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
            if (do_lock)
                rd_kafka_topic_rdunlock(rkt);
                    return err;
    
            case RD_KAFKA_TOPIC_S_EXISTS:
                    /* Topic exists in cluster. */
    
                    /* Topic exists but has no partitions.
                     * This is usually an transient state following the
                     * auto-creation of a topic. */
                    if (unlikely(rkt->rkt_partition_cnt == 0)) {
                            partition = RD_KAFKA_PARTITION_UA;
                            break;
                    }
    
                    /* Partition not assigned, run partitioner. */
                    // If rkm->rkm_partition == RD_KAFKA_PARTITION_UA, call the configured partitioner to pick a partition
                    if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
                            rd_kafka_topic_t *app_rkt;
                            /* Provide a temporary app_rkt instance to protect
                             * from the case where the application decided to
                             * destroy its topic object prior to delivery completion
                             * (issue #502). */
                            app_rkt = rd_kafka_topic_keep_a(rkt);
                            partition = rkt->rkt_conf.
                                    partitioner(app_rkt,
                                                rkm->rkm_key,
                            rkm->rkm_key_len,
                                                rkt->rkt_partition_cnt,
                                                rkt->rkt_conf.opaque,
                                                rkm->rkm_opaque);
                            rd_kafka_topic_destroy0(
                                    rd_kafka_topic_a2s(app_rkt));
                    } else
                            partition = rkm->rkm_partition;
    
                    /* Check that partition exists. */
                    if (partition >= rkt->rkt_partition_cnt) {
                            err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                            if (do_lock)
                                    rd_kafka_topic_rdunlock(rkt);
                            return err;
                    }
                    break;
    
            default:
                    rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
                    break;
            }
    
        /* Get new partition */
        s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
    
        if (unlikely(!s_rktp_new)) {
            /* Unknown topic or partition */
            if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
            else
                err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
    
            if (do_lock)
                rd_kafka_topic_rdunlock(rkt);
    
            return  err;
        }
    
            rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
            rd_atomic64_add(&rktp_new->rktp_c.msgs, 1);
    
            /* Update message partition */
            if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
                    rkm->rkm_partition = partition;
    
        /* Partition is available: enqueue msg on partition's queue */
            // Append to the tail of the partition's queue
        rd_kafka_toppar_enq_msg(rktp_new, rkm);
        if (do_lock)
            rd_kafka_topic_rdunlock(rkt);
        rd_kafka_toppar_destroy(s_rktp_new); /* from _get() */
        return 0;
    }
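    
      The configured partitioner invoked above is whatever the application registered on the topic configuration. A hedged sketch, assuming rd_kafka_topic_conf_set_partitioner_cb() and rd_kafka_topic_partition_available() from the public API (my_partitioner and its trivial hash are purely illustrative; topic_conf is assumed to be in scope):
    /* Hypothetical partitioner: hash the key; keyless messages go to partition 0.
     * A partitioner must only return partitions that are currently available. */
    static int32_t my_partitioner (const rd_kafka_topic_t *rkt,
                                   const void *key, size_t keylen,
                                   int32_t partition_cnt,
                                   void *rkt_opaque, void *msg_opaque) {
        int32_t p = 0;
        if (key && keylen > 0) {
            const unsigned char *k = key;
            uint32_t h = 0;
            size_t i;
            for (i = 0; i < keylen; i++)
                h = h * 31u + k[i];            /* toy hash, for illustration only */
            p = (int32_t)(h % (uint32_t)partition_cnt);
        }
        if (!rd_kafka_topic_partition_available(rkt, p))
            return RD_KAFKA_PARTITION_UA;      /* let librdkafka treat it as unassigned */
        return p;
    }
    
    /* Registration on the topic config (assumed standard API): */
    rd_kafka_topic_conf_set_partitioner_cb(topic_conf, my_partitioner);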
    

    Librdkafka Source Code Analysis - Content Table
