美文网首页Spark学习笔记
Spark中的多任务处理

Spark中的多任务处理

作者: LestatZ | 来源:发表于2019-03-28 20:41 被阅读0次

Spark中的多任务处理

Spark的一个非常常见的用例是并行运行许多作业。 构建作业DAG后,Spark将这些任务分配到多个Executor上并行处理。
但这并不能帮助我们在同一个Spark应用程序中同时运行两个完全独立的作业,例如同时从多个数据源读取数据并将它们写到对应的存储,或同时处理多个文件等。

每个spark应用程序都需要一个SparkSession(Context)来配置和执行操作。 SparkSession对象是线程安全的,可以根据需要传递给你的Spark应用程序。

一个顺序作业的例子

假设我们有一个spark 2.x应用程序,负责将几个数据写入到HDFS中。

import org.apache.spark.sql.SparkSession

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    val df = spark.sparkContext.parallelize(1 to 100).toDF
    doFancyDistinct(df, "hdfs:///dis.parquet")
    doFancySum(df, "hdfs:///sum.parquet")
  }

  def doFancyDistinct(df: DataFrame, outPath: String) = df.distinct.write.parquet(outPath)
  
  def doFancySum(df: DataFrame, outPath: String) = df.agg(sum("value")).write.parquet(outPath)

}

这个程序看起来没有什么问题,Spark将按顺序执行两个动作。但这两个动作是独立, 我们可以同时执行它们。

一个有缺陷的并发作业的例子

如果你快速的在网上搜索一下 “scala异步编程”,你就会被引到Scala Future这个解决方案中。
例如以下为一个并行处理RDD的例子:


import scala.concurrent._
import ExecutionContext.Implicits.global

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}
val n: Int = 2 
val files: Array[String] = ['/tmp/test1.csv','/tmp/test2.csv']

val rdds = files.map(f => pipeline(f, n))

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

我们只要根据搜索到的文档中提供的例子修改一下,就会得到以下类似内容:

import org.apache.spark.sql.SparkSession
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    val df = spark.sparkContext.parallelize(1 to 100).toDF
    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")
    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
  }

  def doFancyDistinct(df: DataFrame, outPath: String) = Future { df.distinct.write.parquet(outPath) }

  def doFancySum(df: DataFrame, outPath: String) = Future { df.agg(sum("value")).write.parquet(outPath) }
}

ExecutionContext是用于==管理并行操作的Context==。 实际的线程模型可以由开发者明确提供,也可以使用全局默认值(这是一个 ForkJoinPool ),就像我们在上面的代码中使用的一样:

import scala.concurrent.ExecutionContext.Implicits.global

使用Global execution context 的问题在于它并不知道我们是在群集上启动Spark作业。 默认情况下,Global execution context 提供==与运行代码的系统中的处理器相同数量的线程==。 在我们的Spark应用程序中,它将与Driver上的处理器相同数量的线程。

一个优化过的并发作业的例子

我们需要控制我们的线程策略,更一般化地编写我们的程序,以便可以在不同的线程模型中重用它们。

例如以下是我们从重写的函数,它将允许我们精确控制execution context 来管理调用函数时提供的线程数。 例子中添加的隐式参数将允许调用的代码指定运行函数时使用哪个ExecutionContext。

def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
  df.distinct.write.parquet(outPath)
}

现在让我们提出一个比默认的Global execution context更好的策略。我们希望能够指定我们想要的并行度。

import org.apache.spark.sql.SparkSession
import import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    // Set number of threads via a configuration property
    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)
    val df = spark.sparkContext.parallelize(1 to 100).toDF
    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")
    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
  }

  def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.distinct.write.parquet(outPath)
  }

  def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.agg(sum("value")).write.parquet(outPath) 
  }
}

在这个例子中,我们定义了Execution context变量xc,含有五个线程。

参考资料

Spark Parallel Job Execution
How to run concurrent jobs(actions) in Apache Spark using single spark context
Processing multiple files as independent RDD's in parallel

相关文章

  • Spark中的多任务处理

    Spark中的多任务处理 Spark的一个非常常见的用例是并行运行许多作业。 构建作业DAG后,Spark将这些任...

  • iOS11人机交互指南(四)- System Capabilit

    一、多任务处理(Multitasking) 多任务处理让你能够通过iOS设备上的多任务处理界面或是在iPad上使用...

  • spark 基础知识整理(三)- spark SQL专题

    一、简介 Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不同,Spark...

  • Spark JDBC系列--Mysql tinyInt字段特殊处

    本文旨在介绍 Spark 读取tinyInt字段时,如何处理精度损失的情况 MySQLDialect spark中...

  • 《超级高效工作术》核心整理(二)

    看我的整理,就是节省你的时间 多任务处理 1.什么是多任务处理? “多任务处理”原本是一个计算机术语,指的是计算机...

  • 脑壳累

    脑中目标列表中的任务太多,系统因为多任务处理而宕机

  • Android线程池的使用

    前言  多任务处理在现实开发场景中已经无处不在,通过多任务处理可以将计算机性能更大程度的发挥出来,避免处于空闲状态...

  • Hadoop和Spark在业务中的比较

    直接比较Hadoop和Spark有难度,因为它们处理的许多任务都一样,但是在一些方面又并不相互重叠。 比如说,Sp...

  • 重拾Java(4)-线程

    一、概述 Java对多线程编程提供了内置支持,多线程是特殊形式的多任务处理,所有现代系统都支持多任务处理。多任务处...

  • 网新关键词

    第一章 数字足迹 指一个特定的人在互联网上的活动信息 多任务处理 多任务处理是指32位windows操作系统中,多...

网友评论

    本文标题:Spark中的多任务处理

    本文链接:https://www.haomeiwen.com/subject/dkxhbqtx.html