Flink提供了一个分布式缓存,类似于Apache Hadoop,可以在本地访问用户函数的并行实例。此函数可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。
缓存的工作原理如下。程序在其作为缓存文件的特定名称下注册本地或远程文件系统(如HDFS或S3)的文件或目录ExecutionEnvironment。执行程序时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。用户函数可以查找指定名称下的文件或目录,并从worker的本地文件系统访问它。
其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件.
分布式缓存使用如下:
注册中的文件或目录ExecutionEnvironment。
val env = ExecutionEnvironment.getExecutionEnvironment
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()
访问用户函数中的缓存文件或目录(此处为a MapFunction)。该函数必须扩展RichFunction类,因为它需要访问RuntimeContext。
// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {
override def open(config: Configuration): Unit = {
// access cached file via RuntimeContext and DistributedCache
val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
// read the file (or navigate the directory)
...
}
override def map(value: String): Int = {
// use content of cached file
...
}
}
下面给出一个完整的demo,计算存在于缓存文件中的单词出现的次数,看下面的代码
package flink.cache
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source
import org.apache.flink.api.scala._
object FlinkCacheDemo {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
// 注册缓存的文件,里面有数据hello jason
env.registerCachedFile("D:/test.txt", "testfile")
val stream = env.fromElements("hello", "jason", "hello", "jim")
val result = stream
.flatMap(_.split(","))
.map(new RichMapFunction[String, String] {
var list: List[(String)] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 获取缓存的数据
val file = getRuntimeContext.getDistributedCache.getFile("testfile")
val lines = Source.fromFile(file.getAbsoluteFile).getLines()
list = lines.toList
}
override def map(value: String): String = {
var middle: String = ""
if(list(0).contains(value)) {
middle = value
}
middle
}
})
.map((_,1L))
.filter(_._1.nonEmpty)
.groupBy(0)
.sum(1)
.print()
}
}
运行代码输出的结果是,因为jim不在缓存的文件中,被过滤掉了
(hello,2)
(jason,1)
如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,更多的Flink和spark的干货可以加入下面的星球
image.png
网友评论