py4j internals and pyspark interaction

Author: lmy_8db4 | Published 2018-09-28 18:50

    How does a Python program talk to the Java JVM? The simplest approach is RPC: the JVM acts as the RPC server and the Python app as the RPC client. The JVM opens a socket port to serve requests, and the Python app only needs to call the client API that py4j provides. (Note that py4j does not start a JVM itself; a Java program must already be running.)

    Below is a quick introduction to installing and using py4j.

    Install

    1. conda install py4j
    2. Locate the py4j jar: ${HOME}/anaconda3/share/py4j/py4j0.10.7.jar
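
    If the jar landed somewhere else on your machine, a short Python sketch like this can find it (the conda-style path pattern is an assumption; adjust it to your install):

    import glob, os
    # search the usual conda location for the py4j jar
    print(glob.glob(os.path.expanduser("~/anaconda3/share/py4j/py4j*.jar")))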

    Run the Java program

    touch AdditionApplication.java

    import py4j.GatewayServer;
    
    public class AdditionApplication {
    
      public int addition(int first, int second) {
        return first + second;
      }
    
      public static void main(String[] args) {
        // the familiar RPC registration step: expose app as the entry point
        AdditionApplication app = new AdditionApplication();
    
        GatewayServer server = new GatewayServer(app); // app becomes gateway.entry_point
        server.start(); // listens on py4j's default port, 25333
      }
    }
    

    Compile and run AdditionApplication.java

    javac -cp ${HOME}/anaconda3/share/py4j/py4j0.10.7.jar AdditionApplication.java
    java -cp ${HOME}/anaconda3/share/py4j/py4j0.10.7.jar:. AdditionApplication
    jps # list running Java processes
    lsof -p <pid> # shows a socket on 25333 (LISTEN)
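
    You can also confirm from Python that the gateway port is open (25333 is py4j's default; adjust if you configured another port):

    import socket
    # quick connectivity check against the default py4j port
    with socket.create_connection(("127.0.0.1", 25333), timeout=2):
        print("gateway is listening on 25333")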
    
    

    Run Python

    >>> from py4j.java_gateway import JavaGateway
    >>> gateway = JavaGateway()                   # connect to the JVM
    >>> random = gateway.jvm.java.util.Random()   # create a java.util.Random instance
    >>> number1 = random.nextInt(10)              # call the Random.nextInt method
    >>> number2 = random.nextInt(10)
    >>> print(number1, number2)
    2 7
    >>> addition_app = gateway.entry_point               # get the AdditionApplication instance
    >>> gateway.help(gateway.jvm.AdditionApplication)
    >>> value = addition_app.addition(number1, number2)  # call the addition method
    >>> print(value)
    9
    >>> gateway.help(addition_app)                   # help
    
    >>> from py4j.java_gateway import java_import
    >>> java_import(gateway.jvm,'java.util.*')
    >>> jList = gateway.jvm.ArrayList()
    >>> gateway.help(jList)
    >>> addition_app
    
    

    Every object created through the Gateway Server carries a unique object id generated on the server side, and the server keeps all of these objects in a Map. When the Python client wants to operate on a remote object, it sends the object id along with the command and its arguments to the server; the server looks the object up by id and executes the command via reflection.
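
    As a rough illustration, the server-side bookkeeping boils down to an id-to-object map plus reflective dispatch. The toy Python model below is only a sketch of the idea, not py4j's real classes or wire protocol:

    # toy model of the gateway's server side (names and ids are made up)
    class ToyGateway:
        def __init__(self):
            self._objects = {}   # object id -> live object, like the Java-side Map
            self._next_id = 0

        def register(self, obj):
            oid = "o%d" % self._next_id   # this id is what the client holds
            self._next_id += 1
            self._objects[oid] = obj
            return oid

        def invoke(self, oid, method, *args):
            target = self._objects[oid]             # look the object up by id
            return getattr(target, method)(*args)   # "reflection": resolve by name

    gw = ToyGateway()
    oid = gw.register([1, 2, 3])
    gw.invoke(oid, "append", 4)
    print(gw._objects[oid])   # [1, 2, 3, 4]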

    Py4J Memory model

    Every time a Java object is sent to the Python side, a reference to the object is kept on the Java side (in the Gateway class). Once the object is garbage collected on the Python VM (reference count == 0), the reference is removed on the Java VM: if this was the last reference, the object will likely be garbage collected too. When a gateway is shut down, the remaining references are also removed on the Java VM.

    Because Java objects on the Python side are involved in a circular reference (JavaObject and JavaMember reference each other), these objects are not immediately garbage collected once the last reference to the object is removed (but they are guaranteed to be eventually collected if the Python garbage collector runs before the Python program exits).

    When in doubt, users can always call the detach function on the Python gateway to explicitly delete a reference on the Java side. A call to gc.collect() also usually works.
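
    For example, against the AdditionApplication gateway above:

    import gc
    from py4j.java_gateway import JavaGateway

    gateway = JavaGateway()
    rnd = gateway.jvm.java.util.Random()   # a Java-side reference is now held for us
    gateway.detach(rnd)                    # explicitly release the Java-side reference
    del rnd                                # drop the last Python-side reference
    gc.collect()                           # break the JavaObject/JavaMember cycle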

    PySpark and py4j

    class SparkSession(object):
        @ignore_unicode_prefix
        def __init__(self, sparkContext, jsparkSession=None):
            from pyspark.sql.context import SQLContext
            self._sc = sparkContext
            self._jsc = self._sc._jsc   # py4j JavaObject wrapping the JavaSparkContext
            self._jvm = self._sc._jvm   # py4j view into Spark's JVM
            if jsparkSession is None:
                jsparkSession = self._jvm.SparkSession(self._jsc.sc())
            self._jsparkSession = jsparkSession
            self._jwrapped = self._jsparkSession.sqlContext()
            self._wrapped = SQLContext(self._sc, self, self._jwrapped)
            _monkey_patch_RDD(self)
            install_exception_handler()
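
    In a pyspark shell you can poke at these handles directly (a quick sketch; spark is the shell-provided session, and the exact object ids will differ):

    >>> type(spark._jvm)          # py4j JVMView into the driver JVM
    <class 'py4j.java_gateway.JVMView'>
    >>> spark._jsc                # JavaObject wrapping the JavaSparkContext
    JavaObject id=...
    >>> spark._jsparkSession      # JavaObject wrapping org.apache.spark.sql.SparkSession
    JavaObject id=...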
    

    How a SparkSession gets created

    # invoked as SparkSession.builder.getOrCreate()
    def getOrCreate(self):
        with self._lock:
            from pyspark.context import SparkContext
            from pyspark.conf import SparkConf
            session = SparkSession._instantiatedSession
            if session is None or session._sc._jsc is None:
                sparkConf = SparkConf()
                for key, value in self._options.items():
                    sparkConf.set(key, value)
                sc = SparkContext.getOrCreate(sparkConf)
                # This SparkContext may be an existing one.
                for key, value in self._options.items():
                    # We need to propagate the confs before we create the
                    # SparkSession. Otherwise, confs like warehouse path and
                    # metastore url will not be set correctly (these confs
                    # cannot be changed once the SparkSession is created).
                    sc._conf.set(key, value)
                session = SparkSession(sc)
            for key, value in self._options.items():
                session._jsparkSession.sessionState().conf().setConfString(key, value)
            for key, value in self._options.items():
                session.sparkContext._conf.set(key, value)
            return session
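
    Typical use of this builder path (a minimal sketch; the app name and config key are arbitrary examples):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("py4j-demo") \
        .config("spark.sql.shuffle.partitions", "4") \
        .getOrCreate()
    # the option was propagated to the SparkContext conf, as traced above
    print(spark.sparkContext._conf.get("spark.sql.shuffle.partitions"))   # 4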
    

    Getting the gateway from spark._jvm

    >>> from py4j.java_gateway import JavaGateway
    >>> g = JavaGateway(spark._jvm._gateway_client)
    >>> g.help(spark._jsc)
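
    With that gateway in hand you can reach any JVM class over Spark's own py4j connection, e.g.:

    >>> g.jvm.java.lang.System.currentTimeMillis()   # any JVM class, via Spark's gateway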
