前提:
源代码修改可以参考:
https://github.com/CosmosSun/suricata/tree/kafka-feature-3105-v5
C语言写入kafka可参考:
https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.c
编译时安装依赖并开启Kafka支持即可:
yum install librdkafka-devel
./configure --enable-rdkafka
修改:
加入其他配置及Kafka监控配置
1、在上述修改的基础上,将 util-log-kafka.h 文件修改如下:
#ifndef __UTIL_LOG_KAFKA_H__
#define __UTIL_LOG_KAFKA_H__
#ifdef HAVE_LIBRDKAFKA
#include <librdkafka/rdkafka.h>
typedef struct KafkaSetup{
const char *brokers;
const char *topic_name;
const char *broker_version;
const char *queue_buff_max_kbytes;
const char *queue_buff_max_ms;
const char *queue_buff_max_messages;
const char *stat_int_ms;
const char *compression_codec;
const char *batch_num_messages;
const char *message_max_bytes;
int partitions;
}KafkaSetup;
/* Runtime state of one Kafka output: the librdkafka handles created by
 * SCConfLogOpenKafka() and released by the log context's Close callback. */
typedef struct SCLogKafkaContext{
/* Producer handle returned by rd_kafka_new(). */
rd_kafka_t *rk;
/* Topic handle returned by rd_kafka_topic_new(). */
rd_kafka_topic_t *rkt;
/* Initialised to 0 when the output is opened.
 * NOTE(review): the write path in util-log-kafka.c passes a file-scope
 * partition variable to rd_kafka_produce() rather than this field -
 * confirm whether this field is still needed. */
long partition;
}SCLogKafkaContext;
/* Hand string_len bytes starting at string to the Kafka producer stored in
 * the log file context lf_ctx (a LogFileCtx passed as void *). */
int LogFileWriteKafka(void *lf_ctx, const char *string, size_t string_len);
/* Read the Kafka settings below kafka_node, create the producer and topic
 * handles and attach them to the log file context lf_ctx.
 * Returns 0 on success and -1 when kafka_node is NULL. */
int SCConfLogOpenKafka(ConfNode *kafka_node, void *lf_ctx);
#endif
#endif
2、util-log-kafka.c 文件如下:
#include "suricata-common.h"
#include "util-log-kafka.h"
#include "util-logopenfile.h"
#ifdef HAVE_LIBRDKAFKA
/* Partition passed to rd_kafka_produce(). RD_KAFKA_PARTITION_UA ("unassigned")
 * lets the topic's partitioner pick the partition for each message. */
static int partition = RD_KAFKA_PARTITION_UA;

/** \brief Drain pending messages and release the Kafka handles.
 *
 *  Flushes the producer (up to 10 seconds), keeps polling until the outbound
 *  queue is empty, then destroys the topic handle followed by the producer
 *  handle. The SCLogKafkaContext structure itself is not freed here.
 *
 *  NOTE(review): the drain loop has no upper bound of its own; it only ends
 *  once rd_kafka_outq_len() reports an empty queue.
 *
 *  \param log_ctx Log file context
 */
