Collecting Logs with Flume

Author: Y了个J | Published 2022-12-04 23:27

    Flume Installation and Deployment

    Download Links

    (1) Flume official website: http://flume.apache.org/
    (2) Documentation: http://flume.apache.org/FlumeUserGuide.html
    (3) Downloads: http://archive.apache.org/dist/flume/

    Installation Steps

    (1) Upload apache-flume-1.9.0-bin.tar.gz to the /opt/software directory on the Linux host
    (2) Extract apache-flume-1.9.0-bin.tar.gz into the /opt/module/ directory
    (3) Rename apache-flume-1.9.0-bin to flume
    (4) Edit the configuration files

    tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
    mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
    
    cd /opt/module/flume/conf
    cp flume-env.sh.template flume-env.sh
    vi flume-env.sh
    export JAVA_HOME=<your JDK installation directory>
    
    # Configure environment variables
    vi ~/.bashrc 
    # FLUME
    export FLUME_HOME=/opt/module/flume
    export PATH=$PATH:$FLUME_HOME/bin
    source ~/.bashrc 
    
    flume-ng version # check the Flume version number
    

    Testing Flume Sources

    Common source types: spooling directory, exec, syslog, avro, netcat, Taildir, and so on.
    Neither Spooling Directory Source nor Exec Source can meet the need for dynamic, real-time collection; Taildir Source can.

    Spooling Directory Source (monitors a directory)

    Spooling Directory Source is safe and will not lose data, but files must not be modified while they are being collected, and file names must not be reused.

    mkdir /opt/module/flume/job
    cd /opt/module/flume
    vi job/file_to_logger_spooling.conf
    
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    # Configure the source
    a1.sources.r1.type=spooldir
    a1.sources.r1.spoolDir=/opt/module/flume/testdata/log/hi
    
    # Configure the channel
    a1.channels.c1.type=memory 
    a1.channels.c1.capacity=1000 
    a1.channels.c1.transactionCapacity=100  
    
    # Configure the sink
    a1.sinks.k1.type=logger  
    a1.sinks.k1.maxBytesToLog=100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1   
    

    Start the Flume job

    bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_spooling.conf -Dflume.root.logger=INFO,console
    

    You can see that each log file that has been fully collected is renamed with a .COMPLETED suffix.

    Taildir Source

    In the /opt/module/flume/job directory, create file_to_logger_taildir.conf:

    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    # Configure the source
    a1.sources.r1.type=TAILDIR
    a1.sources.r1.filegroups=f1 f2
    a1.sources.r1.filegroups.f1=/opt/module/flume/testdata/log/hi/.*json
    a1.sources.r1.filegroups.f2=/opt/module/flume/testdata/log/test/.*json
    a1.sources.r1.positionFile=./taildir_position.json
    
    # Configure the channel
    a1.channels.c1.type=memory 
    a1.channels.c1.capacity=1000 
    a1.channels.c1.transactionCapacity=100  
    
    # Configure the sink
    a1.sinks.k1.type=logger  
    a1.sinks.k1.maxBytesToLog=100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1   
    

    Start the Flume job

    bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_taildir.conf -Dflume.root.logger=INFO,console
    

    Neither of the two approaches above can monitor nested (multi-level) directories.

    To monitor nested directories, download the Flume source code, modify the Taildir Source code, repackage the modified taildir-source module, and upload the resulting flume-taildir-source-1.9.0.jar to Flume's lib directory, replacing the original flume-taildir-source-1.9.0.jar.
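
    The original article does not include the modified source code, but the core of the change is to match files recursively under each filegroup's parent directory instead of only its direct children. A minimal, illustrative sketch of that idea (not the actual Flume patch; the class and method names below are invented for illustration):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    public class RecursiveFileMatcher {
    
        // Return every regular file, at any depth under parentDir, whose absolute
        // path matches fileRegex. The modified Taildir Source applies the same idea
        // when scanning each filegroup, instead of listing only direct children.
        public static List<Path> getMatchingFiles(String parentDir, String fileRegex) throws IOException {
            Pattern pattern = Pattern.compile(fileRegex);
            try (Stream<Path> paths = Files.walk(Paths.get(parentDir))) {
                return paths.filter(Files::isRegularFile)
                            .filter(p -> pattern.matcher(p.toAbsolutePath().toString()).matches())
                            .collect(Collectors.toList());
            }
        }
    
        public static void main(String[] args) throws IOException {
            // Example: list every file under the first filegroup's directory
            for (Path p : getMatchingFiles("/opt/module/flume/testdata/log/hi", ".*")) {
                System.out.println(p);
            }
        }
    }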

    Modify the file_to_logger_taildir.conf configuration:

    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    # Configure the source
    a1.sources.r1.type=TAILDIR
    a1.sources.r1.filegroups=f1 f2
    a1.sources.r1.filegroups.f1=/opt/module/flume/testdata/log/hi/.*
    a1.sources.r1.filegroups.f2=/opt/module/flume/testdata/log/test/.*
    a1.sources.r1.positionFile=./taildir_position.json
    
    # Configure the channel
    a1.channels.c1.type=memory 
    a1.channels.c1.capacity=1000 
    a1.channels.c1.transactionCapacity=100  
    
    # Configure the sink
    a1.sinks.k1.type=logger  
    a1.sinks.k1.maxBytesToLog=100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1   
    

    Start the Flume job

    bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_taildir.conf -Dflume.root.logger=INFO,console
    

    Testing confirms that file changes in nested directories are now picked up successfully.

    Project Experience

    Modify /opt/module/flume/conf/flume-env.sh and configure the following parameters (not needed in a test environment):

    export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
    export JAVA_HOME=
    

    Pair Taildir Source with Kafka Channel, and configure a log-validation interceptor.
    The reasons for choosing Taildir Source and Kafka Channel are as follows:

    Advantages of Taildir Source over Exec Source:
    Taildir Source supports resuming from its recorded position (breakpoint resume) and multiple directories. Before Flume 1.6 you had to write a custom source that recorded the last read position of each file to achieve this.
    Exec Source can collect data in real time, but if Flume is not running or the shell command fails, data is lost.
    Using Kafka Channel eliminates the sink and improves efficiency.

    Create file_to_kafka.conf in Flume's job directory:

    # Define the components
    a1.sources = r1
    a1.channels = c1
    
    # Configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
    # a1.sources.r1.interceptors =  i1
    # a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder
    
    # Configure the channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
    a1.channels.c1.kafka.topic = topic_log
    a1.channels.c1.parseAsFlumeEvent = false
    
    # Assemble the components
    a1.sources.r1.channels = c1
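
    The commented-out ETLInterceptor above is the log-validation interceptor mentioned earlier. The original article does not show its code; a minimal sketch of what such an interceptor might look like (assuming, as with the TimestampInterceptor further below, that log lines are JSON and that fastjson is on the classpath; the real project implementation may differ):

    package com.atguigu.gmall.flume.interceptor;
    
    import com.alibaba.fastjson.JSONException;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;
    
    public class ETLInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
        }
    
        @Override
        public Event intercept(Event event) {
            String log = new String(event.getBody(), StandardCharsets.UTF_8);
            try {
                // Keep the event only if the body parses as JSON
                JSONObject.parseObject(log);
                return event;
            } catch (JSONException e) {
                // Returning null tells Flume to drop the event
                return null;
            }
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            // Remove every event that fails validation
            Iterator<Event> it = list.iterator();
            while (it.hasNext()) {
                if (intercept(it.next()) == null) {
                    it.remove();
                }
            }
            return list;
        }
    
        @Override
        public void close() {
        }
    
        public static class Builder implements Interceptor.Builder {
    
            @Override
            public Interceptor build() {
                return new ETLInterceptor();
            }
    
            @Override
            public void configure(Context context) {
            }
        }
    }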
    
    Syncing Data from Kafka to HDFS with Flume

    Create kafka_to_hdfs_log.conf in Flume's job directory:

    # Define the components
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    # Configure the source
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    #a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r1.kafka.bootstrap.servers = hadoop2:9092
    
    a1.sources.r1.kafka.topics=topic_log
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder
    
    # Configure the channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    # Configure the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log
    a1.sinks.k1.hdfs.round = false
    
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    # Control the output file type
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    
    # Assemble the components
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    Write a Flume TimestampInterceptor that copies the log's ts field into the timestamp field of the event header, so the HDFS sink writes to date-partitioned directories (the %Y-%m-%d in hdfs.path above):

    package com.atguigu.gmall.flume.interceptor;
    
    import com.alibaba.fastjson.JSONObject;   // assumes the fastjson library
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    
    public class TimestampInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
            // 1. Get the event body and headers
            byte[] body = event.getBody();
            String log = new String(body, StandardCharsets.UTF_8);
            Map<String, String> headers = event.getHeaders();
    
            // 2. Parse the ts field out of the log line
            JSONObject jsonObject = JSONObject.parseObject(log);
            String ts = jsonObject.getString("ts");
    
            // 3. Put the ts value into the timestamp field of the header
            headers.put("timestamp", ts);
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            for (Event event : list) {
                intercept(event);
            }
            return list;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder {
    
            @Override
            public Interceptor build() {
                return new TimestampInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    }
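
    As a quick sanity check before packaging, the interceptor can be exercised on a hand-built event. This is an illustrative snippet, not part of the original project; it only relies on the Flume core API and the fastjson dependency already used above:

    package com.atguigu.gmall.flume.interceptor;   // same package as the interceptor above
    
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    
    import java.nio.charset.StandardCharsets;
    
    // Hypothetical demo class, not part of the original project
    public class TimestampInterceptorDemo {
        public static void main(String[] args) {
            // Build an event whose JSON body carries a ts field
            Event event = EventBuilder.withBody("{\"ts\":1670340710200,\"type\":\"update\"}",
                    StandardCharsets.UTF_8);
    
            Event intercepted = new TimestampInterceptor.Builder().build().intercept(event);
    
            // Expected output: timestamp header = 1670340710200
            System.out.println("timestamp header = " + intercepted.getHeaders().get("timestamp"));
        }
    }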
    

    Package the code above as flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar and place it in /opt/module/flume/lib.

    Start the Flume job

    bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=INFO,console
    

    Push a test record to Kafka to verify the pipeline (the test method is wrapped below in an illustrative test class so the required imports are visible):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    import java.util.Properties;
    
    // Illustrative wrapper class; the original article shows only the test method
    public class ProducerTest {
    
        @Test
        public void testProducerSend() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop2:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(properties);
    
            String value = "{\"database\":\"gmall\",\"table\":\"cart_info\",\"type\":\"update\",\"ts\":1670340710200,\"xid\":13090,\"xoffset\":1573,\"data\":{\"id\":100924,\"user_id\":\"93\",\"sku_id\":16,\"cart_price\":4488.00,\"sku_num\":1,\"img_url\":\"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg\",\"sku_name\":\"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机\",\"is_checked\":null,\"create_time\":\"2020-06-14 09:28:57\",\"operate_time\":null,\"is_ordered\":1,\"order_time\":\"2021-10-17 09:28:58\",\"source_type\":\"2401\",\"source_id\":null},\"old\":{\"is_ordered\":0,\"order_time\":null}}";
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_log", value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset());
                }
            });
    
            // Every producer that is opened must be closed
            producer.close();
        }
    }
    

    Then open the HDFS web UI and you can see that files have been generated:
    http://hadoop2:9870/explorer.html#/origin_data/gmall/log/topic_log

    (Screenshot: the HDFS file browser showing the generated log files)
