Preparation
We use a PostgreSQL database here and create some sample data from the shell console; in real development you can manage this data more conveniently with a third-party tool such as Navicat Premium.
Log in to the PostgreSQL server:
$ sudo -u postgres psql
Create the database pysparkdb:
postgres=# create database pysparkdb;
List the existing databases:
postgres=# SELECT datname FROM pg_database;
Output:
   datname
-------------
 template1
 template0
 postgres
 metastore
 pymetastore
 pysparkdb
(6 rows)
Connect to the pysparkdb database:
postgres=# \c pysparkdb
Create a table:
pysparkdb=# create table studentTable(
pysparkdb(# studentID char(50) not null,
pysparkdb(# name char(50) not null,
pysparkdb(# gender char(5) not null
pysparkdb(# );
Describe the structure of studentTable:
pysparkdb=# \d studentTable
Table "public.studenttable"
Column | Type | Modifiers
-----------+---------------+-----------
studentid | character(50) | not null
name | character(50) | not null
gender | character(5) | not null
Insert data into studentTable:
insert into studentTable values ('si1', 'Robin', 'M');
insert into studentTable values ('si2', 'Maria', 'F');
insert into studentTable values ('si3', 'Julie', 'F');
insert into studentTable values ('si4', 'Bob', 'M');
insert into studentTable values ('si6','William','M');
Query the contents of studentTable:
pysparkdb=# select * from studentTable;
 studentid | name    | gender
-----------+---------+--------
 si1       | Robin   | M
 si2       | Maria   | F
 si3       | Julie   | F
 si4       | Bob     | M
 si6       | William | M
Our student data is now sitting in the studentTable table of the pysparkdb database on the PostgreSQL server, and we need to read it from PySpark. To connect PySpark to the PostgreSQL server we need a JDBC connector, so we start the PySpark shell with the following command, which fetches the PostgreSQL JDBC driver and puts it on the classpath at the same time:
pyspark --driver-class-path ~/.ivy2/jars/org.postgresql_postgresql-9.4.1212.jar --packages org.postgresql:postgresql:9.4.1212
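If you would rather configure the connector from code than on the command line, the same package can be requested through the spark.jars.packages setting before the session is created; a minimal sketch, with a hypothetical application name:
from pyspark.sql import SparkSession

# Resolve and load the PostgreSQL JDBC driver when the session is created
spark = (SparkSession.builder
         .appName("pysparkdb-demo")          # hypothetical app name
         .config("spark.jars.packages", "org.postgresql:postgresql:9.4.1212")
         .getOrCreate())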
1 : Reading the student data table from the PostgreSQL database.
dbURL="jdbc:postgresql://192.168.111.106/pysparkdb?user=postgres&password="
# The database name is already part of the JDBC URL; dbtable picks the table to read
studentsDataFrame = spark.read.format('jdbc').options(
    url = dbURL,
    driver = 'org.postgresql.Driver',
    dbtable = 'studenttable'
).load()
studentsDataFrame.show()
+--------------------+--------------------+------+
| studentid| name|gender|
+--------------------+--------------------+------+
|si1 ...|Robin ...| M |
|si2 ...|Maria ...| F |
|si3 ...|Julie ...| F |
|si4 ...|Bob ...| M |
|si6 ...|William ...| M |
+--------------------+--------------------+------+
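A side note: dbtable does not have to be a bare table name. The JDBC source also accepts a parenthesised subquery, so a filter can be pushed down to PostgreSQL instead of being applied after the load. A sketch reusing the dbURL defined above (maleStudentsDF is just an illustrative name):
# Push the filter down to PostgreSQL by handing a subquery to dbtable;
# the alias ("as t") is required by the JDBC source
maleStudentsDF = spark.read.format('jdbc').options(
    url = dbURL,
    dbtable = "(select studentid, name from studenttable where gender = 'M') as t"
).load()
maleStudentsDF.show()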
We now have the student DataFrame we need, but do you see the problem? Notice the (...) in the values: studentid and name were declared as char(50), so each value is padded with trailing spaces, and show() truncates the padded strings and marks the truncation with '...'. We can remove the padding by applying the trim() function to those columns. trim() lives in the pyspark.sql.functions submodule; after importing it we can apply it to the columns as follows:
from pyspark.sql.functions import trim
studentsDataFrame = studentsDataFrame.select(trim(studentsDataFrame.studentid),trim(studentsDataFrame.name),studentsDataFrame.gender)
studentsDataFrame.show()
+---------------+----------+------+
|trim(studentid)|trim(name)|gender|
+---------------+----------+------+
| si1| Robin| M |
| si2| Maria| F |
| si3| Julie| F |
| si4| Bob| M |
| si6| William| M |
+---------------+----------+------+
The trimmed columns have picked up the names trim(studentid) and trim(name), so we rename them (and gender) to something more readable:
studentsDataFrame = studentsDataFrame.withColumnRenamed('trim(studentid)', 'studentID') \
                                     .withColumnRenamed('trim(name)', 'Name') \
                                     .withColumnRenamed('gender', 'Gender')
studentsDataFrame.printSchema()
root
|-- studentID: string (nullable = false)
|-- Name: string (nullable = false)
|-- Gender: string (nullable = false)
studentsDataFrame.show()
+---------+-------+------+
|studentID| Name|Gender|
+---------+-------+------+
| si1| Robin| M |
| si2| Maria| F |
| si3| Julie| F |
| si4| Bob| M |
| si6|William| M |
+---------+-------+------+
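The trim-and-rename sequence above can also be collapsed into a single select by aliasing the trimmed columns. A sketch, assuming it is applied to the freshly loaded frame whose columns are still studentid, name, and gender:
from pyspark.sql.functions import trim

# Trim the padded CHAR columns and rename them in one pass
studentsDataFrame = studentsDataFrame.select(
    trim(studentsDataFrame.studentid).alias('studentID'),
    trim(studentsDataFrame.name).alias('Name'),
    studentsDataFrame.gender.alias('Gender')
)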
Connecting PySpark to Oracle
First download the matching JDBC driver from the Oracle website, for example ojdbc6-11.2.0.3.jar, and pass it at submit time with the --jars parameter:
spark-submit --jars "/opt/spark/ojdbc6-11.2.0.3.jar" pyspark_oracle.py
df = sqlContext.read.format("jdbc").options(
    url = "jdbc:oracle:thin:@192.168.145.108:1521:orcl",
    driver = "oracle.jdbc.driver.OracleDriver",
    dbtable = "TBL_CELLPHONE_SALES",
    user = "cellphone",
    password = "cellphone"
).load()
df.show(10, truncate=False)
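If the Oracle table is large, reading it over a single JDBC connection becomes the bottleneck; the JDBC source can split the read into parallel partitions with partitionColumn, lowerBound, upperBound, and numPartitions. A sketch, assuming a hypothetical numeric column SALE_ID spanning roughly 1 to 1000000:
df = sqlContext.read.format("jdbc").options(
    url = "jdbc:oracle:thin:@192.168.145.108:1521:orcl",
    driver = "oracle.jdbc.driver.OracleDriver",
    dbtable = "TBL_CELLPHONE_SALES",
    user = "cellphone",
    password = "cellphone",
    partitionColumn = "SALE_ID",   # hypothetical numeric key column
    lowerBound = "1",
    upperBound = "1000000",
    numPartitions = "4"            # four concurrent connections
).load()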
2 : Reading subject data from a JSON file.
subjectsDataFrame = sqlContext.read.format("json").load('/home/pysparkbook/pysparkBookData/subjects.json')
subjectsDataFrame.show()
+---------+-------+
|studentID|subject|
+---------+-------+
| si1| Python|
| si3| Java|
| si1| Java|
| si2| Python|
| si3| Ruby|
| si4| C++|
| si5| C|
| si4| Python|
| si2| Java|
+---------+-------+
subjectsDataFrame.printSchema()
root
|-- studentID: string (nullable = true)
|-- subject: string (nullable = true)
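Here the schema was inferred, which costs an extra pass over the JSON file. When the layout is known in advance, the same two string fields can be declared explicitly; a sketch:
from pyspark.sql.types import StructType, StructField, StringType

subjectSchema = StructType([
    StructField('studentID', StringType(), True),
    StructField('subject', StringType(), True)
])

# Supplying the schema skips inference
subjectsDataFrame = sqlContext.read.schema(subjectSchema).json('/home/pysparkbook/pysparkBookData/subjects.json')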
3 : Performing inner join on DataFrames.
joinedDataInner = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='inner')
joinedDataInner.show()
+---------+-------+---------+-----+------+
|studentID|subject|studentID| Name|Gender|
+---------+-------+---------+-----+------+
| si1| Java| si1|Robin| M |
| si1| Python| si1|Robin| M |
| si2| Java| si2|Maria| F |
| si2| Python| si2|Maria| F |
| si3| Ruby| si3|Julie| F |
| si3| Java| si3|Julie| F |
| si4| Python| si4| Bob| M |
| si4| C++| si4| Bob| M |
+---------+-------+---------+-----+------+
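The result carries two studentID columns, one from each side, which step 4 below removes with a select. Because both DataFrames use the same column name, an alternative is to join on the name itself, which keeps a single studentID column. A sketch (dedupJoined is just an illustrative name):
# Joining on the column name keeps only one studentID column
dedupJoined = subjectsDataFrame.join(studentsDataFrame, on='studentID', how='inner')
dedupJoined.show()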
4 : Saving inner joined DataFrame as JSON file.
joinedDataInner = joinedDataInner.select(subjectsDataFrame.studentID,'subject', 'Name', 'Gender')
joinedDataInner.columns
joinedDataInner.write.format('json').save('/home/muser/innerJoinedTable')
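The save call writes a directory of part files rather than a single file; it can be read back to check the result, as shown below:
# Read the written part files back for a quick check
sqlContext.read.format('json').load('/home/muser/innerJoinedTable').show()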
5 : Performing left outer join.
joinedDataLeftOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='left_outer')
joinedDataLeftOuter.show()
6 : Saving left outer joined DataFrame into PostgreSQL.
joinedDataLeftOuter = joinedDataLeftOuter.select(subjectsDataFrame.studentID,'subject', 'Name', 'Gender')
props = { 'user' : 'postgres', 'password' : '' }
joinedDataLeftOuter.write.jdbc(
url = dbURL,
table = 'joineddataleftoutertable',
mode = 'overwrite',
properties = props
)
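The counterpart of write.jdbc is read.jdbc; it can confirm that the table landed in PostgreSQL, reusing the dbURL and props defined above:
# Read the newly written table back from PostgreSQL
spark.read.jdbc(url=dbURL, table='joineddataleftoutertable', properties=props).show()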
7 : Performing right outer join.
joinedDataRightOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='right_outer')
joinedDataRightOuter.show()
8 : Performing full outer join.
joinedDataOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='outer')
joinedDataOuter.show()
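Rows without a match on one side come back with nulls in the outer joins (for instance si5 appears only in the subjects file and si6 only in the student table). A sketch of cleaning up the full outer result, taking the studentID from whichever side is non-null and filling the remaining gaps with a placeholder:
from pyspark.sql.functions import coalesce

# Keep one studentID column and replace the nulls left by the full outer join
joinedDataOuter = joinedDataOuter.select(
    coalesce(subjectsDataFrame.studentID, studentsDataFrame.studentID).alias('studentID'),
    'subject', 'Name', 'Gender'
).na.fill('unknown')
joinedDataOuter.show()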