美文网首页
flume多channel区分输出

flume多channel区分输出

作者: 王金松 | 来源:发表于2019-10-22 09:40 被阅读0次

    场景

    我们收集的主机侧数据有两种,一种是直接作为最终数据给用户展示的,比如webshell、异地登录等等;另一种是需要做一个聚合操作才能最终给用户展示的,比如暴力破解(主机侧在短时间内发出很多告警数据)。
    但是主机侧的这两类数据是通过同一个Kafka topic传输给我们的，所以我们在补齐资产信息之后，通过多channel + 多sink的方式，把需要后续做聚合的数据和可以直接展示的数据分别写到不同的topic中。
    

    实现

    配置文件
    a1.sources=hids
    a1.channels=normal agg
    a1.sinks=k1 k2
    
    a1.sources.hids.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.hids.batchSize = 1000  
    a1.sources.hids.batchDurationMillis = 2000  
    a1.sources.hids.kafka.bootstrap.servers = 127.0.0.1:9092
    a1.sources.hids.kafka.topics = hids_sec_alert
    a1.sources.hids.kafka.consumer.group.id = csa
    a1.sources.hids.setTopicHeader = false
    a1.sources.hids.interceptors = i1
    a1.sources.hids.interceptors.i1.type = com.jd.sa.HidsInterceptor$Builder
    a1.sources.hids.interceptors.i1.mysqlurl = jdbc:mysql://127.0.0.1:3306/db?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=UTF-8
    a1.sources.hids.interceptors.i1.mysqlusername = root
    a1.sources.hids.interceptors.i1.mysqlpassword = root
    a1.sources.hids.interceptors.i1.dbvalid = true
    a1.sources.hids.interceptors.i1.dataexpired = 3600
    a1.sources.hids.selector.type = com.jd.channelSelector.HidsChannelSelector
    a1.sources.hids.selector.channel_detail = {'agg':'SUBTYPE_BRUTEFORCE'}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = jdcloud-hids-alert-log
    a1.sinks.k1.kafka.bootstrap.servers = 10.226.193.71:9092
    a1.sinks.k1.kafka.flumeBatchSize = 1000
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    
    a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k2.kafka.topic = jdcloud-hids-brute-force
    a1.sinks.k2.kafka.bootstrap.servers = 10.226.193.71:9092
    a1.sinks.k2.kafka.flumeBatchSize = 1000
    a1.sinks.k2.kafka.producer.acks = 1
    a1.sinks.k2.kafka.producer.linger.ms = 1
    #a1.sinks.ki.kafka.producer.compression.type = snappy
    
    a1.channels.normal.type=memory
    a1.channels.normal.capacity=10000
    a1.channels.normal.transactionCapacity=1000
    
    a1.channels.agg.type=memory
    a1.channels.agg.capacity=10000
    a1.channels.agg.transactionCapacity=1000
    
    a1.sources.hids.channels=normal agg 
    a1.sinks.k1.channel=normal
    a1.sinks.k2.channel=agg
    

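    格式转换和资产补齐仍然通过一个Interceptor实现，这里不展开讲解。
    从配置文件可以看出，我们通过自定义的channel selector（HidsChannelSelector）把不同类型的数据划分到不同的channel中，每个channel再分别对应一个sink，这样就实现了多路输出。其中selector.channel_detail的格式为 {'channel名':'subType1,subType2'}：事件body中subType命中列表的，写入对应的channel；其余事件写入所有未在channel_detail中出现的channel（本例中即normal）。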

    HidsChannelSelector 实现
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.google.common.base.Charsets;
    import com.jd.sa.HidsInterceptor;
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.channel.AbstractChannelSelector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.*;
    
    /**
     * Flume channel selector that routes each event by the {@code subType} field of its JSON body.
     *
     * <p>It is configured through the selector property {@code channel_detail}. That property is a
     * JSON object mapping a channel name to a comma-separated list of subtypes, for example
     * {@code {'agg':'SUBTYPE_BRUTEFORCE'}}. An event whose subtype is listed is sent to the mapped
     * channel(s). Every other event goes to all channels that are not named in
     * {@code channel_detail}. This includes events whose body cannot be parsed as a JSON object and
     * events that have no {@code subType}. No optional channels are used.
     */
    public class HidsChannelSelector extends AbstractChannelSelector {
    
        private static final Logger logger = LoggerFactory
                .getLogger(HidsChannelSelector.class);
        /** Selector property holding the channel-name -> subtypes JSON mapping. */
        private static final String CHANNEL_DETAIL = "channel_detail";
        /** JSON field of the event body whose value decides the routing. */
        private static final String SUBTYPE_FIELD = "subType";
    
        /** Subtype -> required channels; replaced wholesale on every configure(). */
        Map<String, List<Channel>> channelInfo = new HashMap<>();
    
        /** Channels not referenced by channel_detail; fallback for unmapped events. */
        private List<Channel> normalChannels = new ArrayList<>();
    
        /**
         * Returns the channels an event must be written to.
         *
         * @param event incoming event; its body is expected to be UTF-8 JSON
         * @return the channels mapped to the event's subtype, or the normal channels otherwise
         */
        @Override
        public List<Channel> getRequiredChannels(Event event) {
            String subtype = extractSubtype(event);
            List<Channel> mapped = (subtype == null) ? null : channelInfo.get(subtype);
            return (mapped != null) ? mapped : normalChannels;
        }
    
        /** This selector never uses optional channels. */
        @Override
        public List<Channel> getOptionalChannels(Event event) {
            return Collections.emptyList();
        }
    
        /**
         * Builds the routing tables from the {@code channel_detail} property.
         *
         * @param cntxt selector context
         * @throws IllegalArgumentException if {@code channel_detail} is missing, is not a JSON
         *     object, or names a channel that is not attached to this source
         */
        @Override
        public void configure(Context cntxt) {
            String subtypes = cntxt.getString(CHANNEL_DETAIL);
    
            logger.info("channel detail is {}", subtypes);
            if (subtypes == null || subtypes.trim().isEmpty()) {
                throw new IllegalArgumentException("no set values for channel_detail");
            }
            JSONObject subTypeObj = JSON.parseObject(subtypes);
            if (subTypeObj == null) {
                throw new IllegalArgumentException("channel_detail is not a JSON object: " + subtypes);
            }
    
            // Every routed channel is removed from this map, and whatever remains becomes the
            // fallback list. NOTE(review): this assumes getChannelNameMap() returns a mutable
            // copy; the original code relied on the same assumption.
            Map<String, Channel> channelNameMap = getChannelNameMap();
            Map<String, List<Channel>> routes = new HashMap<>();
    
            for (Map.Entry<String, Object> entry : subTypeObj.entrySet()) {
                Object valueObj = entry.getValue();
                if (valueObj == null) {
                    continue;
                }
                String channelName = entry.getKey();
                Channel childChannel = channelNameMap.remove(channelName);
                if (childChannel == null) {
                    // Fail fast. Otherwise a null would be stored in the routing table
                    // for every subtype of this entry.
                    throw new IllegalArgumentException(
                            "channel_detail refers to unknown channel: " + channelName);
                }
                for (String rawType : valueObj.toString().split(",")) {
                    // Trim so that "A, B" maps both "A" and "B".
                    String type = rawType.trim();
                    if (type.isEmpty()) {
                        continue;
                    }
                    List<Channel> typeChannels = routes.get(type);
                    if (typeChannels == null) {
                        typeChannels = new ArrayList<>();
                        routes.put(type, typeChannels);
                    }
                    typeChannels.add(childChannel);
                }
            }
    
            // Install the new tables only after the whole config has been processed, so a
            // reconfigure replaces the previous routing instead of appending to it.
            channelInfo = routes;
            normalChannels = new ArrayList<>(channelNameMap.values());
        }
    
        /**
         * Reads the {@code subType} field from the event's JSON body.
         *
         * @return the subtype, or {@code null} if the body is empty, not a JSON object, or lacks
         *     the field
         */
        private static String extractSubtype(Event event) {
            byte[] raw = event.getBody();
            if (raw == null || raw.length == 0) {
                return null;
            }
            try {
                JSONObject body = JSON.parseObject(new String(raw, Charsets.UTF_8));
                return (body == null) ? null : body.getString(SUBTYPE_FIELD);
            } catch (RuntimeException e) {
                // An exception thrown from the selector aborts the whole batch being put by the
                // source, so unparseable events are routed to the normal channels instead.
                logger.warn("cannot parse event body as JSON, routing to normal channels", e);
                return null;
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:flume多channel区分输出

          本文链接:https://www.haomeiwen.com/subject/nvifvctx.html