Flume实时接入

作者: paopaodaxia | 来源:发表于2017-12-21 19:49 被阅读0次

    今天花了半天时间抽空看了一下flume实时接入,结合数据仓库中有部分报表有着准实时刷新的需求,需要抽数阶段近乎实时,为后面统计计算节省时间。虽然flume接入关系型数据库数据并不太合适,比如源系统删除、更新数据,flume无法处理,但是对于日志接入这种只有插入的场景还是比较合适的。

    下面介绍下flume:

    一、Flume的概念

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力


    image.png

    二、Flume的处理流程

    Flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,Flume再删除自己缓存的数据。

    在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?Event将传输的数据进行封装,是Flume传输数据的基本单位,如果是文本文件,通常是一行记录。Event也是事务的基本单位。Event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。Event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

    三、Flume的架构介绍

    Flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent。Agent本身是一个Java进程,运行在日志收集节点——所谓日志收集节点就是服务器节点。 Agent里面包含3个核心的组件:source、channel和sink,类似生产者、仓库、消费者的架构。

    • Source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
    • Channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
    • Sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。

    四、Flume的运行机制

    Flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据输入的source,一个是数据输出的sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方,例如HDFS等。注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。

    五、Flume的安装与配置

    gent.sinks = HDFS
      
    agent.sources = sql-source
    agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
    #mysql conn  
    agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://xx.xx.xx.xx:3306/data_dev
    agent.sources.sql-source.hibernate.connection.user = root
    agent.sources.sql-source.hibernate.connection.password = xxxxxxx
    agent.sources.sql-source.hibernate.connection.autocommit = true
    agent.sources.sql-source.table = src_table_detail
    agent.sources.sql-source.start.from = 0
    agent.sources.sql-source.custom.query = select * from src_table_detail where id > $@$ order by id
    agent.sources.sql-source.batch.size = 100
    agent.sources.sql-source.max.rows = 100
    agent.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    agent.sources.sql-source.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    agent.sources.sql-source.run.query.delay=5000 
    # Status file is used to save last readed row  
    agent.sources.sql-source.status.file.path = /home/xxxx/log 
    agent.sources.sql-source.status.file.name = sql-source.sqlSource.status
    agent.sources.sql-source.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider  
    agent.sources.sql-source.hibernate.c3p0.min_size=1  
    agent.sources.sql-source.hibernate.c3p0.max_size=10   
     
    #hdfs
    agent.sinks.HDFS.channel = ch1
    agent.sinks.HDFS.type = hdfs
    agent.sinks.HDFS.hdfs.path = hdfs://nbd-hdfs/user/hive/warehouse/xxx.db
    agent.sinks.HDFS.hdfs.fileType = DataStream
    agent.sinks.HDFS.hdfs.writeFormat = Text
    agent.sinks.HDFS.hdfs.rollSize = 268435456 
    agent.sinks.HDFS.hdfs.rollInterval = 0
    agent.sinks.HDFS.hdfs.rollCount = 0
    
    

    启动flume bin/flume-ng agent -c conf/ -f conf/flume.conf -n agent --no-reload-conf -Dflume.root.logger=INFO,console
    mysql新增的数据会实时进入hdfs

    相关文章

      网友评论

        本文标题:Flume实时接入

        本文链接:https://www.haomeiwen.com/subject/oeoawxtx.html