PySpark NoteBook-11. Perform Dat

Author: 7125messi | Published 2018-01-14 15:18

    Preparation

    We use a PostgreSQL database here and create some sample data from the psql console. In real projects you can manage this data more conveniently with a third-party tool such as Navicat Premium.

    Log in to the PostgreSQL server:
    $ sudo -u postgres psql

    Create the database pysparkdb:
    postgres=# create database pysparkdb;

    List the existing databases:
    postgres=# SELECT datname FROM pg_database;
      datname
    -------------
     template1
     template0
     postgres
     metastore
     pymetastore
     pysparkdb
    (6 rows)

    Connect to the pysparkdb database:
    postgres=# \c pysparkdb

    Create the studentTable table:
    pysparkdb=# create table studentTable(
    pysparkdb(# studentID char(50) not null,
    pysparkdb(# name char(50) not null,
    pysparkdb(# gender char(5) not null
    pysparkdb(# );

    Describe the studentTable table:
    pysparkdb=# \d studentTable
          Table "public.studenttable"
      Column   |     Type      | Modifiers
    -----------+---------------+-----------
     studentid | character(50) | not null
     name      | character(50) | not null
     gender    | character(5)  | not null

    Insert rows into studentTable:
    insert into studentTable values ('si1', 'Robin', 'M');
    insert into studentTable values ('si2', 'Maria', 'F');
    insert into studentTable values ('si3', 'Julie', 'F');
    insert into studentTable values ('si4', 'Bob', 'M');
    insert into studentTable values ('si6','William','M');

    Query the contents of studentTable:
    pysparkdb=# select * from studentTable;
     studentid | name    | gender
    -----------+---------+--------
     si1       | Robin   | M
     si2       | Maria   | F
     si3       | Julie   | F
     si4       | Bob     | M
     si6       | William | M

    Our student data now sits in the studentTable table of the pysparkdb database on the PostgreSQL server, and we need to read it from PySpark. To connect PySpark to PostgreSQL we need a JDBC connector for the database, so we launch the PySpark shell with the following command (starting the shell this way also loads the JDBC connector jar):

    pyspark --driver-class-path  .ivy2/jars/org.postgresql_postgresql-9.4.1212.jar --packages org.postgresql:postgresql:9.4.1212
    
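    If you are writing a standalone script instead of using the interactive shell, the connector can also be requested when the SparkSession is built. This is a minimal sketch, not part of the original post; it assumes the script is launched with spark-submit and uses the same connector version as above:

    # Sketch: ask Spark to resolve the PostgreSQL JDBC connector when the
    # SparkSession (and its SparkContext) is created, instead of passing
    # --packages on the command line.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("pyspark-postgres-join") \
        .config("spark.jars.packages", "org.postgresql:postgresql:9.4.1212") \
        .getOrCreate()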

    1 : Reading the student data table from the PostgreSQL database.

    dbURL = "jdbc:postgresql://192.168.111.106/pysparkdb?user=postgres&password="

    studentsDataFrame = spark.read.format('jdbc').options(
        url=dbURL,
        database='pysparkdb',
        dbtable='studenttable'
    ).load()
    
    studentsDataFrame.show()
    +--------------------+--------------------+------+
    | studentid| name|gender|
    +--------------------+--------------------+------+
    |si1 ...|Robin ...| M |
    |si2 ...|Maria ...| F |
    |si3 ...|Julie ...| F |
    |si4 ...|Bob ...| M |
    |si6 ...|William ...| M |
    +--------------------+--------------------+------+
    

    We now have the student DataFrame we need. But do you see the problem? Do you see the (...) in our DataFrame? The char(50) columns come back padded with trailing spaces, and show() truncates the padded strings, displaying "...". We can remove the padding by applying the trim() function to those columns. trim() lives in the submodule pyspark.sql.functions; after importing it, we use it to strip the padding from our columns as follows:

    from pyspark.sql.functions import trim

    studentsDataFrame = studentsDataFrame.select(
        trim(studentsDataFrame.studentid),
        trim(studentsDataFrame.name),
        studentsDataFrame.gender
    )
    
    studentsDataFrame.show()
    +---------------+----------+------+
    |trim(studentid)|trim(name)|gender|
    +---------------+----------+------+
    | si1| Robin| M |
    | si2| Maria| F |
    | si3| Julie| F |
    | si4| Bob| M |
    | si6| William| M |
    +---------------+----------+------+
    
    studentsDataFrame = studentsDataFrame \
        .withColumnRenamed('trim(studentid)', 'studentID') \
        .withColumnRenamed('trim(name)', 'Name') \
        .withColumnRenamed('gender', 'Gender')
    studentsDataFrame.printSchema()
    
    root
    |-- studentID: string (nullable = false)
    |-- Name: string (nullable = false)
    |-- Gender: string (nullable = false)
    
    studentsDataFrame.show()
    
    +---------+-------+------+
    |studentID| Name|Gender|
    +---------+-------+------+
    | si1| Robin| M |
    | si2| Maria| F |
    | si3| Julie| F |
    | si4| Bob| M |
    | si6|William| M |
    +---------+-------+------+
    
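    The trim and rename steps can also be collapsed into a single select() by aliasing the trimmed columns, so no withColumnRenamed() calls are needed afterwards. A sketch of that variant (not from the original post), applied to a freshly loaded DataFrame:

    # Sketch: trim and rename in one step using alias().
    from pyspark.sql.functions import col, trim

    studentsDataFrame = spark.read.format('jdbc').options(
        url=dbURL,
        dbtable='studenttable'
    ).load().select(
        trim('studentid').alias('studentID'),
        trim('name').alias('Name'),
        col('gender').alias('Gender')
    )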

    Connecting PySpark to Oracle

    First download the JDBC driver matching your database version from the Oracle website, for example ojdbc6-11.2.0.3.jar, and pass it at submit time with the
    --jars "/opt/spark/ojdbc6-11.2.0.3.jar" option:

    spark-submit --jars "/opt/spark/ojdbc6-11.2.0.3.jar" pyspark_oracle.py
    
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:oracle:thin:@192.168.145.108:1521:orcl",
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable="TBL_CELLPHONE_SALES",
        user="cellphone",
        password="cellphone"
    ).load()
    df.show(10, truncate=False)
    
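    Writing a DataFrame back to Oracle goes through the same JDBC connector. A minimal sketch, assuming a target table named TBL_CELLPHONE_SALES_COPY, which is purely illustrative:

    # Sketch: write the DataFrame back to Oracle over JDBC.
    # TBL_CELLPHONE_SALES_COPY is a hypothetical target table.
    df.write.jdbc(
        url="jdbc:oracle:thin:@192.168.145.108:1521:orcl",
        table="TBL_CELLPHONE_SALES_COPY",
        mode="overwrite",
        properties={
            "driver": "oracle.jdbc.driver.OracleDriver",
            "user": "cellphone",
            "password": "cellphone"
        }
    )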

    2 : Reading subject data from a JSON file.

    subjectsDataFrame = sqlContext.read.format("json").load('/home/pysparkbook/pysparkBookData/subjects.json')
    subjectsDataFrame.show()
    
    +---------+-------+
    |studentID|subject|
    +---------+-------+
    | si1| Python|
    | si3| Java|
    | si1| Java|
    | si2| Python|
    | si3| Ruby|
    | si4| C++|
    | si5| C|
    | si4| Python|
    | si2| Java|
    +---------+-------+
    
    subjectsDataFrame.printSchema()
    
    root
    |-- studentID: string (nullable = true)
    |-- subject: string (nullable = true)
    
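    Schema inference works here because the file is small and regular. If you prefer to pin the column types explicitly, a schema can be passed to the reader; a sketch using the same file path as above:

    # Sketch: read the same JSON file with an explicit schema instead of
    # relying on schema inference.
    from pyspark.sql.types import StructType, StructField, StringType

    subjectsSchema = StructType([
        StructField('studentID', StringType(), True),
        StructField('subject', StringType(), True)
    ])

    subjectsDataFrame = sqlContext.read.format("json").schema(subjectsSchema).load(
        '/home/pysparkbook/pysparkBookData/subjects.json')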

    3 : Performing inner join on DataFrames.

    joinedDataInner = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='inner')
    
    joinedDataInner.show()
    +---------+-------+---------+-----+------+
    |studentID|subject|studentID| Name|Gender|
    +---------+-------+---------+-----+------+
    | si1| Java| si1|Robin| M |
    | si1| Python| si1|Robin| M |
    | si2| Java| si2|Maria| F |
    | si2| Python| si2|Maria| F |
    | si3| Ruby| si3|Julie| F |
    | si3| Java| si3|Julie| F |
    | si4| Python| si4| Bob| M |
    | si4| C++| si4| Bob| M |
    +---------+-------+---------+-----+------+
    
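    Note that the result carries two studentID columns, one from each DataFrame. If you only want a single copy of the join key, the join can be expressed on the column name instead of on an explicit condition; a sketch of the equivalent call:

    # Sketch: joining on the shared column name keeps a single studentID column.
    joinedDataInner = subjectsDataFrame.join(studentsDataFrame,
                                             on='studentID',
                                             how='inner')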

    4 : Saving inner joined DataFrame as JSON file.

    joinedDataInner = joinedDataInner.select(subjectsDataFrame.studentID,'subject', 'Name', 'Gender')
    joinedDataInner.columns
    joinedDataInner.write.format('json').save('/home/muser/innerJoinedTable')
    
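    To confirm the write, the saved directory can be read back as JSON (a quick check, not part of the original post):

    # Sketch: read the JSON output directory back and display it.
    checkInner = spark.read.format('json').load('/home/muser/innerJoinedTable')
    checkInner.show()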

    5 : Performing left outer join.

    joinedDataLeftOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='left_outer')
    joinedDataLeftOuter.show()
    

    6 : Saving left outer joined DataFrame into PostgreSQL.

    joinedDataLeftOuter = joinedDataLeftOuter.select(subjectsDataFrame.studentID,'subject', 'Name', 'Gender')
    
    props = { 'user' : 'postgres', 'password' : '' }
    joinedDataLeftOuter.write.jdbc(
                                     url   = dbURL,
                                     table = 'joineddataleftoutertable',
                                     mode  = 'overwrite',
                                     properties = props
                                    )
    
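    The table written above can be read back the same way the student table was read, which makes a convenient sanity check. A sketch reusing dbURL and props from above:

    # Sketch: read the freshly written table back from PostgreSQL.
    checkLeftOuter = spark.read.jdbc(url=dbURL,
                                     table='joineddataleftoutertable',
                                     properties=props)
    checkLeftOuter.show()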

    7 : Performing right outer join.

    joinedDataRightOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='right_outer')
    joinedDataRightOuter.show()
    

    8 : Performing full outer join.

    joinedDataOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='outer')
    joinedDataOuter.show()
    
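    In the full outer join, rows that exist on only one side get NULLs in the other side's columns, including its studentID. Joining on the column name instead gives one studentID column filled from whichever side has the row; a sketch of that variant:

    # Sketch: a full outer join on the column name keeps a single studentID
    # column, taken from whichever DataFrame actually contains the row.
    joinedDataOuter = subjectsDataFrame.join(studentsDataFrame,
                                             on='studentID',
                                             how='outer')
    joinedDataOuter.show()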
