Flink 批处理算子详解

作者: Tim在路上 | 来源:发表于2019-12-04 11:10 被阅读0次

    批处理程序的结果

    1. 获取运行时
     val env = ExecutionEnvironment.getExecutionEnvironment
    
    1. 添加Source
    val text = env.fromElements("who's there","I think I hear")
    
    1. 定义算子转换函数
        text.flatMap{_.toLowerCase.split("\\w+") filter(_.nonEmpty)}
            .map((_,1))
            .groupBy(0)
            .sum(1)
    
    1. 定义Sink
    counts.print();
    
    1. 启动程序
    env.execute("Kafka Dataset WordCount")
    

    source 定义

    // 递归定义整个目录下的所有文件

        val parameter = new Configuration
        parameter.setBoolean("recursive.file.enumeration",true)
        env.readTextFile("file://path/with/files").withParameters(parameter)
    

    算子

    Aggregate

        val input: DataSet[(Int,String,Double)] = env.fromElements(
          (1,"hello",4),
          (1,"hello",5),
          (2,"hello",5),
          (3,"word",6),
          (3,"word",6)
        )
        val value = input.groupBy(1).aggregate(Aggregations.SUM,0).aggregate(Aggregations.MIN,2)
    

    连接

    连接分为内连接和外连接,外连接分为左外连接,右外连接和内连接

        val input1 = env.fromElements((1,"hello"),(2,"hello"))
        val input2 = env.fromElements(("hello",1),("word",2))
        
        val result = input1.join(input2).where(0).equalTo(1)
    

    广播变量

    1. 动态数据共享。 算子间共享输入和配置参数是静态的,广播变量共享的数据是动态的

    广播变量编程步骤:

    (1)创建广播变量。

    val toBroadcast = env.fromElements(1,2,3);
    

    (2) 注册广播变量

    利用 RichFunction 自定义算子函数,注册广播变量

    val toBroadcast = env.fromElements(1,2,3);
      val toBroadcast:DataSet[Int] = env.fromElements(1,2,3);
    
        toBroadcast.map(new RichMapFunction[String,String]() {
          var broadcastSet  = null
    
          override def open(parameters: Configuration): Unit = {
            // 读取广播变量
            broadcastSet = getRuntimeContext.getBroadcastVariable[String]("broadcastSetName").get(0)
          }
          override def map(value: String): String = {
            
          }
          // 注册广播变量
        }).withBroadcastSet(toBroadcast,"broadcastSetName");
    

    相关文章

      网友评论

        本文标题:Flink 批处理算子详解

        本文链接:https://www.haomeiwen.com/subject/fvikgctx.html