美文网首页
spark streaming 7*24 不间断运行

spark streaming 7*24 不间断运行

作者: 邵红晓 | 来源:发表于2019-10-15 15:46 被阅读0次
  1. 当程序首次启动,其将创建一个新的StreamingContext,设置所有的流并调用start()。

  2. 当程序在失效后重启,其将依据检查点目录的检查点数据重新创建一个StreamingContext。 通过使用StraemingContext.getOrCreate很容易获得这个性能。

  3. 使用kafka direct api ,driver端会lauch jobs useing offsetranges 在executor,然后executor端会直接使用
    offsetrange读取kafka 指定分区和指定偏移量,数据会被缓存在executor端(这里面还存在executor调度在kafka分区leader上的问题,参数LocationStrategies.PreferConsistent)

  4. 只要开启checkpoint,HasoffsetRange中的topic和offset都会被存储,便于从checkpoint中,错误恢复

  5. task端和driver端容错,因为kafka是可重放数据源,所以只要重新driver端从checkpoint重新启动,就会重新消费数据,如果在sink端保证实现事物,则可以实现恰好一次语义,默认实现最少一次语义

ssc.checkpoint("hdfs://...") 

# 创建和设置一个新的StreamingContext
def functionToCreateContext():
    sc = SparkContext(...) # new context
    ssc = new StreamingContext(...)
    lines = ssc.socketTextStream(...) # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory) # 设置检查点目录
    return ssc
# 从检查点数据中获取StreamingContext或者重新创建一个
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# 在需要完成的context上做额外的配置
# 无论其有没有启动
context ...
# 启动context
context.start()
# 通常,检查点设置间隔是5-10个DStream的滑动间隔
# minbatch 3s
dstream.checkpoint(30s)

contaxt.awaitTermination()

driver 程序容错

驱动器程序的容错要求我们以特殊的方式创建 StreamingContext。我们需要把检查点目录提供给 StreamingContext。与直接调用 new StreamingContext 不同,应该使用 StreamingContext.getOrCreate() 函数。

配置过程如下:

1、 启动Driver自动重启功能
yarn:设置yarn.resourcemanager.am.max-attempts 或者spark.yarn.maxAppAttempts
2、 设置checkpoint
StreamingContext.setCheckpoint(hdfsDirectory)
3、支持从checkpoint中重启配置

 val ssc = new StreamingContext
 ssc.checkpoint(checkpointDirectory)
 ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))```

相关文章

网友评论

      本文标题:spark streaming 7*24 不间断运行

      本文链接:https://www.haomeiwen.com/subject/ezssmctx.html