美文网首页
UDF函数——更细粒度的控制流

UDF函数——更细粒度的控制流

作者: yayooo | 来源:发表于2019-08-25 19:18 被阅读0次

    Flink 为所有 UDF 函数都暴露了接口（以接口或抽象类的形式提供），例如 MapFunction、FilterFunction、ProcessFunction 等。

    函数类(Function Classes)

    package com.atguigu.apiTest
    
    /**
      * 自定义UDF
      */
    
    import org.apache.flink.api.common.functions.FilterFunction
    import org.apache.flink.streaming.api.scala._
    
    object Filter {

      /**
        * Entry point: keeps only the sample elements that contain "Flink" (using
        * [[MyFilter]]) and prints them.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // Source -> custom FilterFunction -> print sink, built as one chain.
        env
          .fromElements("Flink China", "Flink", "china")
          .filter(new MyFilter())
          .print()

        // Submit the pipeline built above.
        env.execute()
      }

    }
    
    /**
      * A FilterFunction that keeps only strings containing the substring "Flink".
      * The match is case-sensitive, so "china" is dropped while "Flink China" is kept.
      */
    class MyFilter extends FilterFunction[String] {
      override def filter(value: String): Boolean = value.contains("Flink")
    }
    
    

    匿名类：使用 FilterFunction 或 RichFilterFunction 的匿名实现

    package com.atguigu.apiTest
    
    /**
      * 自定义UDF
      */
    
    import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
    import org.apache.flink.streaming.api.scala._
    
    object Filter {

      /**
        * Entry point: filters the sample stream with an anonymous [[FilterFunction]]
        * that keeps elements containing "Flink", then prints the surviving elements.
        *
        * An anonymous [[RichFilterFunction]] can be plugged in the same way:
        * {{{
        * dataStream.filter(
        *   new RichFilterFunction[String] {
        *     override def filter(value: String): Boolean = value.contains("Flink")
        *   }
        * ).print()
        * }}}
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

        // Anonymous class implementing the plain FilterFunction interface.
        val containsFlink: FilterFunction[String] = new FilterFunction[String] {
          override def filter(value: String): Boolean = value.contains("Flink")
        }

        val flinkTweets: DataStream[String] = dataStream.filter(containsFlink)
        flinkTweets.print()

        env.execute()
      }

    }
    
    
    

    将关键字“Flink”作为构造参数传入过滤函数

    package com.atguigu.apiTest
    
    /**
      * 自定义UDF
      */
    
    import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
    import org.apache.flink.streaming.api.scala._
    
    object Filter {

      /**
        * Entry point: keeps the sample elements that contain the keyword "Flink", then
        * prints them. The keyword is passed to [[KeywordFilter]] as a constructor argument.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

        val flinkTweets: DataStream[String] = dataStream.filter(new KeywordFilter("Flink"))
        // Attach a sink: without it the filtered stream is computed but never emitted.
        flinkTweets.print()

        env.execute()
      }

    }
    
    /**
      * A FilterFunction that keeps only strings containing `keyWord` as a
      * case-sensitive substring. Because the keyword is a constructor argument,
      * one class can filter on any keyword.
      *
      * @param keyWord substring an element must contain to pass the filter
      */
    class KeywordFilter(keyWord: String) extends FilterFunction[String] {
      override def filter(value: String): Boolean = value.contains(keyWord)
    }
    
    

    匿名函数（Lambda 表达式）

    package com.atguigu.apiTest
    
    /**
      * 自定义UDF
      */
    
    import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
    import org.apache.flink.streaming.api.scala._
    
    object Filter {

      /**
        * Entry point: filters the sample stream with a Scala lambda
        * (`_.contains("Flink")`) instead of a FilterFunction class, then prints
        * the surviving elements.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val dataStream: DataStream[String] = env.fromElements("Flink China", "Flink", "china")

        // Bind the filtered stream itself. `print()` returns the sink, not the stream,
        // so its result is not stored under a stream-like name.
        val flinkTweets: DataStream[String] = dataStream.filter(_.contains("Flink"))
        flinkTweets.print()

        env.execute()
      }

    }
    
    

    相关文章

      网友评论

          本文标题:UDF函数——更细粒度的控制流

          本文链接:https://www.haomeiwen.com/subject/axqkectx.html