static void SCLogFileCloseKafka(LogFileCtx *log_ctx)
{
    SCLogKafkaContext *ctx = log_ctx->kafka;
    if (ctx == NULL) {
        return;
    }

    rd_kafka_t *producer = ctx->rk;

    if (producer != NULL) {
        /* Give in-flight messages a chance to be delivered ... */
        rd_kafka_flush(producer, 10 * 1000);
        /* ... and keep serving delivery reports until nothing is queued. */
        while (rd_kafka_outq_len(producer) > 0) {
            rd_kafka_poll(producer, 100);
        }
    }

    /* The topic handle has to go before the producer handle that owns it. */
    if (ctx->rkt != NULL) {
        rd_kafka_topic_destroy(ctx->rkt);
    }
    if (producer != NULL) {
        rd_kafka_destroy(producer);
    }
}
/**
* \brief LogFileWriteKafka() writes log data to kafka output.
* \param lf_ctx Log file context allocated by caller
* \param string buffer with data to write
* \param string_len data length
* \retval 0 on sucess;
* \retval -1 on failure;
*/
int LogFileWriteKafka(void *lf_ctx, const char *string, size_t string_len)
{
LogFileCtx *log_ctx = lf_ctx;
SCLogKafkaContext *kafka_ctx = log_ctx->kafka;
retry:
if (rd_kafka_produce(kafka_ctx->rkt, partition,
RD_KAFKA_MSG_F_COPY,
/* Payload and length */
(void *)string, string_len,
/* Optional key and its length */
NULL, 0,
/* Message opaque, provided in
* delivery report callback as
* msg_opaque. */
NULL) == -1)
{
if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){
/*如果内部队列满,等待消息传输完成并retry,
内部队列表示要发送的消息和已发送或失败的消息,
内部队列受限于queue.buffering.max.messages配置项*/
rd_kafka_poll(kafka_ctx->rk, 100);
goto retry;
}else{
SCLogError(SC_ERR_KAFKA,
"%% Failed to produce to topic %s "
"partition %i: \n",
log_ctx->kafka_setup.topic_name, partition);
}
}
rd_kafka_poll(kafka_ctx->rk, 0);
return -1;
}
/** \brief Delivery report callback: logs every message whose delivery failed.
 *  \param rk        producer handle (unused)
 *  \param rkmessage delivery report for one message
 *  \param opaque    application opaque (unused)
 */
