SparkSQL 有着强大的与其他存储介质交互的功能,其中就包括MySQL,这里简单介绍一下我在工作中用到的使用 PySpark 读写 MySQL 的使用。
写出到 MySQL
df_weekmile.write.mode("append") \
.format("jdbc") \
.option("url","jdbc:mysql://ip地址:端口号/库名称") \
.option("dbtable","表名称") \
.option("user","用户名") \
.option("password","密码") \
.save()
df_wekmile 是之前生成的 spark 的 DataFrame,不多说了。mode 可以根据自己的需要设置 append 或者 overwrite。这里要注意 DataFrame 的字段数与字段类型,要与 mysql 中的对应,否则无法写入。
读取 MySQL 数据
df_total_trip = spark.read.format("jdbc") \
.option("url","jdbc:mysql://ip地址:端口号/库名称") \
.option("dbtable","表名称") \
.option("user","用户名") \
.option("password","密码") \
.load()
df_total_trip.show()
可以看到,和写入其实非常类似,只不过换了几个方法。 将之前的 write 方法换成了 read 方法,最后的 save 换成了 load。 读取后直接就生成了 Spark 的 DataFrame。
但是上述这种把整个 MySQL 的表都读进来了,有时候我们可能只需要几个字段,或者是需要符合特定条件的数据,这种读取方式也是支持的。
按条件读取
dbtable = "(select * from mysql_table where mileage_tag = 4000) tmp"
df_total_trip = spark.read.format("jdbc") \
.option("url","jdbc:mysql://ip地址:端口号/库名称") \
.option("dbtable",dbtable) \
.option("user","用户名") \
.option("password","密码") \
.load()
df_total_trip.show()
其实按条件读取,就是将自己的逻辑提前写好,然后写成一张临时表的格式,作为 dbtable 参数的值。而且这个临时表是必须的,不能直接写如: select ...... 这样的语句,必须在外面套上一层。
在工作中,我的数据源部分来自 MySQL 表,部分来自 hive 表,但是对于 SparkSQL 来说,处理这样的多个数据源的情况还是 so easy。
也许有更高效的读取 MySQL 的方法,如果发现,会更新到这里,如果有大神愿意指点一二,那真是感激不尽。
网友评论