美文网首页
尚硅谷大数据技术之Flume

尚硅谷大数据技术之Flume

作者: 尚硅谷教育 | 来源:发表于2018-12-06 15:04 被阅读3次

    第5章 Flume高级之自定义MySQLSource

    5.1 自定义Source说明

    Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Source。

    如:实时监控MySQL,从MySQL中获取数据传输到HDFS或者其他存储框架,所以此时需要我们自己实现MySQLSource。

    官方也提供了自定义source的接口:

    官网说明:https://flume.apache.org/FlumeDeveloperGuide.html#source

    5.3 自定义MySQLSource组成

    image.png
    5.2 自定义MySQLSource步骤
    根据官方说明自定义MySqlSource需要继承AbstractSource类并实现Configurable和PollableSource接口。
    实现相应方法:
    getBackOffSleepIncrement()//暂不用
    getMaxBackOffSleepInterval()//暂不用
    configure(Context context)//初始化context
    process()//获取数据(从MySql获取数据,业务处理比较复杂,所以我们定义一个专门的类——SQLSourceHelper来处理跟MySql的交互),封装成Event并写入Channel,这个方法被循环调用
    stop()//关闭相关的资源
    5.4 代码实现
    5.4.1 导入Pom依赖
    <dependencies>
    <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
    </dependency>
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
    </dependency>
    </dependencies>
    5.4.2 添加配置信息
    在ClassPath下添加jdbc.properties和log4j. properties
    jdbc.properties:
    dbDriver=com.mysql.jdbc.Driver
    dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
    dbUser=root
    dbPassword=000000
    log4j. properties:

    --------console-----------

    log4j.rootLogger=info,myconsole,myfile
    log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
    log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout

    log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n

    log4j.rootLogger=error,myfile

    log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.myfile.File=/tmp/flume.log
    log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
    5.4.3 SQLSourceHelper
    1)属性说明:
    属性 说明(括号中为默认值)
    runQueryDelay 查询时间间隔(10000)
    batchSize 缓存大小(100)
    startFrom 查询语句开始id(0)
    currentIndex 查询语句当前id,每次查询之前需要查元数据表
    recordSixe 查询返回条数
    table 监控的表名
    columnsToSelect 查询字段(*)
    customQuery 用户传入的查询语句
    query 查询语句
    defaultCharsetResultSet 编码格式(UTF-8)
    2)方法说明:
    方法 说明
    SQLSourceHelper(Context context) 构造方法,初始化属性及获取JDBC连接
    InitConnection(String url, String user, String pw) 获取JDBC连接
    checkMandatoryProperties() 校验相关属性是否设置(实际开发中可增加内容)
    buildQuery() 根据实际情况构建sql语句,返回值String
    executeQuery() 执行sql语句的查询操作,返回值List<List<Object>>
    getAllRows(List<List<Object>> queryResult) 将查询结果转换为String,方便后续操作
    updateOffset2DB(int size) 根据每次查询结果将offset写入元数据表
    execSql(String sql) 具体执行sql语句方法
    getStatusDBIndex(int startFrom) 获取元数据表中的offset
    queryOne(String sql) 获取元数据表中的offset实际sql语句执行方法
    close() 关闭资源
    3)代码分析


    image.png

    本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

    相关文章

      网友评论

          本文标题:尚硅谷大数据技术之Flume

          本文链接:https://www.haomeiwen.com/subject/mhjocqtx.html