美文网首页
ApacheSpark存储到Postgres数据库

ApacheSpark存储到Postgres数据库

作者: Detian_e8ab | 来源:发表于2020-05-12 00:30 被阅读0次

Apache Spark 是一个围绕速度、易用性和复杂分析构建的大数据处理框架,提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。 这里简单展示如何用Apache Spark 把数据存储到Postgres数据库。

首先开始spark 服务

pyspark --driver-class-path /opt/spark/jars/postgresql-42.2.12.jar --jars /opt/spark/jars/postgresql-42.2.12.jar
image.png

然后删除之前产生的表

import psycopg2 as p2
conn = p2.connect("host=localhost dbname =test user=detian password='p31415926'")
cur = conn.cursor()
cur.execute(""" drop table test.germanydata""")
conn.commit()

然后抽取网络数据,并且存储在Dataframe 里面

import requests
import json
from pyspark.sql import Row
from collections import OrderedDict
from pyspark import SparkContext
from collections import OrderedDict
#Assign URL
URL = "https://api.covid19api.com/country/germany/status/confirmed/live?from=2020-04-01T00:00:00Z&to=2020-05-01T00:00:00Z"
r = requests.get(url =URL)
data = r.json()
#define a function to parse json file to row 
def convert_to_row(d: dict) -> Row: 
    return Row(**OrderedDict(sorted(d.items())))
 
 #convert the data to a dataframe
 df=sc.parallelize(data).map(convert_to_row).toDF()
 #use only some of the columns 
 jdbcDF=df.select("Cases", "Country", "Date","Status")

然后通过jdbc driver 连接postgres 并将dataframe 里面的数据写入数据库。

jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/test") \
    .option("dbtable", "test.germanydata") \
    .option("user", "detian") \
    .option("password", "p31415926") \
    .save()
image.png

检查数据库的germanydata 表

image.png

相关文章

网友评论

      本文标题:ApacheSpark存储到Postgres数据库

      本文链接:https://www.haomeiwen.com/subject/pxutnhtx.html