0、准备
- Intellij IDEA
- 使用gradle构建项目,使用scala作为项目语言
1、IDEA中选择构建gradle项目即可
项目目标:自定义随机数数据源,然后不断的读取随机数
2、项目构建完成后
-
添加scala文件夹,并且【右键】-【Mark Directory as】-【Sources Root】,如下图:
目录结构 - 在build.gradle中添加如下依赖
plugins {
id 'java'
id 'scala'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories {
mavenLocal()
maven { url 'https://maven.aliyun.com/repository/public' }
mavenCentral()
}
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile group: 'org.scala-lang', name: 'scala-library', version: '2.12.10'
compile( group: 'org.apache.flink', name: 'flink-scala_2.12', version: '1.11.1') exclude module: 'slf4j-api'
compile (group: 'org.apache.flink', name: 'flink-streaming-scala_2.12', version: '1.11.1')
compile (group: 'org.apache.flink', name: 'flink-clients_2.12', version: '1.11.1') exclude module: 'slf4j-api'
}
3、代码示例
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// 一些隐式转换需要这个形式的导入
import org.apache.flink.streaming.api.scala._
import scala.util.Random
object MyOwnDataSource {
def main(args: Array[String]): Unit = {
// env
val env = StreamExecutionEnvironment.getExecutionEnvironment
//添加数据源
val stream = env.addSource(new MyDataSource())
// 执行相关操作
stream.print()
// 开始执行
env.execute("stream-MyOwnDataSource")
}
}
class MyDataSource extends SourceFunction[String]{
var running:Boolean = true
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val rand = new Random()
while(running){
// 核心的一句,通过ctx可以将数据发送出去
ctx.collect("data--"+rand.nextGaussian()+": "+System.currentTimeMillis())
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
running = false
}
}
4、测试
直接点击IDEA中的运行按钮即可,打印出来的内容如下:
4> data--0.025704712268118834: 1603007689941
4> data---0.8935837819447313: 1603007689941
4> data--0.6438729979951363: 1603007689941
4> data---0.34440123558130825: 1603007689941
4> data--0.27793098952562134: 1603007689941
4> data--0.33934696503260375: 1603007689941
1> data--0.2792505237097227: 1603007690259
1> data--1.6398778853971754: 1603007690259
网友评论