美文网首页
pyspark+多线程提高并发度

pyspark+多线程提高并发度

作者: 高稚商de菌 | 来源:发表于2019-12-21 20:04 被阅读0次

    最近写了一个spark项目,先用spark.sql执行多个小任务,最后将所有小任务的结果集中处理,非常耗时。
    通过增加executor的数量和配置,并不能提高效率。从时间上来看,耗时基本相当于所有任务串行执行。从spark的框架UI上,能看到资源利用率其实很低,stage很多,应该是sql用了比较多的join。作为一个spark小白玩家,我并没有找到优化的线索。
    于是想到能不能在spark中引入多线程,并发处理多个小任务,提高并发度。于是最终用了如下一段代码:

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    df_0 = spark.sql(sql_0)
    df_0.cache()
    df_0.createOrReplaceTempView("table_0") 
    df_0.show()
    
    def executor_run(sql):
          logging.info(sql)
          df = spark.sql(sql)
          df.cache()
          df.show()
          return df
    
    fs = dict()
    with ThreadPoolExecutor(20) as executor:
           for task_sql in task_sqls:
                task = executor.submit(executor_run, task_sql)
                fs[task] = task_sql
    
    dfs = list()
    for future in as_completed(fs, 7200):
         tmp = future.result()
         dfs.append(tmp)
         logging.info('thread return %s %s' % (fs[future], str(tmp)))
    
    df = reduce(lambda x,y:x.unionAll(y), dfs)
    # 这里略去后续的执行业务逻辑代码......
    for df in dfs:
          df.unpersist()
    df_0.unpersist()
    

    最终执行速度提高了5倍!stage的数量也大大减少了。
    这里有几个注意点:

    1. spark是惰性执行的。临时表table_0最好在线程池开始前用action(比如show)触发执行,以避免在线程池里执行。(如果在线程池里被触发执行会不会有问题,我没试过)
    2. 在executor_run也要用action触发执行。如果不触发,那么被正式执行时将会在多线程之外。这样就只有大概1倍的提升,猜测是在多线程中生成的DAG图,相比之前已经有了优化。
    3. 用多线程要注意提高driver的内存。我调试的过程中出现过java OOM。我最终用了16G的driver内存。

    相关文章

      网友评论

          本文标题:pyspark+多线程提高并发度

          本文链接:https://www.haomeiwen.com/subject/mwngnctx.html