美文网首页
flume 到 kafka 精准一起性消费

flume 到 kafka 精准一起性消费

作者: 无来无去_A | 来源:发表于2020-07-26 22:36 被阅读0次

    一、Kafka精准一次性消费

    -- 1. 原理解析
          在离线的数仓中,flume1 -> kafka -> flume2时,kafka会给数据加时间戳。而且这是时间戳默认为系统时间,数据写到hdfs时,
          按照headers中的timestamp时间戳,进入到某一个文件夹中。
    -- 2. 问题描述:
          在实际开发中,当前一天23:59:58秒产生的数据,由于网络延迟的原因,数据采集到达kafka时,到了第二天,那么此时数据通过flume
          采集到hdfs后,存储在第二天的文件夹中。
    -- 3. 问题的验证:
          1. 修改数据生成器jar包的properties文件,将时间修改为2020-08-21, 不修改虚拟机的系统时间
          2. 开启采集通道:zk、hadoop集群、kafka、flume2、flume2
          3. 去到hdfs文件系统中查看数据,发生生成的文件为今天:2020-07-26,而不是数据生成的时间。
    --4 . 解决方法:
          在flume2中,自定义拦截器,将数据的时间戳添加到event的header中的timestamp中,这样数据就会根据数据的时间戳去到指定的文件夹中,实现精准一次性消费。
    

    1.1 编写拦截器

    -- 1. 编写拦截器的步骤:
    1. 自定义类 implements Interceptor
    2. 实现4个方法
       初始化、关闭资源、单个event处理逻辑、 多个event的处理逻辑
    3. 创建一个内部类,继承与Builder
       重写2个方法
    -- 2. 打包 -> 上传到flume/lib包下
    -- 3. 编写flume的配置文件
    

    拦截器

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.List;
    
    
    
    public class TimeInterceptor implements Interceptor {
        /**
         * configure配置的初始化
         */
        @Override
        public void initialize() {
    
        }
    
        /**
         * 单个event的处理逻辑
         * @param event
         * @return
         */
        @Override
        public Event intercept(Event event) {
            // 获取event的body数据
            String body  = event.getBody().toString();
            // 将数据解成json对象
            JSONObject json = JSON.parseObject(body);
            // 获取key为ts的value值
            String ts = json.getString("ts");
            System.out.println(ts);
    
         // 判断数据不能为空
           if (ts != null){
               // 将ts增加到header中,数据必须是timestamp
               event.getHeaders().put("timestamp",ts);
           }
    
           // 返回event
            return event;
        }
    
        /**
         * 多个event的处理方式,内部调用单个event的处理逻辑
         * @param events
         * @return
         */
    
        @Override
        public List<Event> intercept(List<Event> events) {
    
            for (Event event : events) {
                intercept(event);
            }
            return events;
        }
    
        /**
         * 资源的关闭
         */
        @Override
        public void close() {
    
        }
    
        /**
         * 自定义内部类,实现Builder类,重写两个方法
         */
        public static class  MyBuilder implements   Builder{
            /**
             * 返回自定义类的对象
             * @return
             */
            @Override
            public Interceptor build() {
              return   new MyInterceptor();
            }
            @Override
            public void configure(Context context) {
    
            }
        }
    }
    

    1.2 在flume2配置文件中使用拦截器

    #步骤一:agent Name
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    #步骤二:kafka source
    #source类型
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    #putlist中数据达到了6K以后提交到channel中
    a1.sources.r1.batchSize = 5000
    #拉取数据的时间达到2s以后,将获取的数据提交到channel中
    a1.sources.r1.batchDurationMillis = 2000
    #kafka的集群
    a1.sources.r1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092
    #订阅的话题
    a1.sources.r1.kafka.topics=topic_log
    
    #步骤三:定义拦截器
    a1.sources.r1.interceptors = i1
    #指定拦截器的类型 = 自定义拦截器中builder的实现类的全类名
    a1.sources.r1.interceptors.i1.type = MyInterceptor$MyBuilder
    
    #步骤四:定义file channel
    a1.channels.c1.type = file
    #checkpoint文件存储的地址
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    # channel中event文件在磁盘中存储的地址
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
    #一个event文件存储的最大的大小
    a1.channels.c1.maxFileSize = 2146435071
    #checkpoint个数的最大容量
    a1.channels.c1.capacity = 1000000
    #当put事务将数据提交到channel队列中,channel队列没有足够的空间时,提交事务等待的最大时间
    a1.channels.c1.keep-alive = 6
    
    #步骤五:sink
    a1.sinks.k1.type = hdfs
    #上传到HDFS的路径
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    #上传文件的前缀
    a1.sinks.k1.hdfs.filePrefix = log-
    #是否按照时间滚动文件夹
    a1.sinks.k1.hdfs.round = false
    
    #多久生成一个新的文件
    a1.sinks.k1.hdfs.rollInterval = 10
    #设置每个文件的滚动大小大概是128M
    a1.sinks.k1.hdfs.rollSize = 134217728
    #文件的滚动与Event数量无关
    a1.sinks.k1.hdfs.rollCount = 0
    
    #开启压缩
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = lzop
    
    #第六步:连接source和channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    

    相关文章

      网友评论

          本文标题:flume 到 kafka 精准一起性消费

          本文链接:https://www.haomeiwen.com/subject/gzzglktx.html