美文网首页
flume多channel区分输出

flume多channel区分输出

作者: 王金松 | 来源:发表于2019-10-22 09:40 被阅读0次

场景

我们收集的主机侧数据有两种,一种是直接作为最终数据给用户展示的,比如webshell、异地登录等等;另一种是需要做一个聚合操作才能最终给用户展示的,比如暴力破解(主机侧在短时间内发出很多告警数据)。
但是主机侧的这两类数据是通过一个kafka的topic给我们传输的,所以我们通过多sink把需要后续做聚合的和直接展示的在补齐资产之后写到不同的topic中

实现

配置文件
a1.sources=hids
a1.channels=normal agg
a1.sinks=k1 k2

a1.sources.hids.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.hids.batchSize = 1000  
a1.sources.hids.batchDurationMillis = 2000  
a1.sources.hids.kafka.bootstrap.servers = 127.0.0.1:9092
a1.sources.hids.kafka.topics = hids_sec_alert
a1.sources.hids.kafka.consumer.group.id = csa
a1.sources.hids.setTopicHeader = false
a1.sources.hids.interceptors = i1
a1.sources.hids.interceptors.i1.type = com.jd.sa.HidsInterceptor$Builder
a1.sources.hids.interceptors.i1.mysqlurl = jdbc:mysql://127.0.0.1:3306/db?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=UTF-8
a1.sources.hids.interceptors.i1.mysqlusername = root
a1.sources.hids.interceptors.i1.mysqlpassword = root
a1.sources.hids.interceptors.i1.dbvalid = true
a1.sources.hids.interceptors.i1.dataexpired = 3600
a1.sources.hids.selector.type = com.jd.channelSelector.HidsChannelSelector
a1.sources.hids.selector.channel_detail = {'agg':'SUBTYPE_BRUTEFORCE'}

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = jdcloud-hids-alert-log
a1.sinks.k1.kafka.bootstrap.servers = 10.226.193.71:9092
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = jdcloud-hids-brute-force
a1.sinks.k2.kafka.bootstrap.servers = 10.226.193.71:9092
a1.sinks.k2.kafka.flumeBatchSize = 1000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
#a1.sinks.ki.kafka.producer.compression.type = snappy

a1.channels.normal.type=memory
a1.channels.normal.capacity=10000
a1.channels.normal.transactionCapacity=1000

a1.channels.agg.type=memory
a1.channels.agg.capacity=10000
a1.channels.agg.transactionCapacity=1000

a1.sources.hids.channels=normal agg 
a1.sinks.k1.channel=normal
a1.sinks.k2.channel=agg

格式转换和补齐资产还是通过一个Interceptor实现,这个地方不做讲解
通过配置文件可以看到我们是通过channel对不同的数据进行区分,划分到不同的channel中,然后不同的channel分别对应一个sink,这样就实现了多输出

HidsChannelSelector 实现
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.jd.sa.HidsInterceptor;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.AbstractChannelSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class HidsChannelSelector extends AbstractChannelSelector {

    private static final Logger logger = LoggerFactory
            .getLogger(HidsInterceptor.class);
    private static final String CHANNEL_DETAIL = "channel_detail";
    private static final String NROMAL_CHANNEL = "normal_channel";
    Map<String, List<Channel>> channelInfo = new HashMap<>();

    @Override
    public List<Channel> getRequiredChannels(Event event) {
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        JSONObject body = JSON.parseObject(eventBody);
        String subtype = body.getString("subType");

        if (channelInfo.containsKey(subtype)) {
            return channelInfo.get(subtype);
        } else {
            return channelInfo.get(NROMAL_CHANNEL);
        }
    }

    @Override
    public List<Channel> getOptionalChannels(Event event) {
        return new ArrayList<>();
    }

    @Override
    public void configure(Context cntxt) {
        String subtypes = cntxt.getString(CHANNEL_DETAIL);

        logger.info("channel detail is {}", subtypes);
        if (subtypes == null) {
            throw new IllegalArgumentException("no set values for channel_detail");
        }
        Map subTypeObj = JSON.parseObject(subtypes);
        Map<String, Channel> channelNameMap = getChannelNameMap();

        for (Object key: subTypeObj.keySet()) {
            Object valueObj = subTypeObj.get(key);
            if (valueObj != null) {
                String value = valueObj.toString();
                String[] subtypeNames = value.split(",");
                Channel childChannel = channelNameMap.remove(key.toString());
                for (String type: subtypeNames) {
                    List<Channel> type_channels = channelInfo.get(type);
                    if (type_channels == null)
                        type_channels = new ArrayList<>();
                    type_channels.add(childChannel);
                    channelInfo.put(type, type_channels);
                }
            }
        }
        List<Channel> normalChannels = new ArrayList<Channel>(channelNameMap.values());
        channelInfo.put(NROMAL_CHANNEL, normalChannels);
    }
}

相关文章

网友评论

      本文标题:flume多channel区分输出

      本文链接:https://www.haomeiwen.com/subject/nvifvctx.html