随着SparkSql在大规模数据分析中的运用越来越广,在大数据分析平台中集成SparkSql提供用户交互式sql查询的功能已经成为了很多开发者的选择,而将SparkSql作为rest服务有两种方式: jobserver和livy;其中livy作为Apache的孵化项目,其良好的框架设计已经成为了spark rest服务的标准接口。
客户端与livy server交互的基本流程如下图所示:
我们在企业级的大数据平台中选择了livy作为我们的sql查询引擎,同时考虑到平台的需求,对livy的源码进行了深度定制,具体改动包括:
1. 原生的livy一次只能执行一条sql,后面提交的sql处于waiting状态,而平台的查询需要支持多条sql的并行查询,看下livy中查询的逻辑,在livy-repl模块的Session类中
其中的interpreterExecutor作为sql执行的线程池,其容量大小决定了一次能够并行运行的查询个数,源代码中容量为1,在这里将容量大小修改为可配置,具体的容量大小可以在配置文件中修改。
2. 原生的livy查询中日志只有session级别的,也即spark-submit的提交日志,然而该日志对用户来说意义不大,当用户执行一条sql查询时,用户关心的是当前查询的执行进度,为此我们对livy的查询日志这块做了修改,新增了查询statement的日志的接口/sessions/{sessionId}/log/statements/{statementId},在livy-reply的模块中,为了能够反映出当前sql的查询进度,我们通过继承了SparkListener接口来将spark执行状态中的关键信息获取到,并且通过Job0/Stage1(10/20)这样的方式打印出来。
获取执行进度的类 执行进度由上图可见,用户执行sql查询之后,下方打印出了执行进度,用户通过执行进度中的信息也可以初步看出读取的数据量大小。
3. 在livy的原生代码中,提交查询之前需要创建一个session,也就是启动一个driver,等driver启动好并且与本地客户端建立好连接,才能提交sql至该session中,session启动中的状态为starting,此时若提交sql,则会返回'session is in starting'错误;因此客户端需要不断轮询session的状态,直到session的状态变为running为止。为了提高用户的交互式体验,我们对这块的逻辑做了变动,当session创建成功之后就可以提交sql,但此时将sql缓存在livy server中,由livy server不断去轮询session的状态并且负责提交。
负责提交sql的线程方法在livy server中维护一个提交线程,不断去轮询session的状态并且负责提交sql;同时为了保证在livy server重启中待提交的sql不会丢失,我们注册一个钩子函数,在livy server进程关闭时保存未提交的sql至本地文件,启动后重新读取该文件中的sql内容。
在session启动时,由于sql并未执行,因此没有执行进度日志,但我们为了提高用户体验,在session启动时封装了虚拟的提交日志,如图所示:
启动日志提交日志中打印出了虚拟的提交进度信息,从而避免用户在没有任何信息的情况下等待。
4. 原生的livy中将查询中的结果缓存在内存之中,当查询的记录比较大时,过多的数据可能会造成内存的压力;对此,我们对查询结果的存储方式进行了改进;首先将结果数据缓存在内存之中,并且设置一个访问活跃程度,当每次有获取查询结果的请求时,活跃度加1;否则活跃度减1,当活跃度为0时,将内存中的数据写入到磁盘中,当有再次的数据访问时,重新将数据从磁盘加载到内存中,也即LRU的基本思路。
数据缓存功能5. 平台需要提供数据审计功能,即需要记录用户每条sql对应的查询类型、输入表、输出表等,方便后续进行查询,这里我们通过对sparksql的logicPlan进行解析,从而得到其输入表、输出表等,具体对logicPlan的解析过程可以参考这篇博客:https://blog.csdn.net/u012477420/article/details/80147793。
查询审计日志进一步,基于logicPlan结合DAG可以获取到当前查询输入的数据量和输出的数据量:
输入/输出数据量6. livy原生代码中,创建交互式session和batch session时,对每个提交的请求是直接创建本地的submit子进程,我们在进行k8s容器化部署时,发现如果同一时间内提交的submit请求过多时,本地的submit进程数量较多,会导致后续的提交出现OOM错误,为了适应容器化部署的特点,对其提交模块做了改进,收到提交请求时首先将请求存储到队列中,后台创建一个线程不断去消费队列中存储的信息,依次完成submit子进程的提交,从而避免一次性创建过多submit子进程导致OOM。
缓存提交请求7. 增加对flink的支持。livy的本质上是一个web service服务,为了支持对平台实时计算中flink任务的需求,我们仿照livy batch的代码逻辑编写了flink的提交逻辑:
flink的提交代码逻辑以上分析的是改动比较大的几个模块,还有一些细节优化不一一列举。基于平台的实际使用需求对livy源码做了定制,从而满足业务的需求。
网友评论