Preface
A while ago I built a case that used Flume to ship Kafka data to S3, and this post is a record of it.
As cloud services keep gaining popularity, this scenario will only become more common, so I hope this helps anyone who needs it.
Overview
Flume is a distributed, highly reliable, and highly available tool for collecting, aggregating, and moving large volumes of log data from many different sources into a central data store. Its architecture is shown below:
(Flume architecture diagram)
I won't go into Flume's internals here; this post focuses on using the Flume 1.9 source code together with Hadoop 3.3.0 in IDEA to move Kafka data to S3.
Environment Setup
- Download the source from the official site, extract it into your local IDEA workspace, and import the project:
http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-src.tar.gz
- Flume writes to S3 through flume-hdfs-sink, so the AWS-related dependencies need to be added to that module's pom:
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>3.3.0</version>
    <scope>provided</scope>
</dependency>
- In the main module flume-ng-node, add the Hadoop configuration file core-site.xml and put the S3 credentials in it (a quick connectivity check follows the config):
<configuration>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.bucket.probe</name>
        <value>0</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>xxxxxxxxxxxxxxxxxxxx</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx</value>
    </property>
</configuration>
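Before wiring up the whole agent, it can help to confirm that the credentials above actually work. The snippet below is just a minimal sketch, not part of the original setup: it assumes hadoop-aws 3.3.0 and this core-site.xml are on the classpath and reuses the unicorn-offline bucket that appears later in the sink path; it simply lists the bucket root through the S3A FileSystem.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class S3AConnectivityCheck {
    public static void main(String[] args) throws Exception {
        // Loads core-site.xml (and the fs.s3a.* credentials above) from the classpath.
        Configuration conf = new Configuration();
        // "unicorn-offline" is the bucket used in the sink's hdfs.path below; change as needed.
        try (FileSystem fs = FileSystem.get(URI.create("s3a://unicorn-offline/"), conf)) {
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath());
            }
        }
    }
}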
- In the main module flume-ng-node, create a Flume configuration file, e.g. kafka-flume-s3.conf:
# Components
a1.sources= r1
a1.channels= c1
a1.sinks= k1
# source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 127.0.0.1:9092
a1.sources.r1.kafka.topics = basic
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.PidAndTimeStampInterceptor$Builder
# channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = ./checkpoint/
a1.channels.c1.dataDirs = ./data/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 10000000
a1.channels.c1.keep-alive = 6
# sink1: avoid generating lots of small files; in production set rollInterval to 3600
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = s3a://unicorn-offline/pid=%{pid}/dt=%Y%m%d
a1.sinks.k1.hdfs.filePrefix = log.
a1.sinks.k1.hdfs.fileType = SequenceFile
# a1.sinks.k1.hdfs.codeC = bzip2
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.awsRegion = us-west-1
## Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
- Custom interceptor. The configuration above already references one, so create a new module in the parent project named flume-ng-interceptor, add its Maven coordinates to the pom of the main module flume-ng-node, and create the interceptor class PidAndTimeStampInterceptor (it parses the event body with fastjson, so that dependency must also be on the new module's classpath; a small smoke test follows the class):
package org.apache.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;

public class PidAndTimeStampInterceptor implements Interceptor {

    private final ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        // Parse the JSON body and copy "timestamp" and "pid" into the event headers,
        // which drive the %Y%m%d and %{pid} escapes in the sink's hdfs.path.
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("timestamp");
        String pid = jsonObject.getString("pid");
        headers.put("timestamp", ts);
        headers.put("pid", pid);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        this.events.clear();
        for (Event event : list) {
            this.events.add(intercept(event));
        }
        return this.events;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new PidAndTimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}
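To sanity-check the interceptor before running the agent, a quick local test like the sketch below can be used; the JSON body, field values, and class name here are purely illustrative and must match what your Kafka topic actually carries.
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.PidAndTimeStampInterceptor;
public class InterceptorSmokeTest {
    public static void main(String[] args) {
        // A fake Kafka record body with the two fields the interceptor extracts.
        String json = "{\"timestamp\":\"1620000000000\",\"pid\":\"p001\",\"msg\":\"hello\"}";
        Event event = EventBuilder.withBody(json, StandardCharsets.UTF_8);
        PidAndTimeStampInterceptor interceptor = new PidAndTimeStampInterceptor();
        interceptor.initialize();
        Map<String, String> headers = interceptor.intercept(event).getHeaders();
        interceptor.close();
        // Expect timestamp=1620000000000 and pid=p001; these headers drive the
        // %Y%m%d and %{pid} escapes in the sink's hdfs.path above.
        System.out.println(headers);
    }
}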
Configure the launcher
When setting up the run configuration for the main method, the path to kafka-flume-s3.conf must be an absolute path; the input box in the screenshot above is too small to show it in full.
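For reference, the entry point in flume-ng-node is org.apache.flume.node.Application, so the program arguments in the IDEA run configuration look roughly like the line below (the path is a placeholder; use the real absolute path on your machine, and the agent name must match the a1 prefix in the config):
-n a1 -f /absolute/path/to/kafka-flume-s3.conf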
Everything here is developed directly in IDEA, but the same setup also applies when deploying to a server; just place the relevant configuration files in the appropriate locations.
Wrap-up
In big data pipelines, incoming data is usually collected into Kafka first as a buffer and then handed off to the various downstream processing needs. Writing it to S3 is typically done for backup, and Flume is an excellent fit for this kind of ETL.