1. Splitting log data into multiple topics
-- 1. Requirement
In real projects, a single log record can carry many kinds of information (depending on the front-end tracking), such as app-start data, add-to-cart data, comment data, error data, and so on.
A company has multiple departments that each handle different data, e.g. a fault-handling department and a comment-handling department. For the fault-handling department, everything other than fault data is dirty data, so the logs need to be split by type.
-- 2. Implementation:
'flume1':
1. Attach a custom interceptor to the collected data:
a. Determine the category of each record, e.g. start data or event data.
b. Put that category into the event's header.
2. Use a multiplexing channel selector to route events to different channels based on the header.
3. Use two Kafka channels, one per topic.
'flume2':
1. Use two Kafka sources, one per topic.
2. Wire the two channels and the two sinks one-to-one.
3. Result: within one day, each topic gets its own folder.
2.2 Implementation
2.2.1 Custom interceptor
-- 1. Idea:
1. Get the event body.
2. Determine the type of the record (start or event).
3. Put the type into the event's header.
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class StartTopicandEventTopic implements Interceptor {

    /**
     * Read config properties if needed (nothing to do here)
     */
    public void initialize() {
    }

    /**
     * Process a single event: tag it as "event" or "start" in the header
     * @param event
     * @return
     */
    public Event intercept(Event event) {
        String str = new String(event.getBody());
        JSONObject json = JSONObject.parseObject(str);
        // Records that contain an "actions" field are event data; everything else is start data
        String actions = json.getString("actions");
        if (actions != null) {
            event.getHeaders().put("type", "event");
        } else {
            event.getHeaders().put("type", "start");
        }
        return event;
    }

    /**
     * Process a batch of events
     * @param events
     * @return
     */
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    /**
     * Release resources (nothing to do here)
     */
    public void close() {
    }

    public static class MyBuilder implements Interceptor.Builder {
        public Interceptor build() {
            return new StartTopicandEventTopic();
        }

        public void configure(Context context) {
        }
    }
}
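A quick way to sanity-check the interceptor is to feed it hand-built events. The sketch below is not part of the original project; the sample JSON bodies and the class name InterceptorDemo are made up for illustration, but Flume's EventBuilder API is standard:

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

public class InterceptorDemo {
    public static void main(String[] args) {
        StartTopicandEventTopic interceptor = new StartTopicandEventTopic();
        interceptor.initialize();

        // A log line that contains an "actions" field -> should be tagged as event data
        Event e1 = EventBuilder.withBody("{\"actions\":[{\"action_id\":\"cart_add\"}]}", StandardCharsets.UTF_8);
        // A log line without "actions" -> should be tagged as start data
        Event e2 = EventBuilder.withBody("{\"start\":{\"entry\":\"icon\"}}", StandardCharsets.UTF_8);

        System.out.println(interceptor.intercept(e1).getHeaders()); // {type=event}
        System.out.println(interceptor.intercept(e2).getHeaders()); // {type=start}

        interceptor.close();
    }
}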
2.2.2 flume1 configuration file
#Step 1: agent name
a1.sources = r1
a1.channels = c1 c2
#Step 2: taildir source
a1.sources.r1.type = TAILDIR
#Location of the position file (records the offset after each read; the key to resuming from a breakpoint)
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a1.sources.r1.batchSize = 500
#Set of monitored file groups
a1.sources.r1.filegroups = f1
#Files matched by group 1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
#Interceptors
a1.sources.r1.interceptors = i1 i2
#Interceptor type = fully-qualified class name of the Builder implementation inside the custom interceptor
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogInterceptor$MyBuilder
#Interceptor type = fully-qualified class name of the Builder implementation inside the custom interceptor
a1.sources.r1.interceptors.i2.type = StartTopicandEventTopic$MyBuilder
#Step 3: channel selector: multiplexing
a1.sources.r1.selector.type = multiplexing
#Header key written by the custom interceptor
a1.sources.r1.selector.header = type
#Events whose header value is "event" go to channel c1
a1.sources.r1.selector.mapping.event = c1
#Events whose header value is "start" go to channel c2
a1.sources.r1.selector.mapping.start = c2
#Step 4: channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092
a1.channels.c1.kafka.topic = event
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092
a1.channels.c2.kafka.topic = start
a1.channels.c2.parseAsFlumeEvent = false
#Step 5: bind the source to the channels
a1.sources.r1.channels = c1 c2
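Before wiring up flume2, it can help to confirm that flume1 routes records to the right Kafka topics. The following sketch is not part of the original write-up; the class name TopicCheck and the group id are arbitrary, and it simply consumes from both topics and prints which topic each record landed in:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class TopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop105:9092,hadoop106:9092,hadoop107:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-check");      // arbitrary consumer group
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("start", "event"));
            for (int i = 0; i < 10; i++) {                              // poll a few times, then exit
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // start-up logs should only show up under "start", action logs under "event"
                    System.out.println(record.topic() + " : " + record.value());
                }
            }
        }
    }
}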
2.2.3 flume2 configuration file
#Step 1: agent name
a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2
#Step 2: kafka source 1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092
a1.sources.r1.kafka.topics = start
#Step 3: interceptor for source 1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = MyInterceptor$MyBuilder
#Step 4: kafka source 2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092
a1.sources.r2.kafka.topics = event
#Step 5: interceptor for source 2
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = MyInterceptor$MyBuilder
#Step 6: channel 1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
#Step 7: sink 1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d/start
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
#Step 8: channel 2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
#Step 9: sink 2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d/event
a1.sinks.k2.hdfs.filePrefix = log-
a1.sinks.k2.hdfs.round = false
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = lzop
#Step 10: bind sources, channels, and sinks
a1.sources.r1.channels = c1
a1.sources.r2.channels = c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
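The flume2 config references MyInterceptor$MyBuilder, whose source is not shown above. Because the HDFS sink paths use %Y-%m-%d but hdfs.useLocalTimeStamp is not set, each event needs a "timestamp" header for the path to resolve. The sketch below is an assumption about what such an interceptor could look like (it copies the log's own "ts" field into that header), not the original class:

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

// Hypothetical sketch: promote the log's "ts" field to the "timestamp" header
// so the HDFS sink expands %Y-%m-%d from the event time, not the machine clock.
public class MyInterceptor implements Interceptor {

    public void initialize() {
    }

    public Event intercept(Event event) {
        JSONObject json = JSONObject.parseObject(new String(event.getBody()));
        // "timestamp" is the header key the HDFS sink reads when building the path
        event.getHeaders().put("timestamp", json.getString("ts"));
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void close() {
    }

    public static class MyBuilder implements Interceptor.Builder {
        public Interceptor build() {
            return new MyInterceptor();
        }

        public void configure(Context context) {
        }
    }
}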
2.3 Notes
-- 1. Multiple file channels must not share the same checkpoint directory.
-- 2. When declaring sources, channels, and sinks, separate multiple names with spaces, not commas.
-- 3. When binding one source to multiple channels, c1 and c2 are likewise separated by a space, not a comma.
-- 4. Each day's data ends up split in two, one part for start and one for event, each under its own folder.