static void dr_msg_cb(rd_kafka_t *rk,
        const rd_kafka_message_t *rkmessage, void *opaque)
{
    (void)rk;
    (void)opaque;

    /* Successful deliveries need no reporting. */
    if (!rkmessage->err) {
        return;
    }

    SCLogError(SC_ERR_KAFKA,"%% Message delivery failed: %s\n",
            rd_kafka_err2str(rkmessage->err));
}
/** \brief Statistics callback: emits the librdkafka statistics JSON to the
 *         Suricata log and to stderr.
 *  \param rk       producer handle (unused)
 *  \param json     statistics document as a string
 *  \param json_len length of \a json (unused)
 *  \param opaque   application opaque (unused)
 *  \retval 0 always
 */
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
    (void)rk;
    (void)json_len;
    (void)opaque;

    SCLogInfo("%% stats delivery : %s\n",json);
    fprintf(stderr, "%% stats delivery : %s\n",json);

    return 0;
}
/** \brief configure and initializes kafka output logging
* \param kafka_node ConfNode structure for the output section in question
* \param lf_ctx Log file context allocated by caller
* \retval 0 on success
*/
int SCConfLogOpenKafka(ConfNode *kafka_node, void *lf_ctx)
{
LogFileCtx *log_ctx = lf_ctx;
const char *partitions = NULL;
SCLogKafkaContext *kafka_ctx = NULL;
if (NULL == kafka_node) {
return -1;
}
log_ctx->kafka_setup.brokers = ConfNodeLookupChildValue(kafka_node, "brokers");
log_ctx->kafka_setup.topic_name = ConfNodeLookupChildValue(kafka_node, "topic");
log_ctx->kafka_setup.broker_version = ConfNodeLookupChildValue(kafka_node, "broker.version.fallback");
log_ctx->kafka_setup.queue_buff_max_kbytes = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.kbytes");
log_ctx->kafka_setup.queue_buff_max_ms = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.ms");
log_ctx->kafka_setup.queue_buff_max_messages = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.messages");
log_ctx->kafka_setup.stat_int_ms = ConfNodeLookupChildValue(kafka_node, "statistics.interval.ms");
log_ctx->kafka_setup.batch_num_messages = ConfNodeLookupChildValue(kafka_node, "batch.num.messages");
log_ctx->kafka_setup.message_max_bytes = ConfNodeLookupChildValue(kafka_node, "message.max.bytes");
log_ctx->kafka_setup.compression_codec = ConfNodeLookupChildValue(kafka_node, "compression.codec");
partitions = ConfNodeLookupChildValue(kafka_node, "partitions");
log_ctx->kafka_setup.partitions = atoi(partitions);
/*create kafka ctx*/
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char tmp[16];
char errstr[512];
kafka_ctx = (SCLogKafkaContext*) SCCalloc(1, sizeof(SCLogKafkaContext));
if (kafka_ctx == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate kafka context");
exit(EXIT_FAILURE);
}
conf = rd_kafka_conf_new();
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"internal.termination.signal",
tmp,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "Unable to allocate kafka context");
exit(EXIT_FAILURE);
}
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"broker.version.fallback",
log_ctx->kafka_setup.broker_version,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "%s", errstr);
exit(EXIT_FAILURE);
}
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"queue.buffering.max.messages",
log_ctx->kafka_setup.queue_buff_max_messages,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "%s", errstr);
exit(EXIT_FAILURE);
}
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"queue.buffering.max.ms",
log_ctx->kafka_setup.queue_buff_max_ms,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "%s", errstr);
exit(EXIT_FAILURE);
}
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"compression.codec",
log_ctx->kafka_setup.compression_codec,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "%s", errstr);
exit(EXIT_FAILURE);
}
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"queue.buffering.max.kbytes",
log_ctx->kafka_setup.queue_buff_max_kbytes,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "%s", errstr);
exit(EXIT_FAILURE);
}
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"batch.num.messages",
log_ctx->kafka_setup.batch_num_messages,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "%s", errstr);
exit(EXIT_FAILURE);
}
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"message.max.bytes",
log_ctx->kafka_setup.message_max_bytes,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "%s", errstr);
exit(EXIT_FAILURE);
}
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
"statistics.interval.ms",
log_ctx->kafka_setup.stat_int_ms,
errstr,
sizeof(errstr))) {
SCLogError(SC_ERR_KAFKA, "%s", errstr);
exit(EXIT_FAILURE);
}
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
rd_kafka_conf_set_stats_cb(conf,stats_cb);
if (!(kafka_ctx->rk = rd_kafka_new(RD_KAFKA_PRODUCER,
conf,
errstr,
sizeof(errstr)))) {
rd_kafka_conf_destroy(conf);
SCLogError(SC_ERR_KAFKA, "%% Failed to create new producer: %s", errstr);
exit(EXIT_FAILURE);
}
if (0 == rd_kafka_brokers_add(kafka_ctx->rk,
log_ctx->kafka_setup.brokers)) {
rd_kafka_destroy(kafka_ctx->rk);
SCLogError(SC_ERR_KAFKA, "%% No valid brokers specified");
exit(EXIT_FAILURE);
}
topic_conf = rd_kafka_topic_conf_new();
kafka_ctx->rkt = rd_kafka_topic_new(kafka_ctx->rk,
log_ctx->kafka_setup.topic_name,
topic_conf);
if (NULL == kafka_ctx->rkt) {
rd_kafka_destroy(kafka_ctx->rk);
SCLogError(SC_ERR_KAFKA, "%% Failed to create kafka topic %s",
log_ctx->kafka_setup.topic_name);
exit(EXIT_FAILURE);
}
kafka_ctx->partition = 0;
log_ctx->kafka = kafka_ctx;
log_ctx->Close = SCLogFileCloseKafka;
return 0;
}
#endif
如果不清楚上述代码改动了哪些地方,可以与原版(https://github.com/CosmosSun/suricata/tree/kafka-feature-3105-v5)对比。改动主要是增加了部分Kafka配置项以及监控统计输出(方便排查问题)。将 statistics.interval.ms 设置为 0 即可关闭统计输出。
kafka配置可以参考:https://blog.csdn.net/qq_34284638/article/details/97641038
kafka监控输出的数据为json,参数意思可以参考:
https://github.com/edenhill/librdkafka/blob/bb96705083b16d773cd15ef64880b605d82c5a1a/STATISTICS.md
遇到的问题
Kafka服务端版本为0.11.0时,运行一段时间后就会输出下面的错误;在0.10.0下没有这个错误。
Receive failed: Connection reset by peer
网友评论