Adding Kafka output support to Suricata

Author: 安全的小飞飞 | Published 2020-10-23 10:25

Prerequisites:

For the source-code changes, see:
https://github.com/CosmosSun/suricata/tree/kafka-feature-3105-v5
For writing to Kafka from C, see:
https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.c
Install the dependency and enable Kafka support at build time:

yum install librdkafka-devel

./configure --enable-rdkafka
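
A quick sanity check that the toolchain can see librdkafka (librdkafka-devel ships pkg-config metadata). Whether "rdkafka" also shows up in the feature list of the patched binary depends on how the patch registers it, so treat the second command as a hint rather than a guarantee:

pkg-config --modversion rdkafka
./src/suricata --build-info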

Changes:

Add further Kafka options and the Kafka statistics (monitoring) configuration.

1. On top of the changes from the branch above, modify util-log-kafka.h as follows:

#ifndef __UTIL_LOG_KAFKA_H__
#define __UTIL_LOG_KAFKA_H__

#ifdef HAVE_LIBRDKAFKA
#include <librdkafka/rdkafka.h>
#include "conf.h" /* for ConfNode, used in the prototypes below */


typedef struct KafkaSetup {
    const char *brokers;
    const char *topic_name;
    const char *broker_version;
    const char *queue_buff_max_kbytes;
    const char *queue_buff_max_ms;
    const char *queue_buff_max_messages;
    const char *stat_int_ms;
    const char *compression_codec;
    const char *batch_num_messages;
    const char *message_max_bytes;
    int partitions;
} KafkaSetup;

typedef struct SCLogKafkaContext {
    rd_kafka_t *rk;
    rd_kafka_topic_t *rkt;
    long partition;
} SCLogKafkaContext;

int LogFileWriteKafka(void *lf_ctx, const char *string, size_t string_len);
int SCConfLogOpenKafka(ConfNode *kafka_node, void *lf_ctx);

#endif /* HAVE_LIBRDKAFKA */

#endif /* __UTIL_LOG_KAFKA_H__ */

2. And util-log-kafka.c as follows:


#include "suricata-common.h"
#include "util-log-kafka.h"
#include "util-logopenfile.h"

#ifdef HAVE_LIBRDKAFKA

/* Unassigned partition: let librdkafka's partitioner pick one */
static int partition = RD_KAFKA_PARTITION_UA;

/** \brief close the kafka log
 *  \param log_ctx Log file context
 */
static void SCLogFileCloseKafka(LogFileCtx *log_ctx)
{
    SCLogKafkaContext *kafka_ctx = log_ctx->kafka;

    if (NULL == kafka_ctx) {
        return;
    }

    if (kafka_ctx->rk) {
        /* Flush: wait up to 10s for outstanding messages to be delivered */
        rd_kafka_flush(kafka_ctx->rk, 10*1000);
        /* Serve delivery report callbacks until the out-queue drains */
        while (rd_kafka_outq_len(kafka_ctx->rk) > 0)
            rd_kafka_poll(kafka_ctx->rk, 100);
    }

    if (kafka_ctx->rkt) {
        /* Destroy topic */
        rd_kafka_topic_destroy(kafka_ctx->rkt);
    }

    if (kafka_ctx->rk) {
        /* Destroy the handle */
        rd_kafka_destroy(kafka_ctx->rk);
    }
    return;
}

/**
 * \brief LogFileWriteKafka() writes log data to the kafka output.
 * \param lf_ctx Log file context allocated by caller
 * \param string buffer with data to write
 * \param string_len data length
 * \retval 0 on success
 * \retval -1 on failure
 */
int LogFileWriteKafka(void *lf_ctx, const char *string, size_t string_len)
{
    LogFileCtx *log_ctx = lf_ctx;
    SCLogKafkaContext *kafka_ctx = log_ctx->kafka;

retry:
    if (rd_kafka_produce(kafka_ctx->rkt, partition,
            RD_KAFKA_MSG_F_COPY,
            /* Payload and length */
            (void *)string, string_len,
            /* Optional key and its length */
            NULL, 0,
            /* Message opaque, provided in
             * delivery report callback as
             * msg_opaque. */
            NULL) == -1)
    {
        if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            /* The internal queue is full: wait for in-flight messages to
             * be delivered, then retry. The queue holds messages waiting
             * to be sent as well as those already sent or failed; its
             * size is bounded by queue.buffering.max.messages. */
            rd_kafka_poll(kafka_ctx->rk, 100);
            goto retry;
        } else {
            SCLogError(SC_ERR_KAFKA,
                "%% Failed to produce to topic %s "
                "partition %i: %s\n",
                log_ctx->kafka_setup.topic_name, partition,
                rd_kafka_err2str(rd_kafka_last_error()));
            return -1;
        }
    }
    /* Serve delivery report callbacks without blocking */
    rd_kafka_poll(kafka_ctx->rk, 0);
    return 0;
}

/* Delivery report callback: invoked once per produced message */
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage, void *opaque){
    if (rkmessage->err)
        SCLogError(SC_ERR_KAFKA, "%% Message delivery failed: %s\n",
                rd_kafka_err2str(rkmessage->err));
}

/* Kafka statistics callback: invoked every statistics.interval.ms with
 * the current statistics serialized as a JSON string */
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque){
    SCLogInfo("%% stats delivery : %s\n", json);
    /* echo to stderr as well, for quick debugging */
    fprintf(stderr, "%% stats delivery : %s\n", json);
    return 0;
}

/** \brief configure and initializes kafka output logging
 *  \param kafka_node ConfNode structure for the output section in question
 *  \param lf_ctx Log file context allocated by caller
 *  \retval 0 on success
 */
int SCConfLogOpenKafka(ConfNode *kafka_node, void *lf_ctx)
{
    LogFileCtx *log_ctx = lf_ctx;
    const char *partitions = NULL;
    SCLogKafkaContext *kafka_ctx = NULL;

    if (NULL == kafka_node) {
        return -1;
    }

    /* All of the keys below are looked up under the kafka: node of the
     * output section in suricata.yaml. ConfNodeLookupChildValue() returns
     * NULL for keys that are absent. */
    log_ctx->kafka_setup.brokers = ConfNodeLookupChildValue(kafka_node, "brokers");
    log_ctx->kafka_setup.topic_name = ConfNodeLookupChildValue(kafka_node, "topic");
    log_ctx->kafka_setup.broker_version = ConfNodeLookupChildValue(kafka_node, "broker.version.fallback");
    log_ctx->kafka_setup.queue_buff_max_kbytes = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.kbytes");
    log_ctx->kafka_setup.queue_buff_max_ms = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.ms");
    log_ctx->kafka_setup.queue_buff_max_messages = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.messages");
    log_ctx->kafka_setup.stat_int_ms = ConfNodeLookupChildValue(kafka_node, "statistics.interval.ms");
    log_ctx->kafka_setup.batch_num_messages = ConfNodeLookupChildValue(kafka_node, "batch.num.messages");
    log_ctx->kafka_setup.message_max_bytes = ConfNodeLookupChildValue(kafka_node, "message.max.bytes");
    log_ctx->kafka_setup.compression_codec = ConfNodeLookupChildValue(kafka_node, "compression.codec");
    partitions = ConfNodeLookupChildValue(kafka_node, "partitions");
    /* guard against a missing "partitions" key before calling atoi() */
    log_ctx->kafka_setup.partitions = (partitions != NULL) ? atoi(partitions) : 0;

    /*create kafka ctx*/
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    char tmp[16];
    char errstr[512];
    kafka_ctx = (SCLogKafkaContext*) SCCalloc(1, sizeof(SCLogKafkaContext));
    if (kafka_ctx == NULL) {
        SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate kafka context");
        exit(EXIT_FAILURE);
    }

    conf = rd_kafka_conf_new();
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "internal.termination.signal",
        tmp,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "Unable to allocate kafka context");
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "broker.version.fallback",
        log_ctx->kafka_setup.broker_version,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "queue.buffering.max.messages",
        log_ctx->kafka_setup.queue_buff_max_messages,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "queue.buffering.max.ms",
        log_ctx->kafka_setup.queue_buff_max_ms,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "compression.codec",
        log_ctx->kafka_setup.compression_codec,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "queue.buffering.max.kbytes",
        log_ctx->kafka_setup.queue_buff_max_kbytes,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "batch.num.messages",
        log_ctx->kafka_setup.batch_num_messages,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "message.max.bytes",
        log_ctx->kafka_setup.message_max_bytes,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "statistics.interval.ms",
        log_ctx->kafka_setup.stat_int_ms,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    rd_kafka_conf_set_stats_cb(conf, stats_cb);
    if (!(kafka_ctx->rk = rd_kafka_new(RD_KAFKA_PRODUCER,
        conf,
        errstr,
        sizeof(errstr)))) {
        rd_kafka_conf_destroy(conf);
        SCLogError(SC_ERR_KAFKA, "%% Failed to create new producer: %s", errstr);
        exit(EXIT_FAILURE);
    }
    if (0 == rd_kafka_brokers_add(kafka_ctx->rk,
        log_ctx->kafka_setup.brokers)) {
        rd_kafka_destroy(kafka_ctx->rk);
        SCLogError(SC_ERR_KAFKA, "%% No valid brokers specified");
        exit(EXIT_FAILURE);
    }
    topic_conf = rd_kafka_topic_conf_new();
    kafka_ctx->rkt = rd_kafka_topic_new(kafka_ctx->rk,
        log_ctx->kafka_setup.topic_name,
        topic_conf);
    if (NULL == kafka_ctx->rkt) {
        rd_kafka_destroy(kafka_ctx->rk);
        SCLogError(SC_ERR_KAFKA, "%% Failed to create kafka topic %s",
            log_ctx->kafka_setup.topic_name);
        exit(EXIT_FAILURE);
    }

    kafka_ctx->partition = 0;
    log_ctx->kafka = kafka_ctx;
    log_ctx->Close = SCLogFileCloseKafka;

    return 0;
}
#endif

To see exactly which lines differ from the original branch (https://github.com/CosmosSun/suricata/tree/kafka-feature-3105-v5), diff against it. The main additions are extra Kafka configuration options plus statistics output for monitoring (handy when troubleshooting). Setting statistics.interval.ms to 0 turns the statistics output off.
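
For reference, a sketch of what the matching output section in suricata.yaml could look like. The surrounding structure (an eve-log output with filetype: kafka) follows the CosmosSun branch, and every value below is illustrative rather than a recommendation; the key names are exactly those read by SCConfLogOpenKafka() above:

outputs:
  - eve-log:
      enabled: yes
      filetype: kafka
      kafka:
        brokers: 192.168.1.10:9092,192.168.1.11:9092
        topic: suricata-eve
        partitions: 1
        broker.version.fallback: 0.10.0.0
        queue.buffering.max.kbytes: 1048576
        queue.buffering.max.ms: 1000
        queue.buffering.max.messages: 100000
        statistics.interval.ms: 60000
        compression.codec: snappy
        batch.num.messages: 10000
        message.max.bytes: 1000000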
For the meaning of the Kafka configuration options, see: https://blog.csdn.net/qq_34284638/article/details/97641038
The Kafka statistics are emitted as JSON; the individual fields are documented at:
https://github.com/edenhill/librdkafka/blob/bb96705083b16d773cd15ef64880b605d82c5a1a/STATISTICS.md
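
For orientation, each statistics callback receives one JSON object per interval, roughly shaped like the abbreviated sketch below. The exact field set varies by librdkafka version (the STATISTICS.md link above is authoritative) and the values here are made up:

{
  "name": "rdkafka#producer-1",
  "type": "producer",
  "time": 1603418700,
  "msg_cnt": 1234,
  "msg_size": 987654,
  "brokers": { "...": "per-broker connection state, rtt, throughput" },
  "topics": { "...": "per-topic/partition queues and offsets" }
}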

Problems encountered

With broker version 0.11.0, the error below shows up after Suricata has been running for a while; it does not occur against a 0.10.0 broker.

 Receive failed: Connection reset by peer
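
Two hypotheses worth testing (not verified here): librdkafka is known to log this message when the broker closes idle connections (the broker-side connections.max.idle.ms setting), in which case it is benign and can be silenced via the log.connection.close property; and a broker.version.fallback that does not match the broker can cause protocol-level trouble. Either knob would be passed through rd_kafka_conf_set() in SCConfLogOpenKafka(), the same way as the options above:

/* hypothesis 1: suppress logging of benign idle-connection closes */
rd_kafka_conf_set(conf, "log.connection.close", "false", errstr, sizeof(errstr));
/* hypothesis 2: align the fallback with the actual broker version
 * (or rely on librdkafka's ApiVersion negotiation, which recent
 * versions enable by default via api.version.request) */
rd_kafka_conf_set(conf, "broker.version.fallback", "0.11.0.0", errstr, sizeof(errstr));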
