美文网首页
【Flume采集日志】

【Flume采集日志】

作者: Y了个J | 来源:发表于2022-12-04 23:27 被阅读0次

Flume安装部署

安装地址

(1)Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/

安装部署

(1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
(2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
(3)修改apache-flume-1.9.0-bin的名称为flume
(4)修改配置文件

tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

cp flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=jdk目录

# 配置环境变量
vi ~/.bashrc 
#FLUME
export FLUME_HOME=flume目录
export PATH=$PATH:$FLUME_HOME/bin
source ~/.bashrc 

flume-ng version # 查看flume版本号

Flume Source 测试

常见类型: spooling directory, exec, syslog, avro, netcat,Taildir等
无论是Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,TaildirSource可以。

SpoolingDirSource(监控一个目录)

spoolingDirsource是安全的,不会丢失数据,但采集文件时不可以被修改,且文件不能重名。

mkdir /opt/module/flume/job
vi job/file_to_logger_spooling.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/opt/module/flume/testdata/log/hi

#配置Channel组件
a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000 
a1.channels.c1.transactionCapacity=100  

#配置Sink组件
a1.sinks.k1.type=logger  
a1.sinks.k1.maxBytesToLog=100

#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1   

启动 flume job

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_spooling.conf -Dflume.root.logger=INFO,console

可以看到采集完的日志文件都加上了.COMPLETED后缀

TAILDIR Souce

在/opt/module/flume/job目录下vi file_to_logger_taildir.conf

a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/opt/module/flume/testdata/log/hi/.*json
a1.sources.r1.filegroups.f2=/opt/module/flume/testdata/log/test/.*json
a1.sources.r1.positionFile=./taildir_position.json

#配置Channel组件
a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000 
a1.channels.c1.transactionCapacity=100  

#配置Sink组件
a1.sinks.k1.type=logger  
a1.sinks.k1.maxBytesToLog=100

#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1   

启动 flume job

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_taildir.conf -Dflume.root.logger=INFO,console

上面两种方式都不能监控多级目录

为解决监控多级目录的问题,我们下载flume的源码,修改TaildirSource的代码,将修改好的TaildirSource模块打包 ,将 flume-taildir-source-1.9.0.jar 上传到flume的lib目录下替换原有的 flume-taildir-source-1.9.0.jar

修改 file_to_logger_taildir.conf 配置

a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/opt/module/flume/testdata/log/hi/.*
a1.sources.r1.filegroups.f2=/opt/module/flume/testdata/log/test/.*
a1.sources.r1.positionFile=./taildir_position.json

#配置Channel组件
a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000 
a1.channels.c1.transactionCapacity=100  

#配置Sink组件
a1.sinks.k1.type=logger  
a1.sinks.k1.maxBytesToLog=100

#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1   

启动 flume job

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_taildir.conf -Dflume.root.logger=INFO,console

经测试,成功监控到多级目录下的文件变化

项目经验

修改/opt/module/flume/conf/flume-env.sh文件,配置如下参数(测试环境暂不配置)

export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
export JAVA_HOME=

可选择TaildirSource和KafkaChannel搭配,并配置日志校验拦截器。
选择TailDirSource和KafkaChannel的原因如下:

TailDirSource相比ExecSource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
采用Kafka Channel,省去了Sink,提高了效率。

在Flume的job目录下创建file_to_kafka.conf

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# a1.sources.r1.interceptors =  i1
# a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1
Flume从Kafka同步数据到HDFS

在Flume的job目录下创建kafka_to_hdfs_log.conf

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
#a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.bootstrap.servers = hadoop2:9092

a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

编写 flume TimestampInterceptor,将日志时间戳放到header当中的timestamp字段当中,这样hdfs会按日期分目录

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //1 获取body和header
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        Map<String, String> headers = event.getHeaders();

        //2 将log当中的ts字段解析出来
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("ts");

        //3 将ts字段 放到header当中的timestamp字段当中
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

把上面代码打成flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar放到/opt/module/flume/lib下

启动 flume job

bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console

往kafka推条数据测试

   @Test
    public void testProducerSend() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop2:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        String value = "{\"database\":\"gmall\",\"table\":\"cart_info\",\"type\":\"update\",\"ts\":1670340710200,\"xid\":13090,\"xoffset\":1573,\"data\":{\"id\":100924,\"user_id\":\"93\",\"sku_id\":16,\"cart_price\":4488.00,\"sku_num\":1,\"img_url\":\"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg\",\"sku_name\":\"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机\",\"is_checked\":null,\"create_time\":\"2020-06-14 09:28:57\",\"operate_time\":null,\"is_ordered\":1,\"order_time\":\"2021-10-17 09:28:58\",\"source_type\":\"2401\",\"source_id\":null},\"old\":{\"is_ordered\":0,\"order_time\":null}}";
        ProducerRecord<String, String> record = new ProducerRecord<>("topic_log", value);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println("partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset());
            }
        });

        // 所有的通道打开都需要关闭
        producer.close();
    }

然后访问发现有文件生成了
http://hadoop2:9870/explorer.html#/origin_data/gmall/log/topic_log

截屏2022-12-06 下午11.44.07.png

相关文章

  • Flume

    日志采集框架Flume 1 Flume介绍 1.概述 Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和...

  • Flume基础学习

    Flume是一款非常优秀的日志采集工具。支持多种形式的日志采集,作为apache的顶级开源项目,Flume再大数据...

  • Flume的安装与使用详解

    Flume的简单介绍Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件...

  • (一)Flume的安装

    Flume的简单介绍Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件...

  • Flume的安装与使用详解

    Flume的简单介绍Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件...

  • Flume的安装与使用详解

    Flume的简单介绍Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件...

  • Flume的安装与使用详解

    Flume的简单介绍Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件...

  • flume的安装和使用

    Flume的简单介绍Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件...

  • Flume简介

    1、概述  Flume是一个分布式、可靠、高可用的海量日志采集、聚合和传输的系统。 Flume可以采集...

  • Flume构建日志采集系统

    title: Flume构建日志采集系统date: 2018-02-03 19:45tags: [flume,k...

网友评论

      本文标题:【Flume采集日志】

      本文链接:https://www.haomeiwen.com/subject/cckdfdtx.html