美文网首页大数据flume玩转大数据
Flume学习系列(七)---- 自定义Sink到Mysql

Flume学习系列(七)---- 自定义Sink到Mysql

作者: 小北觅 | 来源:发表于2018-08-22 17:16 被阅读66次

    前言:接上一篇,我们总结了自定义Sink的流程,这次来实现一个自己的Sink,将数据写入MySQL数据库。Source仍沿用《Flume学习系列(二)----实战Spooling到HDFS》中的源,但Sink不再使用HDFS,而是使用自定义的MysqlSink。

    一、创建数据库相关

    -- Target schema for MysqlSinker. The column order matches column_name in flume.conf.
    -- IF NOT EXISTS keeps the script re-runnable, like the DROP TABLE IF EXISTS below.
    CREATE DATABASE IF NOT EXISTS flume;
    USE flume;
    
    DROP TABLE IF EXISTS `income`;
    CREATE TABLE `income` (
      `userid` varchar(36) NOT NULL,  # unique user id
      `county` varchar(3) NOT NULL,   # county
      `town` varchar(3) NOT NULL,     # town
      `income` int DEFAULT NULL,      # income (display width dropped: deprecated, no effect on storage)
      PRIMARY KEY (`userid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;  # utf8mb4: full UTF-8; plain utf8 is the deprecated utf8mb3 alias
    

    二、编写自定义MysqlSink

        回顾一下数据的格式:(为了插入数据方便,我把最后一部分之间的####变成了逗号,同时给每个字段都加上了单引号)

    [INFO ] 2018-08-20 18:40:20 'e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168'
    

        [INFO]、[2018-08-20 18:40:20]与['e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168']三部分之间用制表符(\t)分隔。这就是event body的内容,稍后在Sink中通过split("\t")切分,取第三部分作为要插入的值。

        自定义Sink代码如下:

    package com.zhb.flume;
    
    import java.nio.charset.StandardCharsets;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.common.base.Preconditions;
    
    /**
     * Flume sink that writes each event as one row of a MySQL table.
     *
     * <p>The event body must contain at least three tab-separated fields. The third field holds the
     * comma-separated column values, each optionally wrapped in single quotes, for example
     * {@code 'e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168'}. The values are bound
     * through a {@link PreparedStatement}, so quotes or SQL fragments inside the data cannot alter
     * the statement.
     *
     * <p>Properties:
     * <ul>
     *   <li>{@code url}, e.g. {@code jdbc:mysql://127.0.0.1:3306/flume}</li>
     *   <li>{@code user}</li>
     *   <li>{@code password} (optional)</li>
     *   <li>{@code tableName}</li>
     *   <li>{@code column_name} (comma-separated column list)</li>
     * </ul>
     * Table and column names come from the agent configuration and are trusted: they are
     * concatenated into the SQL text, and only the values are bound.
     */
    public class MysqlSinker extends AbstractSink implements Configurable {
        private static final Logger logger = LoggerFactory.getLogger(MysqlSinker.class);
        private Connection connect;
        private PreparedStatement insertStmt;
        private String columnName;
        private String url;
        private String user;
        private String password;
        private String tableName;
        /** Number of configured columns, i.e. how many values every event must carry. */
        private int columnCount;
    
        /** Closes the JDBC resources held by this sink, then stops it. Runs once when the sink ends. */
        @Override
        public synchronized void stop() {
            closeConnection();
            super.stop();
        }
    
        /**
         * Starts the sink and tries to open the database connection. A failure here is logged, not
         * thrown. {@link #process()} reconnects on demand, so a database that is unavailable at
         * startup does not leave the sink permanently broken.
         */
        @Override
        public synchronized void start() {
            super.start();
            try {
                ensureConnected();
            } catch (SQLException e) {
                logger.error("Could not connect to MySQL at startup; will retry when events arrive", e);
            }
        }
    
        /**
         * Takes at most one event from the channel and inserts it into the table.
         *
         * <p>Malformed events (see {@link #parseValues(Event)}) are logged and dropped, because they
         * would fail identically on every retry.
         *
         * @return {@code BACKOFF} when the channel is empty, otherwise {@code READY}
         * @throws EventDeliveryException if taking or inserting the event fails; the transaction is
         *     rolled back so the event stays in the channel and is retried
         */
        @Override
        public Status process() throws EventDeliveryException {
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                Event event = ch.take();
                if (event == null) {
                    // Channel is empty: end the empty transaction and let the runner back off
                    // instead of spinning inside an open transaction.
                    txn.commit();
                    return Status.BACKOFF;
                }
                String[] values = parseValues(event);
                if (values == null) {
                    // Already logged. Commit to drop it; a rollback would re-deliver it forever.
                    txn.commit();
                    return Status.READY;
                }
                ensureConnected();
                for (int i = 0; i < values.length; i++) {
                    insertStmt.setString(i + 1, values[i]);
                }
                insertStmt.executeUpdate();
                txn.commit();
                return Status.READY;
            } catch (Throwable th) {
                txn.rollback();
                if (th instanceof SQLException) {
                    // The connection may no longer be usable; reopen it on the next call.
                    closeConnection();
                }
                if (th instanceof Error) {
                    throw (Error) th;
                }
                throw new EventDeliveryException(th);
            } finally {
                txn.close();
            }
        }
    
        /**
         * Extracts the column values from an event body.
         *
         * @return one value per configured column, or {@code null} (after logging a warning) when the
         *     body has no third tab-separated field or the value count differs from the column count
         */
        private String[] parseValues(Event event) {
            String rawBody = new String(event.getBody(), StandardCharsets.UTF_8);
            logger.debug("rawbody:{}", rawBody);
            String[] fields = rawBody.split("\t");
            if (fields.length < 3) {
                logger.warn("Dropping event without a third tab-separated field: {}", rawBody);
                return null;
            }
            // Limit -1 keeps trailing empty tokens, so "a,b,c,d," is rejected rather than truncated.
            String[] tokens = fields[2].split(",", -1);
            if (tokens.length != columnCount) {
                logger.warn("Dropping event with {} values, expected {}: {}",
                        tokens.length, columnCount, rawBody);
                return null;
            }
            String[] values = new String[tokens.length];
            for (int i = 0; i < tokens.length; i++) {
                values[i] = toSqlValue(tokens[i]);
            }
            return values;
        }
    
        /**
         * Converts one value token to the string bound for its column:
         * <ul>
         *   <li>{@code 'abc'} becomes {@code abc}, with doubled quotes {@code ''} unescaped as in SQL
         *       literals.</li>
         *   <li>An unquoted {@code NULL} (any case) becomes {@code null}, which binds SQL NULL.</li>
         *   <li>Anything else is bound as-is, after trimming.</li>
         * </ul>
         */
        private static String toSqlValue(String token) {
            String value = token.trim();
            if (value.length() >= 2 && value.startsWith("'") && value.endsWith("'")) {
                return value.substring(1, value.length() - 1).replace("''", "'");
            }
            return "NULL".equalsIgnoreCase(value) ? null : value;
        }
    
        /** Opens the connection and prepares the insert statement unless both are already usable. */
        private void ensureConnected() throws SQLException {
            if (connect != null && insertStmt != null && !connect.isClosed()) {
                return;
            }
            // Discard any half-open state, e.g. a connection whose statement failed to prepare.
            closeConnection();
            connect = DriverManager.getConnection(url, user, password);
            insertStmt = connect.prepareStatement(buildInsertSql());
        }
    
        /** Builds {@code insert into <table>(<columns>) values(?,...,?)} with one placeholder per column. */
        private String buildInsertSql() {
            StringBuilder placeholders = new StringBuilder();
            for (int i = 0; i < columnCount; i++) {
                if (i > 0) {
                    placeholders.append(',');
                }
                placeholders.append('?');
            }
            return "insert into " + tableName + "(" + columnName + ") values(" + placeholders + ")";
        }
    
        /** Closes the insert statement and the connection, if open, and clears both fields. */
        private void closeConnection() {
            if (insertStmt != null) {
                try {
                    insertStmt.close();
                } catch (SQLException e) {
                    logger.warn("Failed to close insert statement", e);
                }
                insertStmt = null;
            }
            if (connect != null) {
                try {
                    connect.close();
                } catch (SQLException e) {
                    logger.warn("Failed to close MySQL connection", e);
                }
                connect = null;
            }
        }
    
        /** Reads the sink properties from the agent configuration and validates the required ones. */
        @Override
        public void configure(Context context) {
            columnName = context.getString("column_name");
            Preconditions.checkNotNull(columnName, "column_name must be set!!");
            columnCount = columnName.split(",").length;
            url = context.getString("url");
            Preconditions.checkNotNull(url, "url must be set!!");
            user = context.getString("user");
            Preconditions.checkNotNull(user, "user must be set!!");
            // Not checked: a MySQL user may have no password, in which case this stays null.
            password = context.getString("password");
            tableName = context.getString("tableName");
            Preconditions.checkNotNull(tableName, "tableName must be set!!");
        }
    
    }
    

        将写好的程序打成jar包放到flume的lib下。

    三、编写配置文件

        flume.conf的内容如下:

    # Flume agent "agent2": spooling-directory source -> memory channel -> custom MySQL sink
    agent2.sources = source2
    agent2.channels = channel2
    agent2.sinks = sink2
    
    # ---- channel2: in-memory buffer between the source and the sink ----
    agent2.channels.channel2.type = memory
    agent2.channels.channel2.capacity = 10000
    agent2.channels.channel2.transactionCapacity = 1000
    agent2.channels.channel2.keep-alive = 30
    
    # ---- source2: spooling directory ----
    agent2.sources.source2.type = spooldir
    agent2.sources.source2.spoolDir = /Users/jsj/eclipse-workspace/logs
    agent2.sources.source2.fileHeader = false
    agent2.sources.source2.interceptors = i1
    agent2.sources.source2.interceptors.i1.type = timestamp
    agent2.sources.source2.channels = channel2
    
    # ---- sink2: the custom MysqlSinker class ----
    agent2.sinks.sink2.type = com.zhb.flume.MysqlSinker
    agent2.sinks.sink2.channel = channel2
    agent2.sinks.sink2.url = jdbc:mysql://127.0.0.1:3306/flume
    agent2.sinks.sink2.user = root
    # No password: leave the value empty, do not write ""
    agent2.sinks.sink2.password =
    agent2.sinks.sink2.tableName = income
    agent2.sinks.sink2.column_name = userid,county,town,income
    

    四、验证

        进入flume的bin目录,执行 ./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console -n agent2 (注意:-n 后的agent名称是agent2,中间没有空格,必须与配置文件中的agent名一致)

        成功启动flume后,新开一个终端,将生成的log文件拷贝到spooling 监控的文件夹下:cp /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log* /Users/jsj/eclipse-workspace/logs
    看下数据库。

    001.jpg

        大功告成,成功插入到数据库。自定义Sink成功了。

    五、总结

        本文实现了自定义sink,将数据sink到mysql中。说一下心得吧,中间有问题的时候怎么调试,就在你编写的类里用logger去输出你想看的日志即可。我在上面的代码中注释掉了。至此,flume的绝大部分内容都结束了。

    相关文章

      网友评论

        本文标题:Flume学习系列(七)---- 自定义Sink到Mysql

        本文链接:https://www.haomeiwen.com/subject/xuqyiftx.html