美文网首页
Flink自定义MySQLSource读取MySQL数据

Flink自定义MySQLSource读取MySQL数据

作者: 喵星人ZC | 来源:发表于2020-05-31 23:26 被阅读0次

    先查看数据库连接池实现MySQL连接池

    MySQL表student的实体采用case class定义

    object Domain {
    
      case class Access(time: Long, domain: String, traffic: Long)
    
      case class Student(id: Int, name: String, age: Int)
    
    }
    
    

    实现RichSourceFunction来自定义MySQLSource

    package com.zc.bigdata.flink02
    
    import java.sql.{Connection, PreparedStatement}
    
    import com.zc.bigdata.bean.Domain.Student
    import com.zc.bigdata.utils.JDBCPoolUtils
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    
    class MySQLSource extends RichSourceFunction[Student] {
    
      var conn: Connection = _
      var pstmt: PreparedStatement = _
    
      var running = true
    
      override def open(parameters: Configuration): Unit = {
        conn = JDBCPoolUtils.getConnection
        pstmt = conn.prepareStatement("select * from student")
      }
    
      override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
        val rs = pstmt.executeQuery()
        while (rs.next()) {
          val student = Student(rs.getInt("id"), rs.getString("name"), rs.getInt("age"))
          ctx.collect(student)
        }
    
      }
    
      override def cancel(): Unit = {
        running = false
      }
    
      override def close(): Unit = {
        JDBCPoolUtils.close(pstmt, conn)
      }
    }
    

    StreamApp

    package com.zc.bigdata.flink02
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.api.scala._
    
    object StreamApp {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        /* env.addSource(new AccessSource)//.setParallelism(3)  java.lang.IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.
           .print()*/
    
        /*    env.addSource(new AccessSource02).setParallelism(3)
              .print()*/
    
    
        env.addSource(new MySQLSource).print()
    
        env.execute(this.getClass.getSimpleName)
    
      }
    }
    
    
    image.png

    相关文章

      网友评论

          本文标题:Flink自定义MySQLSource读取MySQL数据

          本文链接:https://www.haomeiwen.com/subject/vvrdzhtx.html