Flink暴露了所有UDF函数的接口(实现方式为接口或者抽象类),例如 MapFunction、FilterFunction、ProcessFunction 等等。
函数类(Function Classes)
package com.atguigu.apiTest
/**
* 自定义UDF
*/
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
/**
 * Example entry point: filtering a stream with a named function class.
 *
 * Builds a stream from three fixed strings, filters it with `MyFilter`,
 * attaches a print sink to the result and executes the job.
 */
object Filter {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

    // Apply the class-based filter first, then attach the print sink to the result.
    val matching: DataStream[String] = source.filter(new MyFilter())
    matching.print()

    env.execute()
  }
}
/** Filter function that keeps only the strings containing the substring "Flink". */
class MyFilter extends FilterFunction[String] {

  /**
   * @param value the element to test
   * @return true when `value` contains "Flink" (case-sensitive), false otherwise
   */
  override def filter(value: String): Boolean = value.contains("Flink")
}
匿名类 RichFilterFunction和FilterFunction
package com.atguigu.apiTest
/**
* 自定义UDF
*/
import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala._
/**
 * Example entry point: filtering a stream with an anonymous function class.
 *
 * Builds a stream from three fixed strings, keeps the elements that contain
 * "Flink" using an anonymous `FilterFunction`, attaches a print sink and
 * executes the job.
 */
object Filter {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

    // Alternative (left disabled): the same predicate written as an anonymous
    // RichFilterFunction instead of a plain FilterFunction:
    //
    //   source.filter(
    //     new RichFilterFunction[String] {
    //       override def filter(value: String): Boolean = value.contains("Flink")
    //     }
    //   ).print()

    // Anonymous FilterFunction that keeps strings containing "Flink".
    val containsFlink: FilterFunction[String] = new FilterFunction[String] {
      override def filter(value: String): Boolean = value.contains("Flink")
    }

    val flinkTweets: DataStream[String] = source.filter(containsFlink)
    flinkTweets.print()

    env.execute()
  }
}
将关键字 "Flink" 作为构造参数传入自定义的 FilterFunction
package com.atguigu.apiTest
/**
* 自定义UDF
*/
import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala._
/**
 * Example entry point: filtering a stream with a parameterised function class.
 *
 * Builds a stream from three fixed strings, keeps the elements accepted by
 * `KeywordFilter("Flink")`, attaches a print sink and executes the job.
 */
object Filter {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

    // The keyword to match is passed to the filter through its constructor.
    val flinkTweets: DataStream[String] = dataStream.filter(new KeywordFilter("Flink"))

    // Attach a sink; without one the filtered stream is computed but its
    // elements are never emitted anywhere.
    flinkTweets.print()

    env.execute()
  }
}
/**
 * Filter function that keeps only the strings containing a given keyword.
 *
 * @param keyWord substring an element must contain to pass the filter
 */
class KeywordFilter(keyWord: String) extends FilterFunction[String] {

  /**
   * @param value the element to test
   * @return true when `value` contains `keyWord` (case-sensitive), false otherwise
   */
  override def filter(value: String): Boolean = value.contains(keyWord)
}
匿名函数
package com.atguigu.apiTest
/**
* 自定义UDF
*/
import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala._
/**
 * Example entry point: filtering a stream with an anonymous function (lambda).
 *
 * Builds a stream from three fixed strings, keeps the elements that contain
 * "Flink", attaches a print sink and executes the job.
 */
object Filter {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

    // Bind the filtered stream itself; print() returns a sink, not the stream,
    // so it is called as a separate statement rather than assigned to the val.
    val flinkTweets: DataStream[String] = dataStream.filter(_.contains("Flink"))
    flinkTweets.print()

    env.execute()
  }
}
网友评论