Preface
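Thanks to the experts of the open-source world for contributing so much open-source software: it gives us the chance to study how complex software is designed, and to customize that software to our own needs. This article first briefly explains how the Spark monitoring UI is implemented, then ends with a simple demo of customizing the Spark UI.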
How SparkUI is implemented
The SparkUI is started while the SparkContext is being initialized. The relevant code is:
// Start the web UI by creating the SparkUI object
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
      _env.securityManager, appName, startTime = startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind()) // Starts Jetty; bind() is inherited from WebUI and calls the Jetty API to start the server
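The SparkUI object is the core container of the whole monitoring service: it starts Jetty and maintains the mapping between URLs and ServletContextHandler instances. Its basic data structures live in the base class WebUI: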
protected val tabs = ArrayBuffer[WebUITab]()
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
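tabs holds the tabs shown in the UI's tab bar, handlers holds the objects that process incoming requests, and pageToHandlers records which ServletContextHandler instances belong to each page; together they establish the mapping from URLs to handlers. When the SparkUI object calls initialize(), these three structures are populated and each WebUIPage's render function is wrapped into a servlet. The code is as follows: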
// In SparkUI
def initialize() {
  attachTab(new JobsTab(this))
  attachTab(stagesTab)
  attachTab(new StorageTab(this))
  attachTab(new EnvironmentTab(this))
  attachTab(new ExecutorsTab(this))
  attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
  attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
  attachHandler(ApiRootResource.getServletHandler(this))
  // This should be POST only, but, the YARN AM proxy won't proxy POSTs
  attachHandler(createRedirectHandler(
    "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
    httpMethods = Set("GET", "POST")))
}
initialize() // Called as soon as the SparkUI is created

// In WebUI
def attachTab(tab: WebUITab) {
  tab.pages.foreach(attachPage)
  tabs += tab
}
/** Attach a page to this UI. */
def attachPage(page: WebUIPage) {
  val pagePath = "/" + page.prefix
  val renderHandler = createServletHandler(pagePath,
    (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
  val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
    (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
  attachHandler(renderHandler)
  attachHandler(renderJsonHandler)
  pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
    .append(renderHandler)
}
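WebUITab and WebUIPage are essentially wrappers around a tab, a URL, and the generation of a page's HTML. A WebUITab uses its prefix field to build the URLs of the WebUIPage objects it contains, and each WebUIPage subclass implements the handling logic and generates the HTML.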
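To make the relationship between tabs, pages, and handlers concrete, here is a minimal, self-contained toy model in plain Scala. It is not Spark's code: ToyPage, ToyTab, ToyUI, and ToyUIDemo are hypothetical stand-ins for WebUIPage, WebUITab, and WebUI, handlers are plain functions rather than Jetty ServletContextHandler objects, and the way the two prefixes are joined is an assumption based on the description above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for WebUIPage: a URL prefix plus a render function.
abstract class ToyPage(val prefix: String) {
  def render(query: String): String
}

// Hypothetical stand-in for WebUITab: owns pages and contributes its own prefix.
class ToyTab(val prefix: String) {
  val pages = mutable.ArrayBuffer[ToyPage]()
  def attachPage(page: ToyPage): Unit = pages += page
  // Assumed joining rule: "<tab prefix>/<page prefix>", without a trailing slash.
  def pathOf(page: ToyPage): String = (prefix + "/" + page.prefix).stripSuffix("/")
}

// Hypothetical stand-in for WebUI: maps each URL path to a handler function.
class ToyUI {
  val tabs = mutable.ArrayBuffer[ToyTab]()
  val handlers = mutable.LinkedHashMap[String, String => String]()

  def attachTab(tab: ToyTab): Unit = {
    tab.pages.foreach(page => attachPage(tab, page))
    tabs += tab
  }

  // Turn the page's render method into a handler registered under its path.
  private def attachPage(tab: ToyTab, page: ToyPage): Unit = {
    val path = "/" + tab.pathOf(page)
    handlers(path) = (query: String) => page.render(query)
  }

  // Dispatch a request path to the matching handler, if one is registered.
  def handle(path: String, query: String): Option[String] =
    handlers.get(path.stripSuffix("/")).map(handler => handler(query))
}

object ToyUIDemo {
  def build(): ToyUI = {
    val ui = new ToyUI
    val tab = new ToyTab("yss")
    tab.attachPage(new ToyPage("") {
      def render(query: String): String = "<div>Hello Yss</div>"
    })
    ui.attachTab(tab)
    ui
  }

  def main(args: Array[String]): Unit =
    println(build().handle("/yss/", ""))
}
```

In this sketch a page with an empty prefix ends up at the tab's own path, so a request for /yss/ reaches the single page attached to the yss tab, while any unregistered path yields None.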
Adding a new tab to SparkUI
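Once the logic of the Spark monitoring pages is understood, adding a new tab is fairly straightforward. The example I implemented is very simple; the result is shown in the screenshot:
(Screenshot: image.png)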
In the ui package of the Spark source, create a new package yss and add two classes, YssPage and YssTab.
(Screenshot: image.png)
The code for YssTab is:
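import org.apache.spark.ui.{SparkUI, SparkUITab}
// A tab whose pages are served under the "yss" URL prefix
private[ui] class YssTab(parent: SparkUI) extends SparkUITab(parent, "yss") {
  attachPage(new YssPage(this))
}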
The code for YssPage is:
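import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.ui.{UIUtils, WebUIPage}

// The empty prefix places this page directly under the tab's own prefix
private[ui] class YssPage(parent: YssTab) extends WebUIPage("") {
  def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      <div class="yss_tech">
        Hello Yss
      </div>
    // Wrap the content in the standard Spark page header and tab bar
    UIUtils.headerSparkPage("yss", content, parent)
  }
}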
Finally, add the following to the initialize() function of the SparkUI class:
attachTab(new YssTab(this))
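Write a simple Spark app, such as the classic WordCount, and run it in the IDE; the newly added page then shows up in the browser. From there, we can add whatever monitoring information we care about to the new page.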
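import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("wordcount")
val sc = new SparkContext(conf)
val line = sc.textFile("pom.xml")
line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
// Keep the application alive for five minutes so the UI stays reachable
Thread.sleep(5 * 60 * 1000)
sc.stop()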
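As a starting point for putting custom monitoring information on the new page, here is a small sketch that turns name/value pairs into an HTML table. It is only an illustration: MetricsHtml and its methods are hypothetical helpers, the metric names in the usage are made up, and the sketch builds a plain String, whereas a real WebUIPage.render returns Seq[Node], so the same structure would have to be expressed with Scala XML literals inside YssPage.

```scala
// Hypothetical helper, not part of Spark.
object MetricsHtml {
  // Escape the characters that are significant in HTML text ("&" must go first).
  def escape(s: String): String =
    s.replace("&", "&amp;")
      .replace("<", "&lt;")
      .replace(">", "&gt;")
      .replace("\"", "&quot;")

  // Render one table row per (name, value) pair, in the order given.
  def table(rows: Seq[(String, String)]): String = {
    val body = rows.map { case (name, value) =>
      s"<tr><td>${escape(name)}</td><td>${escape(value)}</td></tr>"
    }.mkString
    "<table><thead><tr><th>Metric</th><th>Value</th></tr></thead>" +
      s"<tbody>$body</tbody></table>"
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample values, for illustration only.
    println(table(Seq("completedJobs" -> "3", "appName" -> "wordcount")))
  }
}
```

Escaping the values before embedding them matters once the page shows data that comes from the application, such as job descriptions or configuration values.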