PySpark DataFrame Read/Write

Author: 熊定坤 | Published 2019-01-09 13:50

    1. Connect to local Spark

    import pandas as pd
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName('my_first_app_name') \
        .getOrCreate()
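
    The builder reuses an existing SparkSession if one is already active. A quick sanity check of the session, as a minimal sketch:

    # Confirm the session is up and see which master it is attached to
    print(spark.version)              # Spark version string
    print(spark.sparkContext.master)  # e.g. local[*] when no master is configured
    # spark.stop()                    # call this when you are done to release resources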
    

    2. Create a DataFrame

    2.1 Create from variables

    stringrdd = spark.sparkContext.parallelize([
        (123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")
    ])
    # Specify the schema with StructField(name, dataType, nullable)
    # where:
    #   name: the field name
    #   dataType: the field's data type
    #   nullable: whether the field may contain null values
    import pyspark.sql.types as typ
    labels = [('id', typ.LongType()),
              ('name', typ.StringType()),
              ('age', typ.LongType()),
              ('eyecolor', typ.StringType())]
    schema = typ.StructType([typ.StructField(i[0], i[1], False) for i in labels])
    # Apply the schema to the RDD and create the DataFrame
    data = spark.createDataFrame(stringrdd, schema=schema)
    # Register the DataFrame as a temporary view (registerTempTable is deprecated since Spark 2.0)
    data.createOrReplaceTempView("swimmers")
    data.show()
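
    Once the temporary view is registered, it can be queried with plain SQL through spark.sql; a minimal sketch against the swimmers view created above:

    # Run a SQL query against the temporary view
    spark.sql("SELECT id, name FROM swimmers WHERE age >= 20").show()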
    

    2.2 Create a DataFrame with automatic type inference

    data = [(123, "Katie", 19, "brown"),
            (234, "Michael", 22, "green"),
            (345, "Simone", 23, "blue")]
    df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyecolor'])
    df.show()
    df.count()
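
    With automatic inference, Python ints become long columns and strings become string columns; printSchema shows what was inferred:

    # Inspect the inferred schema
    df.printSchema()
    # root
    #  |-- id: long (nullable = true)
    #  |-- name: string (nullable = true)
    #  |-- age: long (nullable = true)
    #  |-- eyecolor: string (nullable = true)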
    

    2.3 Read a JSON file

    file = r"D:\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
    df = spark.read.json(file)
    df.show()
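
    By default spark.read.json expects one JSON object per line (JSON Lines); for a single pretty-printed JSON document the multiLine option can be set (available since Spark 2.2), as a sketch:

    # Read a file containing one JSON document spread over multiple lines
    df_multi = spark.read.json(file, multiLine=True)
    df_multi.show()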
    

    2.4 Read a CSV file

    file = r'C:\Users\Administrator\Desktop\kaggle泰坦尼克号获救率预测数据集\train.csv'
    df = spark.read.csv(file,header=True,inferSchema=True)
    df.show(5)
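
    Once loaded, the DataFrame can be explored with the usual column operations; the column names below assume the standard layout of the Kaggle Titanic train.csv:

    # Column names here are assumptions about the Titanic dataset
    df.select('Survived', 'Age', 'Sex').show(5)
    df.groupBy('Survived').count().show()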
    

    2.5 Read from MySQL

    # First place the mysql-connector-java-8.0.13.jar driver under spark-2.2.0-bin-hadoop2.7\jars
    # This works in a single-machine setup; in a cluster environment it is not enough
    # Restart the Spark session after adding the jar, then run the code below
    sql = '(select * from sc where C = 01) t'
    url = 'jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT'
    table = sql
    properties = {"user":"root","password":"xdk520"}
    df = spark.read.jdbc(url,table,properties=properties)
    df.show()
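
    The same read can be expressed with the generic jdbc data source, where the driver class for Connector/J 8.x is com.mysql.cj.jdbc.Driver; a sketch reusing the url, query, and credentials from above:

    df = spark.read.format("jdbc") \
        .option("url", url) \
        .option("dbtable", sql) \
        .option("user", "root") \
        .option("password", "xdk520") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .load()
    df.show()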
    

    2.6 Create from a pandas DataFrame

    data = pd.DataFrame([(123, "Katie", 19, "brown"),
                         (234, "Michael", 22, "green"),
                         (345, "Simone", 23, "blue")], columns=['id', 'name', 'age', 'eyecolor'])
    df = spark.createDataFrame(data)
    df.show()
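
    The conversion also works the other way: toPandas collects the whole Spark DataFrame to the driver as a pandas DataFrame, so it should only be used on results that fit in memory:

    # Collect the Spark DataFrame back into pandas (data must fit in driver memory)
    pdf = df.toPandas()
    print(pdf.head())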
    

    2.7 Read from columnar Parquet files

    file = r"D:\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
    data = spark.read.parquet(file)
    data.show()
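
    Parquet stores the schema with the data, so no inference is needed, and only selected columns are actually read from disk; the column below assumes the name field present in Spark's example users.parquet:

    # Only the selected column is read, thanks to Parquet's columnar layout
    spark.read.parquet(file).select("name").show()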
    

    2.8 Read from Hive

    # If Spark is already configured to connect to Hive, Hive tables can be queried directly
    spark = SparkSession \
            .builder \
            .enableHiveSupport() \
            .master("spark://172.31.100.170:7077") \
            .appName("my_first_app_name") \
            .getOrCreate()
    
    df=spark.sql("select * from hive_tb_name")
    df.show()
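
    Any SQL that spark.sql understands can be used here; for example, listing what is visible through the Hive metastore:

    # List the databases and tables registered in the Hive metastore
    spark.sql("show databases").show()
    spark.sql("show tables").show()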
    

    3. Save files

    3.1 Write to CSV

    file1=r"D:\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
    df.write.csv(path=file1, header=True, sep=",", mode='overwrite')
    # header=True writes the column names as the first row, sep="," uses a comma delimiter,
    # mode='overwrite' clears the target before writing
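
    Note that df.write.csv creates a directory containing one part file per partition; to get a single CSV file, the DataFrame can be coalesced to one partition first (only sensible for small data), as a sketch:

    # Write a single part file by reducing to one partition first
    df.coalesce(1).write.csv(path=file1, header=True, sep=",", mode='overwrite')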
    

    3.2 Save to Parquet

    file2=r"D:\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
    df.write.parquet(path=file2,mode='overwrite')
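
    Parquet output can also be partitioned by a column, which creates one subdirectory per distinct value; eyecolor below is just an example column name taken from the earlier data:

    # Partition the output directory by the eyecolor column (example column name)
    df.write.parquet(path=file2, mode='overwrite', partitionBy='eyecolor')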
    

    3.3 Write to MySQL

    # Columns are matched by name automatically, so the DataFrame does not need to contain every column of the MySQL table
    
    # mode='overwrite' truncates the table before loading
    url = 'jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT'
    table = 'eye'
    properties = {"user":"root","password":"xdk520"}
    df.write.jdbc(url,table,mode='overwrite',properties=properties)
    
    # Use mode='append' to append to the existing table instead
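
    A sketch of the append variant mentioned above, reusing the same url, table, and properties:

    # Append rows to the existing table instead of truncating it
    df.write.jdbc(url, table, mode='append', properties=properties)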
    
