美文网首页扣丁学堂Python培训
扣丁学堂解析PySpark如何设置worker的python命令

扣丁学堂解析PySpark如何设置worker的python命令

作者: 994d14631d16 | 来源:发表于2018-08-06 15:03 被阅读3次

      今天扣丁学堂Python培训老师给大家介绍一下关于研究spark-deep-learning项目,来回顾一下Pyspark的知识,下面我们要求来看那一下吧,希望能够对本文的读者有所帮助。

      问题描述

      关于PySpark的基本机制我就不讲太多,你google搜索“PySpark原理”就会有不少还不错的文章。我这次是遇到一个问题,因为我原先安装了python2.7,python3.6。后面为了方便我在我的电脑上使用virtualenv来做环境隔离,这个时候就发生一个比较诡异的事情:

      在driver端能够正常使用PIL图片处理模块,但是executor端则不行。那显然是我在~/.bash_profile的配置在executor启动pythonworker时没有生效,程序依然走了我早先安装的python2.7,而早先的2.7里我没有安装PIL。那么应该怎么解决这个问题呢?

      Python里的RDD和JVM的RDD如何进行关联

      要解答上面的问题,核心是要判定JVM里的PythonRunner启动pythonworker时,python的地址是怎么指定的。

      我们以pythonrdd里的map作为起点,

      defmap(self,f,preservesPartitioning=False):

      deffunc(_,iterator):

      returnmap(f,iterator)

      returnself.mapPartitionsWithIndex(func,preservesPartitioning)

      进入self.mapPartitionsWithIndex:

      defmapPartitionsWithIndex(self,f,preservesPartitioning=False):

      returnPipelinedRDD(self,f,preservesPartitioning)

      可以看到PipelinedRDD,进入PipelinedRDD._jrdd里,可以看到:

      wrapped_func=_wrap_function(self.ctx,self.func,self._prev_jrdd_deserializer,

      self._jrdd_deserializer,profiler)

      python_rdd=self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),wrapped_func,

      self.preservesPartitioning)

      self._jrdd_val=python_rdd.asJavaRDD()

      这里和JVM里的PythonRDD建立了联系。进入_wrap_function:

      pickled_command,broadcast_vars,env,includes=_prepare_for_python_RDD(sc,command)

      returnsc._jvm.PythonFunction(bytearray(pickled_command),env,includes,sc.pythonExec,sc.pythonVer,broadcast_vars,sc._javaAccumulator)

      我们看到了sc.pythonExec对象,这个是传入到PythonRDD里的python命令。为了看的更清楚,我们看看sc.pythonExec的申明:

      self.pythonExec=os.environ.get("PYSPARK_PYTHON",'python')

      也就是你在很多文档中看到的,通过设置PYSPARK_PYTHON变量来设置启用哪个python。那么pythonExec是JVM里是怎么用的呢?

      private[spark]classPythonRDD(

      parent:RDD[_],

      func:PythonFunction,

      preservePartitoning:Boolean)

      extendsRDD[Array[Byte]](parent){

      PythonRDD是在python中通过_jvm对象在JVM里创建的,里面哟给重要的对象是PythonFunction,这个PythonFunction就是wrapped_func,wrapped_func里包含了env,pythonExec等。PythonRDD的compute方法里会调用PythonRunner的compute方法:

      valrunner=PythonRunner(func,bufferSize,reuse_worker)

      runner.compute(firstParent.iterator(split,context),split.index,context)

      上面的func其实就是PythonFunction,在PythonRunner里你可以看到:

      //AllthePythonfunctionsshouldhavethesameexec,versionandenvvars.

      privatevalenvVars=funcs.head.funcs.head.envVars

      privatevalpythonExec=funcs.head.funcs.head.pythonExec

      privatevalpythonVer=funcs.head.funcs.head.pythonVer

      三个变量的申明,具体使用在这:

      valworker:Socket=env.createPythonWorker(pythonExec,envVars.asScala.toMap)

      这里通过pythonRunner运行启动pythonworker。

      额外福利:Python如何启动JVM,从而启动Spark

      建议配置一套spark的开发环境,然后debug进行跟踪。Python启动时,首先启动SparkContext(context.py),在init方法里会_ensure_initialized方法确保Java里的SparkContext被初始化:

      @classmethod

      def_ensure_initialized(cls,instance=None,gateway=None,conf=None):

      withSparkContext._lock:

      ifnotSparkContext._gateway:

      SparkContext._gateway=gatewayorlaunch_gateway(conf)

      SparkContext._jvm=SparkContext._gateway.jvm

      初始时会调用lauch_gateway(java_gateway.py),该方法首先会到环境变量里找SPARK_HOME,然后使用里面的./bin/spark-submit进行Spark的启动,通过环境变量中的PYSPARK_SUBMIT_ARGS获取一些参数,默认是pyspark-shell,最后通过Popen启动Spark进程,返回一个JavaGateWay,之后持有这个对象,就可以向JVM发送指令了。

      解决问题

      有了上面的铺垫后,问题就变得很好解决了,下面的单元测试原先是跑步过去的

      deftest_readImages(self):

      #Testthatreading

      imageDF=imageIO._readImages("some/path",2,self.binaryFilesMock)

      self.assertTrue("image"inimageDF.schema.names)

      self.assertTrue("filePath"inimageDF.schema.names)

      #TheDFshouldhave2imagesand1null.

      self.assertEqual(imageDF.count(),3)

      validImages=imageDF.filter(col("image").isNotNull())

      self.assertEqual(validImages.count(),2)

      img=validImages.first().image

      self.assertEqual(img.height,array.shape[0])

      self.assertEqual(img.width,array.shape[1])

      self.assertEqual(imageIO.imageType(img).nChannels,array.shape[2])

      self.assertEqual(img.data,array.tobytes())

      现在我该如何让他通过呢?可以在setUp的时候添加

      importos

      os.environ["PYSPARK_PYTHON"]="your-python-path"

      即可。

    相关文章

      网友评论

        本文标题:扣丁学堂解析PySpark如何设置worker的python命令

        本文链接:https://www.haomeiwen.com/subject/rtcgvftx.html