Flink Installation, Deployment, and Examples

Author: stephen_k | Published 2017-06-23 13:15

    Installation and Deployment

    Standalone Cluster

    • Installation and configuration
      First, the basics: a Java environment (1.7 or above), ssh, and so on.
      Download the matching package from the Flink website (mind the Hadoop and Scala versions), unpack it, and change into the resulting Flink root directory. Then configure it; the main configuration file is conf/flink-conf.yaml.
      Set the following important parameters:
    jobmanager.rpc.address: 10.0.0.1   # set to the IP address of your master node
    jobmanager.rpc.port: 6123
    jobmanager.heap.mb: 512
    taskmanager.heap.mb: 1024
    taskmanager.numberOfTaskSlots: 2
    parallelism.default: 2
    taskmanager.tmp.dirs: /tmp
    jobmanager.web.port: 8081   # web UI port
    

    Put the IP addresses (or hostnames) of the worker nodes into the conf/slaves file, one address per line, just as in an HDFS configuration (a sample file is shown below).
    Set the JAVA_HOME environment variable, or set env.java.home to the JDK path.
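    A minimal conf/slaves file might look like this (hypothetical worker addresses):

    10.0.0.2
    10.0.0.3
    10.0.0.4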

    • Start
      From the Flink root directory: bin/start-cluster.sh
    • Adding a JobManager/TaskManager to a running cluster
      bin/jobmanager.sh (start cluster)|stop|stop-all
      bin/taskmanager.sh start|stop|stop-all
    • Stop
      bin/stop-cluster.sh

    Flink on YARN

    This requires that Hadoop is already correctly installed in the cluster and its environment is configured. Flink on YARN needs no extra Flink configuration; it only requires a few cluster-specific environment variables to be in effect, specifically:

    export SCALA_HOME=/data/scala
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export YARN_CONF_DIR=/etc/hadoop/conf
    export FLINK_HOME=/data/flink/latest
    

    There are two main ways to launch Flink on YARN:

    1. Start a YARN session (a long-running Flink cluster on YARN)
    • From the Flink root directory: bin/yarn-session.sh -n 4 -tm 1024 -s 2
      The command above starts a Flink session on YARN with 4 TaskManagers, each with 1 GB of memory and 2 task slots. The physical and environment information of the launched cluster can be seen in the Flink web UI.
    • Within this session, Flink jobs can be submitted with the flink run command:
      flink run -m hnode4:34707 /data/flink/latest/examples/batch/WordCount.jar --input hdfs:///tmp/yarn/photo_test.csv
      Normally, when a job is submitted to YARN with the run option, the client finds the JobManager address automatically; in my experiments this did not work, so I specified it with -m.
    2. Submit a single job directly to YARN (run a Flink job on YARN)
    • Flink also supports launching a self-contained Flink job on YARN. The command and parameters are as follows:
      flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 1024 /data/flink/latest/examples/batch/WordCount.jar --input hdfs:///tmp/yarn/photo_test.csv --output hdfs:///tmp/yarn/result_out1
    • With a regular session, the session is torn down when the client disconnects. For this, Flink offers a detached YARN session: pass -d or --detached at startup.
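      For example, the session from the first approach can be started detached with the same parameters:

    bin/yarn-session.sh -n 4 -tm 1024 -s 2 -d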

    Example Analysis

    1. Streaming example 1
    package com.jiuyan.flink.stream

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource

    object WikipediaAnalysis {

      def main(args: Array[String]) {

        val see = StreamExecutionEnvironment.getExecutionEnvironment
        //val see = StreamExecutionEnvironment.createLocalEnvironment(2) // a local execution environment, useful for debugging on your own machine

        val edits = see.addSource(new WikipediaEditsSource()) // a stream of Wikipedia IRC edit-log events
        val result = edits.keyBy(_.getUser)
          .timeWindow(Time.seconds(5)) // a 5-second time window
          .fold(("", 0L))((acc, event) => {
            // accumulate the total byte diff per user within the window
            val user = event.getUser
            val diff = acc._2 + event.getByteDiff
            (user, diff)
          })
        result.print()
        see.execute()
      }
    }
    

    Notes:
    a. Window classification. By how the stream is split: timeWindow and countWindow; by window behavior: tumbling windows, sliding windows, and custom windows (see the sketch after these notes).
    b. The fold function performs a fold (accumulate) operation and turns a KeyedStream into a DataStream, e.g.: val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i }). With the initial value "start" and a sequence of Int values (1,2,3,4,5), it emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
    c. If you hit the exception could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent], the program is missing an implicit parameter; importing the Scala API package fixes it: import org.apache.flink.api.scala._
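    A minimal sketch of these window variants, assuming the Flink 1.x Scala DataStream API used above (stream contents and names are illustrative):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    object WindowKindsSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val keyed = env.fromElements(("a", 1), ("b", 2), ("a", 3)).keyBy(_._1)

        keyed.timeWindow(Time.seconds(5))                   // tumbling time window: one 5s window after the next
        keyed.timeWindow(Time.seconds(10), Time.seconds(5)) // sliding time window: 10s windows starting every 5s
        keyed.countWindow(100)                              // tumbling count window: every 100 elements per key
        keyed.countWindow(100, 10)                          // sliding count window: 100 elements, sliding by 10
      }
    }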

    2. Streaming example 2
      The rough logic of this example: from a stream that produces a batch of car-speed events every 100 milliseconds, compute the top speed within the last 10 seconds, evaluated every 50 meters each car travels. In the implementation, events are keyed by carId, events from the last 10 seconds are collected into a window, and the window fires whenever the distance values of two events differ by more than 50.
    package com.jiuyan.flink.stream

    import java.util.concurrent.TimeUnit

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger

    import scala.language.postfixOps
    import scala.util.Random

    object TopSpeedWindowing {

      case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

      val numOfCars = 2
      val evictionSec = 10
      val triggerMeters = 50d

      def main(args: Array[String]) {
        val params = ParameterTool.fromArgs(args) // utility for parsing command-line arguments

        //val env = StreamExecutionEnvironment.createLocalEnvironment(1)
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.getConfig.setGlobalJobParameters(params)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val cars =
          if (params.has("input")) {
            env.readTextFile(params.get("input"))
              .map(parseMap(_))
              .map(x => CarEvent(x._1, x._2, x._3, x._4))
          } else {
            println("Executing TopSpeedWindowing example with default input data set.")
            println("Use --input to specify file input.")
            env.addSource(new SourceFunction[CarEvent]() {

              val speeds = Array.fill[Int](numOfCars)(50)
              val distances = Array.fill[Double](numOfCars)(0d)
              @transient lazy val rand = new Random()

              var isRunning: Boolean = true

              override def run(ctx: SourceContext[CarEvent]) = {
                while (isRunning) {
                  Thread.sleep(100)

                  for (carId <- 0 until numOfCars) {
                    // randomly speed up or slow down by 5, clamped to [0, 100]
                    if (rand.nextBoolean) speeds(carId) = Math.min(100, speeds(carId) + 5)
                    else speeds(carId) = Math.max(0, speeds(carId) - 5)

                    distances(carId) += speeds(carId) / 3.6d // advance the traveled distance
                    val record = CarEvent(carId, speeds(carId),
                      distances(carId), System.currentTimeMillis)
                    ctx.collect(record)
                  }
                }
              }

              override def cancel(): Unit = isRunning = false
            })
          }

        val topSpeed = cars
          .assignAscendingTimestamps(_.time)
          .keyBy("carId")
          .window(GlobalWindows.create) // GlobalWindows collects all elements of a key into one global window
          .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS))) // evict elements older than the given keep time
          .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
            // fire once the distance delta between two events exceeds triggerMeters
            def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
          }, cars.getType().createSerializer(env.getConfig)))
    //      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
    //      .every(Delta.of[CarEvent](triggerMeters,
    //          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
          .maxBy("speed")

        if (params.has("output")) {
          topSpeed.writeAsText(params.get("output"))
        } else {
          println("Printing result to stdout. Use --output to specify output path.")
          topSpeed.print()
        }
        env.execute("TopSpeedWindowing")
      }

      def parseMap(line: String): (Int, Int, Double, Long) = {
        val record = line.substring(1, line.length - 1).split(",")
        (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
      }
    }
    

    Notes:
    a. Defining a window generally requires a window assigner type (GlobalWindows), an eviction rule for window elements (TimeEvictor), a trigger condition (DeltaTrigger), and a serializer. DeltaTrigger fires based on a DeltaFunction and a given threshold: it computes a delta between the last element that caused a firing and the current element and compares it against the threshold; if the delta is higher, the trigger fires.
    b. Window operations apply to keyed streams; on non-keyed streams use windowAll. Windows on a keyed stream can be evaluated in parallel, while windows on a non-keyed stream cannot (see the sketch below).
    c. For more on windows, see the windowing documentation.
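    A minimal sketch of keyed versus non-keyed windows from note b (the finite example stream is only there to make the sketch self-contained):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    object WindowScopeSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val events = env.fromElements(("car-1", 42), ("car-2", 17))

        // keyed: one window per key, evaluated in parallel across keys
        events.keyBy(_._1).timeWindow(Time.seconds(10)).max(1).print()

        // non-keyed: a single window over the entire stream, effectively parallelism 1
        events.timeWindowAll(Time.seconds(10)).max(1).print()

        env.execute("window scope sketch")
      }
    }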

    3. Batch processing
      This example implements the basic PageRank algorithm.
      For background on the algorithm, see: http://www.cnblogs.com/rubinorth/p/5799848.html
      For background on the Markov process behind it, see: http://blog.csdn.net/weaponsun/article/details/50007411
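    For reference, this is the rank update the code below implements, with damping factor d = 0.85, N the total number of pages, and out(q) the number of outgoing links of page q:

    PR(p) = (1 - d) / N + d * Σ_{q → p} PR(q) / out(q)

    Each page q distributes its current rank evenly over its outgoing links; the summed contributions are then damped by d.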
    package com.jiuyan.flink.batch
    import java.lang.Iterable

    import org.apache.flink.api.common.functions.GroupReduceFunction
    import org.apache.flink.api.java.aggregation.Aggregations.SUM
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.api.scala._
    // PageRankData (default EDGES / page count) ships with Flink's examples module
    import org.apache.flink.examples.java.graph.util.PageRankData
    import org.apache.flink.util.Collector

    import scala.collection.JavaConverters._
    object PageRankBasic {
    
      private final val DAMPENING_FACTOR: Double = 0.85
      private final val EPSILON: Double = 0.0001
    
      def main(args: Array[String]) {
    
        val params: ParameterTool = ParameterTool.fromArgs(args)
    
        // set up execution environment
        //val env = ExecutionEnvironment.getExecutionEnvironment
        val env = ExecutionEnvironment.createLocalEnvironment(2)
    
        // make parameters available in the web interface
        env.getConfig.setGlobalJobParameters(params)
    
        // read input data
        val (pages, numPages) = getPagesDataSet(env, params)
        val links = getLinksDataSet(env, params)
        val maxIterations = params.getInt("iterations", 10)
    
        // assign initial ranks to pages
        val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)).withForwardedFields("*->pageId") // the whole map input is forwarded to the pageId field of Page
        // build adjacency list from link input
        val adjacencyLists = links
          .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
            override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
              var outputId = -1L
              val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
              out.collect(new AdjacencyList(outputId, outputList.toArray))
            }
          })
    
        // start iteration
        val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
          currentRanks =>
            val newRanks = currentRanks
              // distribute ranks to target pages
              .join(adjacencyLists).where("pageId").equalTo("sourceId") {
                (page, adjacent, out: Collector[Page]) =>
              val targets = adjacent.targetIds
              val len = targets.length
              targets foreach { t => out.collect(Page(t, page.rank / len)) }
              }
              // collect ranks and sum them up
              .groupBy("pageId").aggregate(SUM, "rank")
              // apply dampening factor
              .map { p =>
                Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
              }.withForwardedFields("pageId")
    
            // terminate if no rank update was significant
            val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
              (current, next, out: Collector[Int]) =>
                // check for significant update
                if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
            }
            (newRanks, termination)
        }
        val result = finalRanks
    
        // emit result
        if (params.has("output")) {
          result.writeAsCsv(params.get("output"), "\n", " ")
          // execute program
          env.execute("Basic PageRank Example")
        } else {
          println("Printing result to stdout. Use --output to specify output path.")
          result.print()
        }
      }
    
      case class Link(sourceId: Long, targetId: Long)
      case class Page(pageId: Long, rank: Double)
      case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
    
      private def getPagesDataSet(env: ExecutionEnvironment, params: ParameterTool):
                         (DataSet[Long], Long) = {
        if (params.has("pages") && params.has("numPages")) {
          val pages = env
            .readCsvFile[Tuple1[Long]](params.get("pages"), fieldDelimiter = " ", lineDelimiter = "\n")
            .map(x => x._1)
          (pages, params.getLong("numPages"))
        } else {
          println("Executing PageRank example with default pages data set.")
          println("Use --pages and --numPages to specify file input.")
          (env.generateSequence(1, 15), PageRankData.getNumberOfPages)
        }
      }
      private def getLinksDataSet(env: ExecutionEnvironment, params: ParameterTool):
                          DataSet[Link] = {
        if (params.has("links")) {
          env.readCsvFile[Link](params.get("links"), fieldDelimiter = " ",
            includedFields = Array(0, 1))
        } else {
          println("Executing PageRank example with default links data set.")
          println("Use --links to specify file input.")
          val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
            v2.asInstanceOf[Long])}
          env.fromCollection(edges)
        }
      }
    }
    

    Notes:
    a. The forwarded-fields annotation and the withForwardedFields function:
    A forwarded-fields annotation declares which fields of the input object a function leaves unmodified and forwards to the same position, or to another position, in the output.
    Forwarding information is declared with field expressions. Fields forwarded to the same position in the output are identified by their position; the position must be valid in the input, and the data types of input and output must match. For example, "f2" declares that the third field of a Java input tuple is forwarded unchanged to the third field of the output tuple.
    Fields forwarded unchanged to a different position are declared with a field expression such as "f0->f2", meaning the first field of the Java input tuple is copied unchanged to the third field of the Java output tuple. "*" stands for the entire input or output type; for example, "f0->*" declares that the function's output is exactly the first field of the Java input tuple. Multiple forwarded fields can be declared in a single string, such as "f0; f2->f1; f3->f2", or as separate strings, such as "f0", "f2->f1", "f3->f2". (See: Semantic Annotations)
    b. iterateWithTermination: a bulk iteration that terminates either when the configured maximum number of iterations is reached or when the termination set becomes empty (see the sketch below).
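    A minimal, PageRank-independent sketch of iterateWithTermination (hypothetical data; the iteration stops as soon as the filtered termination set comes back empty):

    import org.apache.flink.api.scala._

    object IterateSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.createLocalEnvironment(1)
        val start = env.fromElements(100.0, 64.0, 7.5)

        // halve every value; keep iterating while any value is still above 1.0
        val result = start.iterateWithTermination(50) { current =>
          val next = current.map(_ / 2.0)
          val termination = next.filter(_ > 1.0) // empty => terminate
          (next, termination)
        }
        result.print()
      }
    }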
