美文网首页
Flink入门案例(scala)

Flink入门案例(scala)

作者: 欧阳小伙 | 来源:发表于2021-04-02 19:29 被阅读0次

    0、准备

    • Intellij IDEA
    • 使用gradle构建项目,使用scala作为项目语言

    1、IDEA中选择构建gradle项目即可

    项目目标:自定义随机数数据源,然后不断的读取随机数

    2、项目构建完成后

    • 添加scala文件夹,并且【右键】-【Mark Directory as】-【Sources Root】,如下图:


      目录结构
    • 在build.gradle中添加如下依赖
    plugins {
        id 'java'
        id 'scala'
    }
    
    group 'org.example'
    version '1.0-SNAPSHOT'
    
    repositories {
        mavenLocal()
        maven { url 'https://maven.aliyun.com/repository/public' }
        mavenCentral()
    }
    
    dependencies {
        testCompile group: 'junit', name: 'junit', version: '4.12'
        compile group: 'org.scala-lang', name: 'scala-library', version: '2.12.10'
        compile( group: 'org.apache.flink', name: 'flink-scala_2.12', version: '1.11.1') exclude module: 'slf4j-api' 
        compile (group: 'org.apache.flink', name: 'flink-streaming-scala_2.12', version: '1.11.1')
        compile (group: 'org.apache.flink', name: 'flink-clients_2.12', version: '1.11.1') exclude module: 'slf4j-api'
    }
    

    3、代码示例

    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    // 一些隐式转换需要这个形式的导入
    import org.apache.flink.streaming.api.scala._
    
    import scala.util.Random
    
    object MyOwnDataSource {
    
      def main(args: Array[String]): Unit = {
        // env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //添加数据源
        val stream = env.addSource(new MyDataSource())
    
        // 执行相关操作
        stream.print()
    
        // 开始执行  
        env.execute("stream-MyOwnDataSource")
      }
    
    }
    
    class MyDataSource extends SourceFunction[String]{
    
      var running:Boolean = true
    
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val rand = new Random()
        while(running){
          // 核心的一句,通过ctx可以将数据发送出去
          ctx.collect("data--"+rand.nextGaussian()+": "+System.currentTimeMillis())
          Thread.sleep(1000)  
        }
      }
    
      override def cancel(): Unit = {
        running = false
      }
    }
    

    4、测试
    直接点击IDEA中的运行按钮即可,打印出来的内容如下:

    4> data--0.025704712268118834: 1603007689941
    4> data---0.8935837819447313: 1603007689941
    4> data--0.6438729979951363: 1603007689941
    4> data---0.34440123558130825: 1603007689941
    4> data--0.27793098952562134: 1603007689941
    4> data--0.33934696503260375: 1603007689941
    1> data--0.2792505237097227: 1603007690259
    1> data--1.6398778853971754: 1603007690259
    

    相关文章

      网友评论

          本文标题:Flink入门案例(scala)

          本文链接:https://www.haomeiwen.com/subject/ygpqkltx.html