Collecting Data with Flume and Sinking It to S3

Author: Coder小咚 | Published 2021-11-20 00:10

Preface

A while back I built a case that uses Flume to move Kafka data into S3; this post is a record of it.

As cloud services become more and more popular, this scenario keeps getting more common, and I hope this helps anyone who needs it.

Overview

Flume is a distributed, highly reliable, and highly available tool for collecting, aggregating, and moving large volumes of log data from many different sources into a central data store. The architecture looks like this:


[Figure: Flume architecture diagram]

I won't spend much time on Flume's internals today. This post focuses on combining the Flume 1.9 source code with Hadoop 3.3.0 in IDEA to move Kafka data into S3.

Environment Setup

  • Add the following dependencies to the pom:
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aws</artifactId>
      <version>3.3.0</version>
    </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs-client</artifactId>
      <version>3.3.0</version>
      <scope>provided</scope>
    </dependency>
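If the build then still cannot resolve core Hadoop classes (for example org.apache.hadoop.conf.Configuration), adding hadoop-common at the matching version usually fixes it. This extra dependency is my own assumption rather than part of the original setup; note also that hadoop-aws 3.3.0 pulls in the matching aws-java-sdk-bundle transitively.
<!-- Assumption: only needed if Hadoop core classes are missing from the classpath -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.0</version>
    </dependency>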
  • In the main module flume-ng-node, add the Hadoop configuration file core-site.xml and put the S3 credentials in it:
<configuration>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
    </property>

    <property>
        <name>fs.s3a.bucket.probe</name>
        <value>0</value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <value>xxxxxxxxxxxxxxxxxxxx</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx</value>
    </property>
</configuration>
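The two keys above are the S3 access credentials. If you are writing to an S3-compatible object store or want to pin a specific endpoint, the S3A connector also understands the following standard Hadoop options; this is an optional addition of mine, and the endpoint value is only a placeholder:

    <!-- Optional: only for S3-compatible stores or non-default endpoints -->
    <property>
        <name>fs.s3a.endpoint</name>
        <value>s3.us-west-1.amazonaws.com</value>
    </property>

    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
    </property>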
  • In the main module flume-ng-node, create a Flume configuration file, e.g. kafka-flume-s3.conf:
# Components
a1.sources= r1
a1.channels= c1
a1.sinks= k1

# source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 127.0.0.1:9092
a1.sources.r1.kafka.topics = basic
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.PidAndTimeStampInterceptor$Builder

# channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = ./checkpoint/
a1.channels.c1.dataDirs = ./data/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 10000000
a1.channels.c1.keep-alive = 6

# sink1: avoid producing lots of small files; in production set rollInterval to 3600
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = s3a://unicorn-offline/pid=%{pid}/dt=%Y%m%d
a1.sinks.k1.hdfs.filePrefix = log.
a1.sinks.k1.hdfs.fileType = SequenceFile
# a1.sinks.k1.hdfs.codeC = bzip2

a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.awsRegion = us-west-1

## Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
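Before moving on, a quick hedged illustration of how the escapes in hdfs.path are resolved (the bucket, values, and layout here are purely illustrative): the interceptor described next copies pid and timestamp into the event headers, and the sink substitutes %{pid} and %Y%m%d from those headers.

event body:     {"pid": "1001", "timestamp": "1637380800000", ...}
event headers:  pid=1001, timestamp=1637380800000
resolved dir:   s3a://unicorn-offline/pid=1001/dt=20211120   (dt is formatted in the agent's timezone)

Files roll every 60 seconds or every 128 MB (rollSize = 134217728), whichever comes first, which is why the comment above recommends raising rollInterval to 3600 in production.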
  • Custom interceptor. The configuration above already references one, so create a new module in the parent project named flume-ng-interceptor, add its coordinates to the main module flume-ng-node's pom, and create the interceptor class PidAndTimeStampInterceptor:
package org.apache.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;

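/**
 * Copies the "timestamp" and "pid" fields of each event's JSON body into the
 * event headers, so the HDFS sink path can resolve %Y%m%d and %{pid}.
 */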
public class PidAndTimeStampInterceptor implements Interceptor {
    private ArrayList<Event> events = new ArrayList<>();

    public void initialize() {}

    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("timestamp");
        String pid = jsonObject.getString("pid");
        headers.put("timestamp", ts);
        headers.put("pid", pid);
        return event;
    }

    public List<Event> intercept(List<Event> list) {
        this.events.clear();
        for (Event event : list)
            this.events.add(intercept(event));
        return this.events;
    }

    public void close() {}

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new PidAndTimeStampInterceptor();
        }

        public void configure(Context context) {}
    }
}
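Since the interceptor parses the event body with fastjson, the flume-ng-interceptor module's pom also needs that dependency. A minimal sketch; the version number is my assumption, use whatever your project already standardizes on:

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.78</version>
    </dependency>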

Launcher Configuration

[Figure: run configuration for the main method in IDEA]

kafka-flume-s3.conf must be specified with an absolute path; the input box in the screenshot above is too small to show the whole thing, so a sketch follows below.
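For reference, a sketch of the run configuration I would use, with a placeholder path: the main class is org.apache.flume.node.Application from the flume-ng-node module, and the program arguments name the agent and point at the config file.

--name a1 --conf-file /absolute/path/to/kafka-flume-s3.conf --no-reload-conf

--name must match the agent name used in the config (a1 above); --no-reload-conf is optional, and without it Flume periodically polls the file for changes.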

This setup runs directly in IDEA for development, but it applies equally when deployed on a server; just put the relevant configuration files in the right locations.

Data for big data processing is usually collected into Kafka first as a buffer and then handed off to the various downstream consumers. Writing it to S3 is typically done as a backup, and Flume is an excellent fit for this kind of ETL.
