美文网首页
PySpark 读写 MySQL

PySpark 读写 MySQL

作者: 门朝大海 | 来源:发表于2019-07-30 23:07 被阅读0次

SparkSQL 有着强大的与其他存储介质交互的功能,其中就包括MySQL,这里简单介绍一下我在工作中用到的使用 PySpark 读写 MySQL 的使用。

写出到 MySQL

df_weekmile.write.mode("append") \
.format("jdbc") \
.option("url","jdbc:mysql://ip地址:端口号/库名称") \
.option("dbtable","表名称") \
.option("user","用户名") \
.option("password","密码") \
.save()

df_wekmile 是之前生成的 spark 的 DataFrame,不多说了。mode 可以根据自己的需要设置 append 或者 overwrite。这里要注意 DataFrame 的字段数与字段类型,要与 mysql 中的对应,否则无法写入。

读取 MySQL 数据

df_total_trip = spark.read.format("jdbc") \
.option("url","jdbc:mysql://ip地址:端口号/库名称") \
.option("dbtable","表名称") \
.option("user","用户名") \
.option("password","密码") \
.load()

df_total_trip.show()

可以看到,和写入其实非常类似,只不过换了几个方法。 将之前的 write 方法换成了 read 方法,最后的 save 换成了 load。 读取后直接就生成了 Spark 的 DataFrame。

但是上述这种把整个 MySQL 的表都读进来了,有时候我们可能只需要几个字段,或者是需要符合特定条件的数据,这种读取方式也是支持的。

按条件读取
dbtable = "(select * from mysql_table where mileage_tag = 4000) tmp"

    df_total_trip = spark.read.format("jdbc") \
    .option("url","jdbc:mysql://ip地址:端口号/库名称") \
    .option("dbtable",dbtable) \
    .option("user","用户名") \
    .option("password","密码") \
    .load()

    df_total_trip.show()

其实按条件读取,就是将自己的逻辑提前写好,然后写成一张临时表的格式,作为 dbtable 参数的值。而且这个临时表是必须的,不能直接写如: select ...... 这样的语句,必须在外面套上一层。

在工作中,我的数据源部分来自 MySQL 表,部分来自 hive 表,但是对于 SparkSQL 来说,处理这样的多个数据源的情况还是 so easy。

也许有更高效的读取 MySQL 的方法,如果发现,会更新到这里,如果有大神愿意指点一二,那真是感激不尽。

相关文章

网友评论

      本文标题:PySpark 读写 MySQL

      本文链接:https://www.haomeiwen.com/subject/zmsorctx.html