美文网首页大数据 爬虫Python AI Sql大数据机器学习与数据挖掘
如何应对Spark-Redis行海量数据插入、查询作业时碰到的问

如何应对Spark-Redis行海量数据插入、查询作业时碰到的问

作者: 华为云开发者联盟 | 来源:发表于2020-11-28 14:26 被阅读0次

摘要:由于redis是基于内存的数据库,稳定性并不是很高,尤其是standalone模式下的redis。于是工作中在使用Spark-Redis时也会碰到很多问题,尤其是执行海量数据插入与查询的场景中。

海量数据查询

Redis是基于内存读取的数据库,相比其它的数据库,Redis的读取速度会更快。但是当我们要查询上千万条的海量数据时,即使是Redis也需要花费较长时间。这时候如果我们想要终止select作业的执行,我们希望的是所有的running task立即killed。

Spark是有作业调度机制的。SparkContext是Spark的入口,相当于应用程序的main函数。SparkContext中的cancelJobGroup函数可以取消正在运行的job。

/**

  * Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`

  * for more information.

  */

def cancelJobGroup(groupId: String) {

  assertNotStopped()

  dagScheduler.cancelJobGroup(groupId)

}

按理说取消job之后,job下的所有task应该也终止。而且当我们取消select作业时,executor会throw TaskKilledException,而这个时候负责task作业的TaskContext在捕获到该异常之后,会执行killTaskIfInterrupted。

// If this task has been killed before we deserialized it, let's quit now. Otherwise,

// continue executing the task.

val killReason = reasonIfKilled

if (killReason.isDefined) {

  // Throw an exception rather than returning, because returning within a try{} block

  // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl

  // exception will be caught by the catch block, leading to an incorrect ExceptionFailure

  // for the task.

  throw new TaskKilledException(killReason.get)

}

/**

* If the task is interrupted, throws TaskKilledException with the reason for the interrupt.

*/

private[spark] def killTaskIfInterrupted(): Unit

但是Spark-Redis中还是会出现终止作业但是task仍然running。因为task的计算逻辑最终是在RedisRDD中实现的,RedisRDD的compute会从Jedis中取获取keys。所以说要解决这个问题,应该在RedisRDD中取消正在running的task。这里有两种方法:

方法一:参考Spark的JDBCRDD,定义close(),结合InterruptibleIterator。

def close() {

  if (closed) return

  try {

    if (null != rs) {

      rs.close()

    }

  } catch {

    case e: Exception => logWarning("Exception closing resultset", e)

  }

  try {

    if (null != stmt) {

      stmt.close()

    }

  } catch {

    case e: Exception => logWarning("Exception closing statement", e)

  }

  try {

    if (null != conn) {

      if (!conn.isClosed && !conn.getAutoCommit) {

        try {

          conn.commit()

        } catch {

          case NonFatal(e) => logWarning("Exception committing transaction", e)

        }

      }

      conn.close()

    }

    logInfo("closed connection")

  } catch {

    case e: Exception => logWarning("Exception closing connection", e)

  }

  closed = true

}

context.addTaskCompletionListener{ context => close() }

CompletionIterator[InternalRow, Iterator[InternalRow]](

  new InterruptibleIterator(context, rowsIterator), close())

方法二:异步线程执行compute,主线程中判断task isInterrupted

try{

  val thread = new Thread() {

    override def run(): Unit = {

      try {

          keys = doCall

      } catch {

        case e =>

          logWarning(s"execute http require failed.")

      }

      isRequestFinished = true

    }

  }

  // control the http request for quite if user interrupt the job

  thread.start()

  while (!context.isInterrupted() && !isRequestFinished) {

    Thread.sleep(GetKeysWaitInterval)

  }

  if (context.isInterrupted() && !isRequestFinished) {

    logInfo(s"try to kill task ${context.getKillReason()}")

    context.killTaskIfInterrupted()

  }

  thread.join()

  CompletionIterator[T, Iterator[T]](

    new InterruptibleIterator(context, keys), close)

我们可以异步线程来执行compute,然后在另外的线程中判断是否task isInterrupted,如果是的话就执行TaskContext的killTaskIfInterrupted。防止killTaskIfInterrupted无法杀掉task,再结合InterruptibleIterator:一种迭代器,以提供任务终止功能。通过检查[TaskContext]中的中断标志来工作。

海量数据插入

我们都已经redis的数据是保存在内存中的。当然Redis也支持持久化,可以将数据备份到硬盘中。当插入海量数据时,如果Redis的内存不够的话,很显然会丢失部分数据。这里让使用者困惑的点在于: 当Redis已使用内存大于最大可用内存时,Redis会报错:command not allowed when used memory > ‘maxmemory’。但是当insert job的数据大于Redis的可用内存时,部分数据丢失了,并且还没有任何报错。

因为不管是Jedis客户端还是Redis服务器,当插入数据时内存不够,不会插入成功,但也不会返回任何response。所以目前能想到的解决办法就是当insert数据丢失时,扩大Redis内存。

总结

Spark-Redis是一个应用还不是很广泛的开源项目,不像Spark JDBC那样已经商业化。所以Spark-Redis还是存在很多问题。相信随着commiter的努力,Spark-Redis也会越来越强大。

相关文章

  • 如何应对Spark-Redis行海量数据插入、查询作业时碰到的问

    摘要:由于redis是基于内存的数据库,稳定性并不是很高,尤其是standalone模式下的redis。于是工作中...

  • Hbase初窥

    Hbase能做什么 海量数据的存储 海量数据的查询 企业数据海量查询 项目需求功能 海量数据 实时查询 场景复杂 ...

  • SQL入门笔记(中)

    主要内容:查询、联结、表与行列的增删 子查询 内联结 其他联结方式 复合查询 插入(行)数据 篡改和删除(行)数据...

  • arango增删改查

    arango实践 插入数据 修改数据 插入数据 查询数据 复杂查询 多表查询

  • arango的AQL

    arango实践 插入数据模板 修改数据模板 插入数据 查询数据 复杂查询 多表查询 图查询

  • mysql存储过程

    一、执行过程1、创建数据库表 2、写入存储过程 3、执行 [mysql 存储过程海量数据写入和查询] 循环插入10...

  • 笔记-MySQL常用CRUD

    1.插入数据: .插入单行: .插入多行: 2.删除数据: 3.更新数据: 4.查询: .普通查询 .分组查询记录...

  • 分区

    简介 在面对海量数据存储(需要大的存储空间)和海量查询(需要高并发的查询)时,单服务器的数据存储模型无法提供高可用...

  • PostgreSQL 数据库查询

    1. 插入数据(INSERT语句) 在 PostgreSQL 中, INSERT 查询用于在表中插入新行。 您可以...

  • python——mysql数据库基础

    通过终端如何进入/退出mysql 创建表 数据操作 查询: 消除重复 where 逐个匹配每一行 增加:全列插入 ...

网友评论

    本文标题:如何应对Spark-Redis行海量数据插入、查询作业时碰到的问

    本文链接:https://www.haomeiwen.com/subject/ewgqwktx.html