Flume安装部署
安装地址
(1)Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/
安装部署
(1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
(2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
(3)修改apache-flume-1.9.0-bin的名称为flume
(4)修改配置文件
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=jdk目录
# 配置环境变量
vi ~/.bashrc
#FLUME
export FLUME_HOME=flume目录
export PATH=$PATH:$FLUME_HOME/bin
source ~/.bashrc
flume-ng version # 查看flume版本号
Flume Source 测试
常见类型: spooling directory, exec, syslog, avro, netcat,Taildir等
无论是Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,TaildirSource可以。
SpoolingDirSource(监控一个目录)
spoolingDirsource是安全的,不会丢失数据,但采集文件时不可以被修改,且文件不能重名。
mkdir /opt/module/flume/job
vi job/file_to_logger_spooling.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/opt/module/flume/testdata/log/hi
#配置Channel组件
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#配置Sink组件
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=100
#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动 flume job
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_spooling.conf -Dflume.root.logger=INFO,console
可以看到采集完的日志文件都加上了.COMPLETED
后缀
TAILDIR Souce
在/opt/module/flume/job目录下vi file_to_logger_taildir.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/opt/module/flume/testdata/log/hi/.*json
a1.sources.r1.filegroups.f2=/opt/module/flume/testdata/log/test/.*json
a1.sources.r1.positionFile=./taildir_position.json
#配置Channel组件
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#配置Sink组件
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=100
#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动 flume job
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_taildir.conf -Dflume.root.logger=INFO,console
上面两种方式都不能监控多级目录
为解决监控多级目录的问题,我们下载flume的源码,修改TaildirSource的代码,将修改好的TaildirSource模块打包 ,将 flume-taildir-source-1.9.0.jar 上传到flume的lib目录下替换原有的 flume-taildir-source-1.9.0.jar
修改 file_to_logger_taildir.conf 配置
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/opt/module/flume/testdata/log/hi/.*
a1.sources.r1.filegroups.f2=/opt/module/flume/testdata/log/test/.*
a1.sources.r1.positionFile=./taildir_position.json
#配置Channel组件
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#配置Sink组件
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=100
#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动 flume job
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_taildir.conf -Dflume.root.logger=INFO,console
经测试,成功监控到多级目录下的文件变化
项目经验
修改/opt/module/flume/conf/flume-env.sh文件,配置如下参数(测试环境暂不配置)
export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
export JAVA_HOME=
可选择TaildirSource和KafkaChannel搭配,并配置日志校验拦截器。
选择TailDirSource和KafkaChannel的原因如下:
TailDirSource相比ExecSource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
采用Kafka Channel,省去了Sink,提高了效率。
在Flume的job目录下创建file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# a1.sources.r1.interceptors = i1
# a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder
#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#组装
a1.sources.r1.channels = c1
Flume从Kafka同步数据到HDFS
在Flume的job目录下创建kafka_to_hdfs_log.conf
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
#a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.bootstrap.servers = hadoop2:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder
#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
编写 flume TimestampInterceptor,将日志时间戳放到header当中的timestamp字段当中,这样hdfs会按日期分目录
public class TimestampInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//1 获取body和header
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
Map<String, String> headers = event.getHeaders();
//2 将log当中的ts字段解析出来
JSONObject jsonObject = JSONObject.parseObject(log);
String ts = jsonObject.getString("ts");
//3 将ts字段 放到header当中的timestamp字段当中
headers.put("timestamp", ts);
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new TimestampInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
把上面代码打成flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar放到/opt/module/flume/lib下
启动 flume job
bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
往kafka推条数据测试
@Test
public void testProducerSend() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop2:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
String value = "{\"database\":\"gmall\",\"table\":\"cart_info\",\"type\":\"update\",\"ts\":1670340710200,\"xid\":13090,\"xoffset\":1573,\"data\":{\"id\":100924,\"user_id\":\"93\",\"sku_id\":16,\"cart_price\":4488.00,\"sku_num\":1,\"img_url\":\"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg\",\"sku_name\":\"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机\",\"is_checked\":null,\"create_time\":\"2020-06-14 09:28:57\",\"operate_time\":null,\"is_ordered\":1,\"order_time\":\"2021-10-17 09:28:58\",\"source_type\":\"2401\",\"source_id\":null},\"old\":{\"is_ordered\":0,\"order_time\":null}}";
ProducerRecord<String, String> record = new ProducerRecord<>("topic_log", value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset());
}
});
// 所有的通道打开都需要关闭
producer.close();
}
然后访问发现有文件生成了
http://hadoop2:9870/explorer.html#/origin_data/gmall/log/topic_log
网友评论