美文网首页
Flink写MySQL Demo

Flink写MySQL Demo

作者: Jorvi | 来源:发表于2020-01-19 09:15 被阅读0次

1. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.learn.LearnFlink</groupId>
    <artifactId>LearnFlink</artifactId>
    <version>1.0</version>

    <properties>
        <!-- Versions shared by the dependency declarations below. -->
        <flink.version>1.8.1</flink.version>
        <scala.version>2.11.8</scala.version>
        <!-- Scala binary version; used as the artifactId suffix of the Scala-dependent Flink modules. -->
        <scala.major.version>2.11</scala.major.version>
    </properties>

    <!--
        Each profile defines the `scope` property that several dependencies reference as ${scope}.
        `local` is active by default and resolves those dependencies with compile scope;
        `pro` switches them to provided scope (presumably because the target cluster already
        supplies them; confirm against the deployment environment).
    -->
    <profiles>
        <profile>
            <id>local</id>
            <properties>
                <scope>compile</scope>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <id>pro</id>
            <properties>
                <scope>provided</scope>
            </properties>
        </profile>
    </profiles>

    <dependencies>
        <!-- Flink core, client, runtime and Scala streaming API; scope comes from the active profile. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.major.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_${scala.major.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.major.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <!-- Scala standard library and compiler, pinned to ${scala.version}. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Logging: SLF4J binding for Log4j 1.2 plus Log4j itself; scope comes from the active profile. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>${scope}</scope>
        </dependency>

        <!-- Flink Kafka connector and MySQL JDBC driver; no scope is declared, so Maven's default (compile) applies. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.major.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.18</version>
        </dependency>

        <!-- Gson JSON library. -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds a shaded (fat) jar during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Artifacts left out of the shaded jar: jsr305 and all SLF4J / Log4j artifacts. -->
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signature files under META-INF; otherwise they cause SecurityExceptions. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Writes the Main-Class entry into the jar manifest. -->
                                <!-- NOTE(review): the entry point shown in this article is `object FileToMysql`; confirm that org.learn.Driver exists. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.learn.Driver</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Plugin that compiles the Scala source files. -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Java compiler settings: source and target level 8. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. 主函数

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import sink.SinkToMysql
import source.SourceFromFile
import vo.ClassInfo
import org.apache.flink.api.scala._

/**
  * Job entry point: takes the lines emitted by `SourceFromFile`, converts each one
  * into a `ClassInfo` and hands it to `SinkToMysql`.
  */
object FileToMysql {

  /** Number of comma-separated fields a line must provide: id, classname, teacherId. */
  private val ExpectedFieldCount = 3

  /**
    * Builds the source -> map -> sink pipeline and runs it.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .addSource(new SourceFromFile)
      .map(line => parse(line))
      .addSink(new SinkToMysql)
    env.execute()
  }

  /**
    * Converts one comma-separated line into a `ClassInfo`.
    *
    * Fields are read positionally: id, classname, teacherId. Fields beyond the
    * third are ignored.
    *
    * @param line raw input line
    * @return the populated `ClassInfo`
    * @throws IllegalArgumentException if the line has fewer than three fields
    */
  private def parse(line: String): ClassInfo = {
    // A negative limit keeps trailing empty fields, so "1,math," yields an empty
    // teacherId instead of a two-element array.
    val fields = line.split(",", -1)
    require(
      fields.length >= ExpectedFieldCount,
      s"Expected at least $ExpectedFieldCount comma-separated fields but got ${fields.length}: '$line'"
    )

    val classInfo = new ClassInfo()
    classInfo.id = fields(0)
    classInfo.classname = fields(1)
    classInfo.teacherId = fields(2)
    classInfo
  }

}

3. Source

import java.io.{BufferedReader, FileReader}
import java.util.concurrent.TimeUnit

import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.util.Collector

/**
  * Source that reads a text file line by line and emits every non-blank line.
  *
  * After each read attempt the source pauses for 60 seconds. The loop does not stop
  * at end of file (where `readLine` returns null); it keeps polling until the job
  * is cancelled.
  *
  * @param path file to read; defaults to the location this source originally had hard-coded
  */
class SourceFromFile(path: String = "E:\\documents\\test.txt") extends RichSourceFunction[String] {
  // cancel() is invoked from a different thread than run(), so the flag must be
  // volatile for the read loop to observe the update.
  @volatile private var isRunning: Boolean = true

  /**
    * Reads lines from `path` and forwards the non-blank ones until cancelled.
    *
    * @param sourceContext context used to emit records downstream
    */
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val bufferedReader: BufferedReader = new BufferedReader(new FileReader(path))
    try {
      while (isRunning) {
        // readLine returns null at end of stream; isNotBlank(null) is false, so nothing is emitted.
        val line: String = bufferedReader.readLine()
        if (StringUtils.isNotBlank(line)) {
          sourceContext.collect(line)
        }
        TimeUnit.SECONDS.sleep(60)
      }
    } finally {
      // Release the file handle on normal exit, cancellation and failure alike.
      bufferedReader.close()
    }
  }

  /** Signals the read loop in `run` to stop. */
  override def cancel(): Unit = {
    isRunning = false
  }
}

4. Sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import vo.ClassInfo

/**
  * Sink that upserts `ClassInfo` records into the MySQL table `class_info`
  * using `REPLACE INTO`.
  *
  * The JDBC connection and prepared statement are created once in `open` and
  * released in `close`, rather than once per record.
  */
class SinkToMysql extends RichSinkFunction[ClassInfo] {

  private val jdbcUrl = "jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=GMT%2B8"
  private val username = "root"
  private val password = ""
  private val driverName = "com.mysql.cj.jdbc.Driver"
  private val sql: String = "replace into class_info(id, classname, teacher_id) values(?,?,?)"

  // JDBC handles are not serializable; they are created in open() after the
  // function instance has been shipped to the task, hence @transient.
  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _

  /**
    * Loads the JDBC driver, opens the connection with auto-commit disabled and
    * prepares the upsert statement.
    *
    * @param parameters configuration passed in by the runtime
    */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName(driverName)
    connection = DriverManager.getConnection(jdbcUrl, username, password)

    // Disable auto-commit; invoke() commits explicitly after each record.
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(sql)
  }

  /**
    * Writes one record and commits it.
    *
    * @param classInfo record to upsert
    * @param context   sink context (not used)
    */
  override def invoke(classInfo: ClassInfo, context: SinkFunction.Context[_]): Unit = {
    statement.setString(1, classInfo.id)
    statement.setString(2, classInfo.classname)
    statement.setString(3, classInfo.teacherId)
    statement.execute()

    // Commit
    connection.commit()
  }

  /** Closes the statement and the connection; safe to call even if `open` did not complete. */
  override def close(): Unit = {
    try {
      Option(statement).foreach(_.close())
    } finally {
      try {
        Option(connection).foreach(_.close())
      } finally {
        statement = null
        connection = null
        super.close()
      }
    }
  }
}

相关文章

网友评论

      本文标题:Flink写MySQL Demo

      本文链接:https://www.haomeiwen.com/subject/isxrzctx.html