Flume 1.7 + Kafka + Streaming Integration

Author: 吃橘子的冬天 | Published 2018-01-10 14:49

    Why choose Flume 1.7?
    Flume 1.7 introduces many new features and much more complete Kafka support. One of these is the TAILDIR source, which automatically tracks changes to all files matching a pattern under a directory; it is the source used throughout this project.

    1. Installation

    • Download: apache-flume-1.7.0

    • After downloading, upload the archive to the /opt/ebohailife/ directory and extract it:

    
    [ebohailife@e-bohailife-dat002 ~]$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
    
    
    • Verify the installation: /opt/ebohailife/flume/apache-flume-1.7.0-bin/bin/flume-ng version

    If it prints the following, the installation succeeded:

    
    [ebohailife@e-bohailife-dat002 conf]$ ../bin/flume-ng version
    Flume 1.7.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
    Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
    From source with checksum 0d21b3ffdc55a07e1d08875872c00523
    
    

    2. Development

    - Edit the Flume configuration file

    
    [ebohailife@e-bohailife-dat002 conf]$ echo $JAVA_HOME
    /opt/ebohailife/jdk1.7.0_80
    [ebohailife@e-bohailife-dat002 conf]$ cp flume-env.sh.template flume-env.sh
    vi flume-env.sh # set the JAVA_HOME variable in flume-env.sh
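
    For reference, the relevant line in flume-env.sh would then look like this (a minimal sketch, using the JAVA_HOME printed above):

    export JAVA_HOME=/opt/ebohailife/jdk1.7.0_80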
    
    

    - Create the Flume job configuration file taildir_behavior.conf

    
    [ebohailife@e-bohailife-uat002 conf]$ vi taildir_behavior.conf
    # name the agent a1
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/ebohailife/logs/rest/.*behavior.*
    a1.sources.r1.positionFile = /tmp/flume/taildir_behavior_position.json
    a1.sources.r1.fileHeader = false

    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
    a1.sinks.k1.kafka.topic = behaviorlog_r1p3
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.flumeBatchSize = 100
    # a1.sinks.k1.topic = behaviorlog_r1p3
    # Kafka broker list; the properties below are deprecated in Flume 1.7 and later
    # a1.sinks.k1.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
    # a1.sinks.k1.requiredAcks = 1
    # a1.sinks.k1.batchSize = 100

    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    # checkpoint file directory
    a1.channels.c1.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint
    # message data directory
    a1.channels.c1.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
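
    The positionFile set above is where the TAILDIR source records its read offset for every tailed file, which is how the agent resumes without re-shipping data after a restart. It can be inspected directly; the JSON layout below follows Flume 1.7's TAILDIR format, with the inode, pos, and file values shown purely for illustration:

    cat /tmp/flume/taildir_behavior_position.json
    # [{"inode":1583272,"pos":10241,"file":"/opt/ebohailife/logs/rest/app_behavior.log"}]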
    
    

    - Create the Flume job configuration file taildir_phoneinfo.conf

    
    [ebohailife@e-bohailife-uat002 conf]$ vi taildir_phoneinfo.conf
    # name the agent a2
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2

    # Describe/configure the source
    a2.sources.r2.type = TAILDIR
    a2.sources.r2.filegroups = f1
    a2.sources.r2.filegroups.f1 = /opt/ebohailife/logs/rest/.*phoneinfo.*
    a2.sources.r2.positionFile = /tmp/flume/taildir_phoneinfo_position.json
    a2.sources.r2.fileHeader = false

    # Describe the sink
    a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    a2.sinks.k2.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
    a2.sinks.k2.kafka.topic = phoneinfolog_r1p3
    a2.sinks.k2.kafka.producer.acks = 1
    a2.sinks.k2.kafka.producer.linger.ms = 1
    a2.sinks.k2.flumeBatchSize = 100
    # a2.sinks.k2.topic = phoneinfolog_r1p3
    # Kafka broker list; the properties below are deprecated in Flume 1.7 and later
    # a2.sinks.k2.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
    # a2.sinks.k2.requiredAcks = 1
    # a2.sinks.k2.batchSize = 100

    # Use a channel which buffers events in file
    a2.channels.c2.type = file
    # checkpoint file directory
    # NOTE: a file channel locks its directories exclusively; when a1 and a2 run as
    # separate agents, give each its own checkpointDir and dataDirs to avoid conflicts
    a2.channels.c2.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint
    # message data directory
    a2.channels.c2.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data

    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    
    

    - Create the Kafka topics

    
    # create topic behaviorlog_r1p3
    ./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic behaviorlog_r1p3 --partitions 3 --replication-factor 1
    # create topic phoneinfolog_r1p3
    ./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic phoneinfolog_r1p3 --partitions 3 --replication-factor 1
    
    

    - List the topics

    
    ./kafka-topics.sh  --list --zookeeper 10.104.0.227:2181
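
    To check the partition count and replica assignment of a single topic, the same tool supports --describe:

    ./kafka-topics.sh --describe --topic behaviorlog_r1p3 --zookeeper 10.104.0.227:2181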
    
    

    - Start the Flume NG agents in the background

    
    ./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_behavior.conf -n a1 >/dev/null 2>&1 &
    ./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_phoneinfo.conf -n a2 >/dev/null 2>&1 &
    # add -Dflume.root.logger=INFO,console to log to the console while debugging
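
    Because stdout goes to /dev/null, a failed start is silent. A quick way to confirm both agents are up and to push a test record through the pipeline (the log file name below is hypothetical; any file matching the TAILDIR pattern will do):

    ps -ef | grep flume | grep -v grep    # both a1 and a2 should be listed
    echo "test-event" >> /opt/ebohailife/logs/rest/app_behavior.log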
    
    

    - Start the Kafka consumers in the background

    
    # consume behaviorlog_r1p3
    ./kafka-console-consumer.sh --topic behaviorlog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &
    # consume phoneinfolog_r1p3
    ./kafka-console-consumer.sh --topic phoneinfolog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &
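
    With output discarded, the background consumers only prove that the topics are reachable. When first verifying the pipeline, it is more useful to run one consumer in the foreground and watch records arrive as the log files are tailed:

    ./kafka-console-consumer.sh --topic behaviorlog_r1p3 --bootstrap-server 10.104.0.226:9092 --from-beginning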
    
    

    - Create the log collection streams

    
    # create the phoneinfo_log stream
    CREATE STREAM phoneinfo_log_stream(phoneinfo STRING, tmp1 STRING, ip STRING, tmp2 STRING, phone_model STRING, tmp3 STRING, phone_version STRING, tmp4 STRING, area STRING, tmp5 STRING, start_time TIMESTAMP, tmp6 STRING, KDDI STRING, tmp7 STRING, app_source STRING, tmp8 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' TBLPROPERTIES("topic"="phoneinfolog_r1p3", "kafka.zookeeper"="10.104.0.227:2181", "kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092");
    # create the behavior_log stream
    CREATE STREAM behavior_log_stream(eventid STRING, tmp1 STRING, ip STRING, tmp2 STRING, user_id STRING, tmp3 STRING, user_name STRING, tmp4 STRING, in_time TIMESTAMP, tmp5 STRING, operate_time TIMESTAMP, tmp6 STRING, phone_unicode STRING, tmp7 STRING, trigger_count STRING, tmp8 STRING, diff_in_oper INT, tmp9 STRING, tel_no STRING, tmp10 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' TBLPROPERTIES("topic"="behaviorlog_r1p3", "kafka.zookeeper"="10.104.0.227:2181", "kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092");
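
    The alternating tmpN columns exist because each raw log line interleaves separator tokens with the actual values, and the stream simply discards every second field. With '|' as the delimiter, a behavior log record is therefore shaped like this (a schematic line, not real data):

    <eventid>|<tmp1>|<ip>|<tmp2>|<user_id>|<tmp3>| ... |<tel_no>|<tmp10>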
    
    

    - Create the log tables

    
    # create the phoneinfo_log table
    CREATE TABLE phoneinfo_log_tab(phone STRING, ip STRING, phone_model STRING, phone_version STRING, area STRING, start_time TIMESTAMP, KDDI STRING, app_source STRING);
    # create the behavior_log table
    CREATE TABLE behavior_log_tab(eventid STRING, ip STRING, user_id STRING, user_name STRING, in_time TIMESTAMP, operate_time TIMESTAMP, phone_unicode STRING, trigger_count STRING, diff_in_oper INT, tel_no STRING);
    
    

    To keep the streams from writing too many small files, apply the following settings:

    set streamsql.enable.hdfs.batchflush = true # enable batched flushing
    set streamsql.hdfs.batchflush.size = <num> # flush once this many messages have accumulated
    set streamsql.hdfs.batchflush.interval.ms = <num> # flush once this many milliseconds have elapsed

    # a flush is triggered as soon as either batchflush.size or batchflush.interval.ms is satisfied
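
    For example, with illustrative values (a sketch only; both numbers should be tuned to the actual message rate):

    set streamsql.enable.hdfs.batchflush = true
    set streamsql.hdfs.batchflush.size = 1000
    set streamsql.hdfs.batchflush.interval.ms = 5000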
    
    

    - Start the log streams

    
    # trigger the phoneinfo_log_stream streaming computation
    INSERT INTO phoneinfo_log_tab SELECT phoneinfo, ip, phone_model, phone_version, area, start_time, KDDI, app_source FROM phoneinfo_log_stream;
    # trigger the behavior_log_stream streaming computation
    INSERT INTO behavior_log_tab SELECT eventid, ip, user_id, user_name, in_time, operate_time, phone_unicode, trigger_count, diff_in_oper, tel_no FROM behavior_log_stream;
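
    Once both INSERTs are running, a simple sanity check is to count the rows in a target table and confirm the number grows as new log lines are tailed (this assumes the engine allows ad-hoc queries against stream-fed tables):

    SELECT COUNT(*) FROM behavior_log_tab;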
    
    
