美文网首页
Flume 实时获取日志内容插入MySQL

Flume 实时获取日志内容插入MySQL

作者: CNSTT | 来源:发表于2018-11-28 15:10 被阅读0次

    前言:

    本文适用于在Windows上使用Flume自定义sink,实时获取日志文件内容并写入MySQL表中。首先请确保你的flume-ng可以正常启动,文中省略的个别步骤可自行搜索。

    1、MySQL创建表:

    -- Target table for MysqlSink: one row per log line of the form "name,salesman".
    -- No AUTO_INCREMENT start value is given, so ids of a freshly created table begin at 1.
    CREATE TABLE `fruit` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `salesman` varchar(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    2、创建自定义Sink:

    由于Flume没有内置可直接写入MySQL的sink,需要自己编写一个自定义sink来连接数据库。

    2.1、打开Eclipse,新建一个maven project

    勾选 Create a simple project (skip archetype selection)
    Group Id:

    org.flume.mysql.sink
    

    Artifact Id:

    flumedemo
    
    2.2、配置pom.xml

    其中 <build>...</build> 部分配置了后续打包所需的Maven插件(生成 jar-with-dependencies)

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.flume.mysql.sink</groupId>
        <artifactId>flumedemo</artifactId>
        <version>0.0.1-SNAPSHOT</version>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!-- One version for every Flume artifact; keep it equal to the Flume
                 distribution the jar is deployed into (apache-flume-1.8.0-bin). -->
            <flume.version>1.8.0</flume.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
            <!-- Flume (and the Guava/SLF4J it brings in transitively) is already on the
                 agent classpath in $FLUME_HOME/lib, so it is "provided": compile against it,
                 but do not bundle a second, possibly conflicting copy into the assembly jar. -->
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-sdk</artifactId>
                <version>${flume.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>${flume.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-configuration</artifactId>
                <version>${flume.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
            <!-- Not provided by Flume, so it is bundled into the jar-with-dependencies. -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.25</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <!-- Declared exactly once: two declarations with different source/target
                     levels (1.8 and 1.7) conflict. Flume 1.8.0 itself runs on Java 8. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <!-- Builds target/flumedemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar during
                     "mvn package". The sink is loaded by Flume, so no Main-Class is needed. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
    2.3、新建包和类

    右击 src/main/java 新建一个package org.flume.mysql.sink
    实体类Fruit.java

    package org.flume.mysql.sink;
    
    /**
     * Mutable holder for one parsed log record: a fruit name and the salesman
     * who sold it. Both properties are {@code null} until their setter is called.
     */
    public class Fruit {

        /** Fruit name, e.g. {@code "apple"}. */
        private String name;

        /** Salesman name, e.g. {@code "wang"}; may remain {@code null}. */
        private String salesman;

        /**
         * Stores the fruit name.
         *
         * @param name the fruit name; {@code null} is accepted
         */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the stored fruit name, or {@code null} if never set */
        public String getName() {
            return this.name;
        }

        /**
         * Stores the salesman name.
         *
         * @param salesman the salesman name; {@code null} is accepted
         */
        public void setSalesman(String salesman) {
            this.salesman = salesman;
        }

        /** @return the stored salesman name, or {@code null} if never set */
        public String getSalesman() {
            return this.salesman;
        }
    }
    

    自定义sink MysqlSink.java

    package org.flume.mysql.sink;
    
    import com.google.common.base.Preconditions;
    import com.google.common.base.Throwables;
    import com.google.common.collect.Lists;
    
    import java.nio.charset.StandardCharsets;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
     
    /**
     * Custom Flume sink that inserts each event as one row of a MySQL table.
     *
     * <p>Each event body is decoded as UTF-8 and split at its first comma. The text before the
     * comma goes into the {@code name} column and the text after it into {@code salesman}
     * (e.g. {@code "apple,wang"}). A body without a comma is stored as {@code name} only,
     * leaving {@code salesman} NULL.
     *
     * <p>Each {@link #process()} call takes up to {@code batchSize} events inside one channel
     * transaction. It writes them with one JDBC batch in one database transaction. The channel
     * transaction is committed only after the database commit succeeds.
     *
     * <p>Required configuration keys: {@code hostname}, {@code port}, {@code databaseName},
     * {@code tableName}, {@code user}, {@code password}. Optional: {@code batchSize}
     * (default 100).
     */
    public class MysqlSink extends AbstractSink implements Configurable {

        private static final Logger LOG = LoggerFactory.getLogger(MysqlSink.class);

        private String hostname;
        private String port;
        private String databaseName;
        private String tableName;
        private String user;
        private String password;
        /** Reused INSERT statement; created in {@link #start()}, closed in {@link #stop()}. */
        private PreparedStatement preparedStatement;
        /** JDBC connection with auto-commit disabled, so each batch is one DB transaction. */
        private Connection conn;
        private int batchSize;

        public MysqlSink() {
            LOG.info("Start MysqlSink");
        }

        /**
         * Reads the sink settings from the agent configuration.
         *
         * @param context the sink's configuration context
         * @throws NullPointerException if a required key is missing
         * @throws IllegalArgumentException if {@code batchSize} is not positive
         */
        @Override
        public void configure(Context context) {
            hostname = requireSetting(context, "hostname");
            port = requireSetting(context, "port");
            databaseName = requireSetting(context, "databaseName");
            tableName = requireSetting(context, "tableName");
            user = requireSetting(context, "user");
            password = requireSetting(context, "password");
            batchSize = context.getInteger("batchSize", 100);
            // checkArgument, not checkNotNull: a boxed boolean is never null, so only
            // checkArgument actually rejects a non-positive batchSize.
            Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
        }

        /** Returns the value of a mandatory configuration key, failing fast if it is absent. */
        private static String requireSetting(Context context, String key) {
            return Preconditions.checkNotNull(context.getString(key), key + " must be set!!");
        }

        /**
         * Opens the JDBC connection and prepares the INSERT statement.
         *
         * @throws IllegalStateException if the driver cannot be loaded or the connection or
         *     statement cannot be created. The exception is thrown instead of calling
         *     System.exit, so only this sink fails rather than the whole agent JVM.
         */
        @Override
        public void start() {
            // Ask Connector/J to exchange string data as UTF-8 instead of relying on a default
            // encoding; otherwise non-ASCII values such as Chinese text can be garbled.
            String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName
                    + "?useUnicode=true&characterEncoding=UTF-8";
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection(url, user, password);
                conn.setAutoCommit(false);
                preparedStatement = conn.prepareStatement("insert into " + tableName
                        + " (name,salesman) values (?,?)");
            } catch (ClassNotFoundException | SQLException e) {
                closeQuietly();
                throw new IllegalStateException("Failed to open MySQL connection to " + url, e);
            }
            super.start();
        }

        /** Releases the JDBC statement and connection, then stops the sink. */
        @Override
        public void stop() {
            closeQuietly();
            super.stop();
        }

        /** Closes the statement and connection if open; close failures are logged, not thrown. */
        private void closeQuietly() {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    LOG.warn("Failed to close PreparedStatement.", e);
                }
                preparedStatement = null;
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    LOG.warn("Failed to close MySQL connection.", e);
                }
                conn = null;
            }
        }

        /**
         * Moves up to {@code batchSize} events from the channel into the MySQL table.
         *
         * @return {@code BACKOFF} if the channel ran out of events before a full batch was
         *     taken, otherwise {@code READY}
         * @throws EventDeliveryException if taking events or writing them fails. In that case
         *     the channel transaction (and the database transaction) has been rolled back.
         */
        @Override
        public Status process() throws EventDeliveryException {
            Status result = Status.READY;
            Channel channel = getChannel();
            Transaction transaction = channel.getTransaction();
            List<Fruit> infos = Lists.newArrayList();
            transaction.begin();
            try {
                for (int i = 0; i < batchSize; i++) {
                    Event event = channel.take();
                    if (event == null) {
                        // Channel drained: let the SinkRunner back off before polling again.
                        result = Status.BACKOFF;
                        break;
                    }
                    // Explicit UTF-8: the platform default (e.g. GBK on Chinese Windows) would
                    // garble non-ASCII bodies.
                    String content = new String(event.getBody(), StandardCharsets.UTF_8);
                    LOG.info("content : {}", content);
                    infos.add(parseFruit(content));
                }
                if (!infos.isEmpty()) {
                    insertBatch(infos);
                }
                transaction.commit();
            } catch (Exception e) {
                try {
                    transaction.rollback();
                } catch (Exception e2) {
                    LOG.error("Exception in rollback. Rollback might not have been "
                            + "successful.", e2);
                }
                LOG.error("Failed to commit transaction. Transaction rolled back.", e);
                throw new EventDeliveryException("Failed to write events to MySQL", e);
            } finally {
                transaction.close();
            }
            return result;
        }

        /**
         * Splits one event body at its first comma into a {@link Fruit}.
         * Without a comma, the whole body becomes the name and the salesman stays null.
         */
        private static Fruit parseFruit(String content) {
            Fruit fruit = new Fruit();
            int comma = content.indexOf(',');
            if (comma >= 0) {
                fruit.setName(content.substring(0, comma));
                fruit.setSalesman(content.substring(comma + 1));
            } else {
                fruit.setName(content);
            }
            return fruit;
        }

        /**
         * Inserts all records with one JDBC batch and commits the database transaction.
         * If anything fails, the database transaction is rolled back, so no partial batch
         * remains while the channel transaction is being rolled back as well.
         */
        private void insertBatch(List<Fruit> fruits) throws SQLException {
            try {
                preparedStatement.clearBatch();
                for (Fruit fruit : fruits) {
                    preparedStatement.setString(1, fruit.getName());
                    preparedStatement.setString(2, fruit.getSalesman());
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                conn.commit();
            } catch (SQLException e) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackError) {
                    e.addSuppressed(rollbackError);
                }
                throw e;
            }
        }
    }
    
    LOG.info("***");    //输出语句,用于调试,方便之后在flume-ng的输出中看到
    
    2.4、Maven打包

    右击项目 flumedemo
    1、Maven → update project
    2、Run As → Maven clean
    3、Run As → Maven install (第一次可能比较久)
    打包完成!如下图

    image.png
    在 target 目录下复制 flumedemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar到flume的lib目录下

    3、配置conf:

    D:\com\apache-flume-1.8.0-bin\logs目录新建一个空的fruitdata.log (之后添加数据)
    D:\com\apache-flume-1.8.0-bin\conf目录新建一个fruit.conf

    fruit.conf

    agent1.sources = source1
    agent1.sinks = mysqlSink
    agent1.channels = channel1
    
    # Exec source: "tail -f" streams lines appended to the log file in real time
    agent1.sources.source1.type = exec
    agent1.sources.source1.command = tail -f D:/com/apache-flume-1.8.0-bin/logs/fruitdata.log
    agent1.sources.source1.channels = channel1
    
    # MysqlSink: type is the fully qualified class name (package.ClassName).
    # Comments must be on their own lines. Properties files have no trailing comments,
    # so "flume  //db name" would be read as the literal value "flume  //db name".
    agent1.sinks.mysqlSink.type = org.flume.mysql.sink.MysqlSink
    agent1.sinks.mysqlSink.hostname = localhost
    agent1.sinks.mysqlSink.port = 3306
    # Database name
    agent1.sinks.mysqlSink.databaseName = flume
    # Table name
    agent1.sinks.mysqlSink.tableName = fruit
    # MySQL user name
    agent1.sinks.mysqlSink.user = root
    # MySQL password (placeholder)
    agent1.sinks.mysqlSink.password = ****
    # Events per channel transaction / JDBC batch (the sink's default is 100);
    # keep it no larger than the channel's transactionCapacity
    agent1.sinks.mysqlSink.batchSize = 100
    agent1.sinks.mysqlSink.channel = channel1
    
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 1000
    # Maximum number of events per channel transaction
    agent1.channels.channel1.transactionCapacity = 100
    

    由于Windows自带的命令行中没有 tail -f 命令,这里使用了前人分享的Windows版tail工具(找了很久,感谢分享)
    tail 下载地址见文末zip
    不建议放到C:\Windows\System32
    解压到D:\tail,然后在环境变量Path末尾追加;D:\tail即可

    4、实验:

    在目录D:\com\apache-flume-1.8.0-bin下按住Shift并单击右键,选择在此处打开命令窗口(cmd)

    D:\com\apache-flume-1.8.0-bin>flume-ng agent -c conf -f conf/fruit.conf -n agent1 -property "flume.root.logger=INFO,console"
    

    此时向fruitdata.log添加数据 字段之间用,隔开

    apple,wang
    banana,peng
    kiwifruit,cnstt
    lemon,bob
    

    截图

    image.png
    image.png
    逐条添加数据后,可以看到数据库表中也随之增加了对应记录,因为 tail -f 命令会实时读取文件新增的内容。

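    至此完成了在Windows环境下使用Flume自定义Sink获取日志并输出到MySQL表中。目前中文会出现乱码,后续继续研究!可能的原因之一:exec source 默认按 UTF-8 编码事件内容,而 new String(event.getBody()) 使用平台默认编码(中文Windows上通常为GBK)解码;可尝试改为 new String(event.getBody(), StandardCharsets.UTF_8),并在JDBC URL后加上 ?useUnicode=true&characterEncoding=UTF-8。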

    谢谢阅读,有帮助的点个❤!

    相关文章

      网友评论

          本文标题:Flume 实时获取日志内容插入MySQL

          本文链接:https://www.haomeiwen.com/subject/gfiaqqtx.html