美文网首页
UDF函数——更细粒度的控制流

UDF函数——更细粒度的控制流

作者: yayooo | 来源:发表于2019-08-25 19:18 被阅读0次

Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。

函数类(Function Classes)

package com.atguigu.apiTest

/**
  * 自定义UDF
  */

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._

object Filter {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")
    //dataStream.print()
    dataStream.filter(new MyFilter()).print()

    env.execute()
  }

}

class MyFilter extends FilterFunction[String]{
  override def filter(value: String): Boolean ={
    value.contains("Flink")

  }
}

匿名类 RichFilterFunction和FilterFunction

package com.atguigu.apiTest

/**
  * 自定义UDF
  */

import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala._

object Filter {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

    /*    val flinkTweets = dataStream.filter(
      new RichFilterFunction[String] {
        override def filter(value: String): Boolean = {
          value.contains("Flink")
        }
      }
    ).print()*/

    //或者

    val flinkTweets: DataStream[String] = dataStream.filter(
      new FilterFunction[String] {
        override def filter(value: String): Boolean = {
          value.contains("Flink")
        }
      }
    )
    flinkTweets.print()

    env.execute()
  }

}


将Flink当成参数传进去

package com.atguigu.apiTest

/**
  * 自定义UDF
  */

import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala._

object Filter {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

    val flinkTweets = dataStream.filter(new KeywordFilter("Flink"))

    env.execute()
  }

}

class KeywordFilter(keyWord: String) extends FilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains(keyWord)
  }
}

匿名函数

package com.atguigu.apiTest

/**
  * 自定义UDF
  */

import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala._

object Filter {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

    val flinkTweets = dataStream.filter(_.contains("Flink")).print()

    env.execute()
  }

}

相关文章

  • UDF函数——更细粒度的控制流

    Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFun...

  • Flink11:实现UDF函数——更细粒度的控制流

    1. 函数类(Function Classes) Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)...

  • NLTK之大规模文本挖掘

    python的流操作---map+reduceHive/pig下的UDF(用户定义函数)流封装器scikitpys...

  • Hive 自定义函数 UDF

    Hive 自定义函数 UDF UDF:用户定义(普通)函数,只对单行数值产生作用; UDF只能实现一进一出的操作。

  • Hive-UDTF

    UDTF 上一篇介绍了基础的UDF——UDF和GenericUDF的实现,这一篇将介绍更复杂的用户自定义表生成函数...

  • Hive函数

    自定义函数 自定义函数包括三种:UDF、UDAF、UDTF。 UDF(User-Defined-Function)...

  • Flink1.9 UDF使用教程

    UDF是什么? UDF是用户自定义函数(User Define Function)的缩写,从定义可以看出UDF是一...

  • Hive的UDF编程-GenericUDF编程

    UDF简介 在Hive中,用户可以自定义一些函数,用于扩展HiveQL的功能,而这类函数叫做UDF(用户自定义函数...

  • 【Hive】Hive UDF

    [TOC] 一、UDF 介绍 UDF(User-Defined Functions)即是用户自定义的hive函数。...

  • Hive- UDF&GenericUDF

    hive udf简介 在Hive中,用户可以自定义一些函数,用于扩展HiveQL的功能,而这类函数叫做UDF(用户...

网友评论

      本文标题:UDF函数——更细粒度的控制流

      本文链接:https://www.haomeiwen.com/subject/axqkectx.html