Structure of a batch program
- Get the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
- Add a source
val text = env.fromElements("who's there","I think I hear")
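- Define the transformation operators
// Split on non-word characters ("\\W+"); "\\w+" would split on the words themselves
val counts = text.flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }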
.map((_,1))
.groupBy(0)
.sum(1)
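- Define a sink
counts.print()
- Start the program
// Needed when the job ends in a lazy sink such as writeAsText. print() is an eager sink
// that triggers execution itself, so calling execute() directly after it fails
// because no new sink has been defined.
env.execute("Kafka Dataset WordCount")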
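The steps above can be put together into one small program. This is a minimal sketch, assuming a Flink 1.x release that still ships the DataSet Scala API (the `flink-scala` dependency); the object name `BatchWordCount` and the helper `wordCounts` are illustrative and not part of the original notes.

```scala
import org.apache.flink.api.scala._

object BatchWordCount {
  // Builds the word-count pipeline (hypothetical helper, kept apart from main for reuse)
  def wordCounts(env: ExecutionEnvironment): DataSet[(String, Int)] = {
    val text = env.fromElements("who's there", "I think I hear")
    text
      .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) } // one element per word
      .map((_, 1))                                                // pair each word with a count of 1
      .groupBy(0)                                                 // group by the word
      .sum(1)                                                     // add up the counts per word
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // print() triggers execution itself, so no env.execute() call follows it
    wordCounts(env).print()
  }
}
```

If the result were written with a lazy sink such as `writeAsText`, the program would end with `env.execute(...)` instead.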
Defining a source
// Recursively enumerate all files under the directory
val parameter = new Configuration
parameter.setBoolean("recursive.file.enumeration",true)
env.readTextFile("file:///path/with/files").withParameters(parameter)
Operators
Aggregate
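val input: DataSet[(Int, String, Double)] = env.fromElements(
  (1, "hello", 4.0),
  (1, "hello", 5.0),
  (2, "hello", 5.0),
  (3, "word", 6.0),
  (3, "word", 6.0)
)
// Group by field 1, then sum field 0 and take the minimum of field 2 per group.
// and(...) adds a second aggregation to the same grouping; chaining a second
// aggregate(...) would instead aggregate the result of the first one again.
val value = input.groupBy(1).aggregate(Aggregations.SUM, 0).and(Aggregations.MIN, 2)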
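To make the intended result concrete, the same computation can be worked through with plain Scala collections. This sketch does not use Flink at all; it assumes the goal is, for each value of field 1, the sum of field 0 together with the minimum of field 2. The names `AggregateSketch`, `rows`, and `aggregated` are illustrative.

```scala
object AggregateSketch {
  // Same sample rows as the Flink example: (field 0, field 1, field 2)
  val rows: Seq[(Int, String, Double)] = Seq(
    (1, "hello", 4.0),
    (1, "hello", 5.0),
    (2, "hello", 5.0),
    (3, "word", 6.0),
    (3, "word", 6.0)
  )

  // For each distinct value of field 1: (sum of field 0, minimum of field 2)
  val aggregated: Map[String, (Int, Double)] =
    rows.groupBy(_._2).map { case (key, group) =>
      (key, (group.map(_._1).sum, group.map(_._3).min))
    }

  def main(args: Array[String]): Unit =
    aggregated.toSeq.sortBy(_._1).foreach(println)
}
```

For the sample rows this gives a sum of 4 and a minimum of 4.0 for "hello", and a sum of 6 and a minimum of 6.0 for "word".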
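Join
Joins are either inner joins or outer joins; outer joins are further divided into left outer, right outer, and full outer joins.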
val input1 = env.fromElements((1,"hello"),(2,"hello"))
val input2 = env.fromElements(("hello",1),("word",2))
val result = input1.join(input2).where(0).equalTo(1)
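The snippet above is an inner join. An outer join looks similar but needs a join function, because the side without a match is passed in as `null`. The following is a minimal sketch of a left outer join, assuming the Flink 1.x DataSet Scala API; the extra element `(3, "flink")`, the placeholder string `"none"`, and the names `OuterJoinSketch` and `leftOuter` are illustrative additions.

```scala
import org.apache.flink.api.scala._

object OuterJoinSketch {
  def leftOuter(env: ExecutionEnvironment): DataSet[(Int, String, String)] = {
    val input1 = env.fromElements((1, "hello"), (2, "hello"), (3, "flink"))
    val input2 = env.fromElements(("hello", 1), ("word", 2))

    // Join on input1 field 0 == input2 field 1. Every left element is kept;
    // right is null when no element of input2 has a matching key.
    input1.leftOuterJoin(input2).where(0).equalTo(1) { (left, right) =>
      (left._1, left._2, if (right == null) "none" else right._1)
    }
  }

  def main(args: Array[String]): Unit =
    leftOuter(ExecutionEnvironment.getExecutionEnvironment).print()
}
```

`rightOuterJoin` and `fullOuterJoin` follow the same pattern; with a full outer join either argument of the join function may be `null`.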
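Broadcast variables
- Dynamic data sharing. Inputs and configuration parameters shared between operators are static, whereas the data shared through a broadcast variable is dynamic: it is itself a data set computed by the job.
Steps for programming with broadcast variables:
(1) Create the broadcast variable.
val toBroadcast = env.fromElements(1, 2, 3)
(2) Register the broadcast variable
Implement the operator function as a RichFunction and register the broadcast variable on the operator.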
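import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

val toBroadcast: DataSet[Int] = env.fromElements(1, 2, 3)
// The data set the operator runs on; its elements are illustrative
val data: DataSet[String] = env.fromElements("a", "b")
data.map(new RichMapFunction[String, String]() {
  var broadcastSet: java.util.List[Int] = null
  override def open(parameters: Configuration): Unit = {
    // Read the broadcast variable
    broadcastSet = getRuntimeContext.getBroadcastVariable[Int]("broadcastSetName")
  }
  override def map(value: String): String = {
    // Illustrative body: append the number of broadcast elements to each value
    value + ":" + broadcastSet.size
  }
  // Register the broadcast variable
}).withBroadcastSet(toBroadcast, "broadcastSetName")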
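For contrast with the broadcast variable, static configuration can be handed to an operator through a `Configuration` object and `withParameters`, the same mechanism used for the recursive file source earlier. The sketch below assumes a Flink 1.x release in which the DataSet runtime passes that `Configuration` to `open(Configuration)`; the key `"limit"` and the names `StaticParameterSketch` and `aboveLimit` are illustrative.

```scala
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object StaticParameterSketch {
  def aboveLimit(env: ExecutionEnvironment): DataSet[Int] = {
    val toFilter = env.fromElements(1, 2, 3)

    // Static parameter: fixed when the job is defined, not computed by the job
    val config = new Configuration()
    config.setInteger("limit", 2)

    toFilter.filter(new RichFilterFunction[Int]() {
      private var limit = 0

      override def open(parameters: Configuration): Unit = {
        // Read the static parameter, falling back to 0 if it is missing
        limit = parameters.getInteger("limit", 0)
      }

      override def filter(in: Int): Boolean = in > limit
    }).withParameters(config)
  }

  def main(args: Array[String]): Unit =
    aboveLimit(ExecutionEnvironment.getExecutionEnvironment).print()
}
```

The value 2 here is known before the job runs, whereas a broadcast data set can be the output of earlier transformations in the same job.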