美文网首页
flume经验

flume经验

作者: cotecc | 来源:发表于2018-05-10 20:38 被阅读0次

    一、source

    1. kafka source

    常用参数:

    • auto.offset.reset,只有当使用新的groupid时,earliest才会生效,因为kafka没有该groupid的offset。对于已经存在的groupid,只要有offset记录,所以会从该groupid的offset开始处消费数据
    • migrateZookeeperOffsets,不使用zookeeper记录offset
    • group.id,kafka broker会记录每个groupid的offset,如果需要重新消费数据,使用心得groupid,并配合使用auto.offset.reset=earliest

    二、channel

    1. channel对比

    • file —channel,使用local目录,存储中间数据流程,如果flume死掉重启,可以通过该文件进行恢复,不会丢失数据,但是性能较差,需要读写磁盘
    • memory — channel,使用内存做中间存储,性能较好,但是flume死掉重启,在内存尚未保存的数据,会丢失
    • kafka — channel,通过kafka记录offset,及时flume死掉重启,也不会丢失数据;缺点是不能使用interceptors进行过滤

    三、sink

    1. hdfs

    常用参数:

    • serializer.appendNewline,不要在每一行数据后加\n,因为有些数据自带有\n
    • rollInterval,多长时间sink一个文件到hdfs

    其他

    1. 直接使用kafka作为channel,sink到hdfs

    # 使用kafka直接作为channel(flume中断重启时,会从kafka续传数据),没有source
    # agent1是flume名称(启动时命名标识)
    
    agent1.channels = kafka-channel
    agent1.sources = no-source
    agent1.sinks = hdfs-sink1
    
    #channel是kafka
    agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
    agent1.channels.kafka-channel.kafka.bootstrap.servers = 
    agent1.channels.kafka-channel.kafka.topic = 
    agent1.channels.kafka-channel.kafka.consumer.group.id = 
    agent1.channels.kafka-channel.migrateZookeeperOffsets = false
    #agent1.channels.kafka-source1.kafka.consumer.auto.offset.reset = earliest
    agent1.channels.kafka-channel.parseAsFlumeEvent = false
    
    
    #sink数据到hdfs,配置
    agent1.sinks.hdfs-sink1.channel = kafka-channel
    agent1.sinks.hdfs-sink1.type = hdfs
    agent1.sinks.hdfs-sink1.hdfs.path = /home/dt=%Y%m%d
    agent1.sinks.hdfs-sink1.hdfs.filePrefix = events-
    agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
    agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
    agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
    agent1.sinks.hdfs-sink1.hdfs.rollInterval = 30
    agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
    agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
    agent1.sinks.hdfs-sink1.hdfs.batchSize = 100
    agent1.sinks.hdfs-sink1.hdfs.txnEventMax = 1000
    agent1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
    agent1.sinks.hdfs-sink1.hdfs.appendTimeout = 60000
    agent1.sinks.hdfs-sink1.serializer.appendNewline = false
    

    2. 过滤source数据

    # source中数据,匹配正则的才流到下一层
    agent1.sources.kafka-source1.interceptors = i1
    agent1.sources.kafka-source1.interceptors.i1.type = regex_filter
    agent1.sources.kafka-source1.interceptors.i1.regex = glog
    

    相关文章

      网友评论

          本文标题:flume经验

          本文链接:https://www.haomeiwen.com/subject/lavtdftx.html