美文网首页工作生活
Flink的分布式缓存

Flink的分布式缓存

作者: JasonLee实时计算 | 来源:发表于2019-07-03 22:04 被阅读0次

    Flink提供了一个分布式缓存,类似于Apache Hadoop,可以在本地访问用户函数的并行实例。此函数可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。
    缓存的工作原理如下。程序在其作为缓存文件的特定名称下注册本地或远程文件系统(如HDFS或S3)的文件或目录ExecutionEnvironment。执行程序时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。用户函数可以查找指定名称下的文件或目录,并从worker的本地文件系统访问它。
    其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件.
    分布式缓存使用如下:
    注册中的文件或目录ExecutionEnvironment。

    val env = ExecutionEnvironment.getExecutionEnvironment
    
    // register a file from HDFS
    env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
    
    // register a local executable file (script, executable, ...)
    env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
    
    // define your program and execute
    ...
    val input: DataSet[String] = ...
    val result: DataSet[Integer] = input.map(new MyMapper())
    ...
    env.execute()
    

    访问用户函数中的缓存文件或目录(此处为a MapFunction)。该函数必须扩展RichFunction类,因为它需要访问RuntimeContext。

    
    // extend a RichFunction to have access to the RuntimeContext
    class MyMapper extends RichMapFunction[String, Int] {
    
      override def open(config: Configuration): Unit = {
    
        // access cached file via RuntimeContext and DistributedCache
        val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
        // read the file (or navigate the directory)
        ...
      }
    
      override def map(value: String): Int = {
        // use content of cached file
        ...
      }
    }
    

    下面给出一个完整的demo,计算存在于缓存文件中的单词出现的次数,看下面的代码

    
    package flink.cache
    
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration
    import scala.io.Source
    import org.apache.flink.api.scala._
    
    object FlinkCacheDemo {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(3)
        // 注册缓存的文件,里面有数据hello jason
        env.registerCachedFile("D:/test.txt", "testfile")
        val stream = env.fromElements("hello", "jason", "hello", "jim")
        val result = stream
          .flatMap(_.split(","))
          .map(new RichMapFunction[String, String] {
            var list: List[(String)] = _
            override def open(parameters: Configuration): Unit = {
              super.open(parameters)
              // 获取缓存的数据
              val file = getRuntimeContext.getDistributedCache.getFile("testfile")
              val lines = Source.fromFile(file.getAbsoluteFile).getLines()
              list = lines.toList
            }
            override def map(value: String): String = {
              var middle: String = ""
              if(list(0).contains(value)) {
                middle = value
              }
              middle
            }
          })
          .map((_,1L))
          .filter(_._1.nonEmpty)
          .groupBy(0)
          .sum(1)
          .print()
      }
    }
    

    运行代码输出的结果是,因为jim不在缓存的文件中,被过滤掉了
    (hello,2)
    (jason,1)

    如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,更多的Flink和spark的干货可以加入下面的星球


    image.png

    相关文章

      网友评论

        本文标题:Flink的分布式缓存

        本文链接:https://www.haomeiwen.com/subject/wwshhctx.html