Scenario
The host-side data we collect falls into two categories. One kind is shown to users directly as final data, such as webshell detections and logins from unusual locations. The other kind needs an aggregation step before it can be presented, such as brute-force attacks, where the host side emits a large number of alerts within a short time.
Both kinds, however, arrive over a single Kafka topic. So, after enriching events with asset information, we use multiple sinks to write the data that needs downstream aggregation and the data that is displayed directly into separate topics.
Implementation
Configuration file
a1.sources=hids
a1.channels=normal agg
a1.sinks=k1 k2
a1.sources.hids.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.hids.batchSize = 1000
a1.sources.hids.batchDurationMillis = 2000
a1.sources.hids.kafka.bootstrap.servers = 127.0.0.1:9092
a1.sources.hids.kafka.topics = hids_sec_alert
a1.sources.hids.kafka.consumer.group.id = csa
a1.sources.hids.setTopicHeader = false
a1.sources.hids.interceptors = i1
a1.sources.hids.interceptors.i1.type = com.jd.sa.HidsInterceptor$Builder
a1.sources.hids.interceptors.i1.mysqlurl = jdbc:mysql://127.0.0.1:3306/db?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=UTF-8
a1.sources.hids.interceptors.i1.mysqlusername = root
a1.sources.hids.interceptors.i1.mysqlpassword = root
a1.sources.hids.interceptors.i1.dbvalid = true
a1.sources.hids.interceptors.i1.dataexpired = 3600
a1.sources.hids.selector.type = com.jd.channelSelector.HidsChannelSelector
a1.sources.hids.selector.channel_detail = {'agg':'SUBTYPE_BRUTEFORCE'}
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = jdcloud-hids-alert-log
a1.sinks.k1.kafka.bootstrap.servers = 10.226.193.71:9092
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = jdcloud-hids-brute-force
a1.sinks.k2.kafka.bootstrap.servers = 10.226.193.71:9092
a1.sinks.k2.kafka.flumeBatchSize = 1000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
#a1.sinks.ki.kafka.producer.compression.type = snappy
a1.channels.normal.type=memory
a1.channels.normal.capacity=10000
a1.channels.normal.transactionCapacity=1000
a1.channels.agg.type=memory
a1.channels.agg.capacity=10000
a1.channels.agg.transactionCapacity=1000
a1.sources.hids.channels=normal agg
a1.sinks.k1.channel=normal
a1.sinks.k2.channel=agg
Format conversion and asset enrichment are still handled by an Interceptor and are not covered in detail here; a minimal skeleton is sketched below for context.
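For reference only, the following is an illustrative skeleton of the shape the config expects from com.jd.sa.HidsInterceptor$Builder, assuming the standard Flume Interceptor/Builder contract. The enrichment logic and the MySQL handling are placeholders, not the actual implementation; only the mysqlurl property read here comes from the config above, and the other i1.* properties are omitted.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class HidsInterceptor implements Interceptor {

    private final String mysqlUrl;

    private HidsInterceptor(String mysqlUrl) {
        this.mysqlUrl = mysqlUrl;
    }

    @Override
    public void initialize() {
        // placeholder: open the MySQL connection used for asset lookups
    }

    @Override
    public Event intercept(Event event) {
        // placeholder: the real interceptor converts the alert format and
        // enriches it with asset information queried from MySQL
        JSONObject body = JSON.parseObject(new String(event.getBody(), Charsets.UTF_8));
        event.setBody(body.toJSONString().getBytes(Charsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>(events.size());
        for (Event e : events) {
            Event intercepted = intercept(e);
            if (intercepted != null) {
                out.add(intercepted);
            }
        }
        return out;
    }

    @Override
    public void close() {
        // placeholder: release the DB connection
    }

    // Referenced from the config as com.jd.sa.HidsInterceptor$Builder
    public static class Builder implements Interceptor.Builder {
        private String mysqlUrl;

        @Override
        public void configure(Context context) {
            // reads the i1.* properties from the Flume config
            mysqlUrl = context.getString("mysqlurl");
        }

        @Override
        public Interceptor build() {
            return new HidsInterceptor(mysqlUrl);
        }
    }
}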
As the configuration shows, we separate the two kinds of data by channel: the channel selector routes each event into the appropriate channel, and each channel is drained by its own sink, which gives us multiple outputs. The selector.channel_detail property maps a channel name to a comma-separated list of subtypes; here, events whose subType is SUBTYPE_BRUTEFORCE go to the agg channel, and everything else goes to normal.
HidsChannelSelector implementation
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.AbstractChannelSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HidsChannelSelector extends AbstractChannelSelector {

    private static final Logger logger = LoggerFactory.getLogger(HidsChannelSelector.class);

    private static final String CHANNEL_DETAIL = "channel_detail";
    private static final String NORMAL_CHANNEL = "normal_channel";

    // subType -> channels that should receive events of that subType;
    // the special key NORMAL_CHANNEL holds the channels for everything else
    private final Map<String, List<Channel>> channelInfo = new HashMap<>();

    @Override
    public List<Channel> getRequiredChannels(Event event) {
        // Parse the event body as JSON and route by its "subType" field
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        JSONObject body = JSON.parseObject(eventBody);
        String subtype = body.getString("subType");
        if (channelInfo.containsKey(subtype)) {
            return channelInfo.get(subtype);
        } else {
            // unmapped subtypes fall through to the default (normal) channels
            return channelInfo.get(NORMAL_CHANNEL);
        }
    }

    @Override
    public List<Channel> getOptionalChannels(Event event) {
        return new ArrayList<>();
    }

    @Override
    public void configure(Context context) {
        // channel_detail is a JSON map of channel name -> comma-separated subtypes,
        // e.g. {'agg':'SUBTYPE_BRUTEFORCE'}
        String subtypes = context.getString(CHANNEL_DETAIL);
        logger.info("channel detail is {}", subtypes);
        if (subtypes == null) {
            throw new IllegalArgumentException("no value set for channel_detail");
        }
        Map subTypeObj = JSON.parseObject(subtypes);
        Map<String, Channel> channelNameMap = getChannelNameMap();
        for (Object key : subTypeObj.keySet()) {
            Object valueObj = subTypeObj.get(key);
            if (valueObj != null) {
                String[] subtypeNames = valueObj.toString().split(",");
                // remove the mapped channel so only the unmapped ones remain
                // for the default route below
                Channel childChannel = channelNameMap.remove(key.toString());
                for (String type : subtypeNames) {
                    List<Channel> typeChannels = channelInfo.get(type);
                    if (typeChannels == null) {
                        typeChannels = new ArrayList<>();
                    }
                    typeChannels.add(childChannel);
                    channelInfo.put(type, typeChannels);
                }
            }
        }
        // whatever channels were not explicitly mapped become the default route
        List<Channel> normalChannels = new ArrayList<>(channelNameMap.values());
        channelInfo.put(NORMAL_CHANNEL, normalChannels);
    }
}
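To see how events fan out, here is a small self-contained sketch that wires the selector to two memory channels named as in the config and prints which channel each subtype is routed to. The class name HidsChannelSelectorDemo and the SUBTYPE_WEBSHELL value are illustrative, not from the original article.

import com.google.common.base.Charsets;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.event.EventBuilder;

import java.util.Arrays;

public class HidsChannelSelectorDemo {
    public static void main(String[] args) {
        // two channels named exactly as in the Flume config
        Channel normal = new MemoryChannel();
        normal.setName("normal");
        Channel agg = new MemoryChannel();
        agg.setName("agg");

        HidsChannelSelector selector = new HidsChannelSelector();
        selector.setChannels(Arrays.asList(normal, agg));

        Context context = new Context();
        context.put("channel_detail", "{'agg':'SUBTYPE_BRUTEFORCE'}");
        selector.configure(context);

        // brute-force alerts are routed to the agg channel ...
        System.out.println(selector.getRequiredChannels(
                EventBuilder.withBody("{\"subType\":\"SUBTYPE_BRUTEFORCE\"}", Charsets.UTF_8))
                .get(0).getName());   // prints: agg

        // ... while any other subtype falls through to the normal channel
        System.out.println(selector.getRequiredChannels(
                EventBuilder.withBody("{\"subType\":\"SUBTYPE_WEBSHELL\"}", Charsets.UTF_8))
                .get(0).getName());   // prints: normal
    }
}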