Writing PySpark DataFrames to ES and Redis


Author: 越大大雨天 | Published 2022-09-09 18:16

    1. Writing a DataFrame to Elasticsearch

    1.1 Dependencies

    Pick the package that matches the Spark and Elasticsearch versions you are actually running, and pass it via the --packages option when submitting the job.

    Example:
    --packages=org.elasticsearch:elasticsearch-spark-30_2.12:7.13.1
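    The suffix of the artifact name encodes the targeted Spark and Scala versions: elasticsearch-spark-30_2.12 is built for Spark 3.x on Scala 2.12. For a Spark 2.x / Scala 2.11 cluster, the coordinate would instead look something like:

    --packages=org.elasticsearch:elasticsearch-spark-20_2.11:7.13.1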


    1.2 PySpark code example

    to_es.py

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType
    from config import ES_CONFIG
    
    
    spark = SparkSession \
            .builder \
            .appName("to_es") \
            .getOrCreate()
    
    data_schema = StructType([
                StructField("name", StringType(), True),
                StructField("source", StringType(), True),
                StructField("end_format", StringType(), True),
                StructField("operator_time", StringType(), True),
                StructField("operator_user", StringType(), True),
                StructField("sha1", StringType(), True),
            ])
    df = spark.read.csv("/home/testuser/data/csv/", schema=data_schema, sep="\t", header=False)
    df.write.format("es") \
        .mode("append") \
        .options(**{
            "es.write.operation": "upsert",  # upsert: update existing documents, insert new ones
            "es.spark.dataframe.write.null": "true",  # also write null-valued fields; by default they are skipped
            "es.mapping.id": "name",  # column used as the document _id, so re-runs update instead of duplicating
            "es.resource": ES_CONFIG["index"],
            "es.nodes.wan.only": "true",
            "es.nodes": ES_CONFIG["nodes"],
            "es.port": ES_CONFIG["port"],
            "es.net.http.auth.user": ES_CONFIG["user"],
            "es.net.http.auth.pass": ES_CONFIG["password"]
        }).save()
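
    The script reads ES_CONFIG from a local config module that the article does not show. A minimal sketch of what config.py might contain; every value below is a placeholder, not something from the original post:

    # config.py -- hypothetical example, adjust to your cluster
    ES_CONFIG = {
        "index": "test_index",   # es.resource: the index to write to
        "nodes": "127.0.0.1",    # comma-separated list of ES hosts
        "port": "9200",
        "user": "elastic",
        "password": "password",
    }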
    
    

    1.3 Job submission example

    spark-submit --packages=org.elasticsearch:elasticsearch-spark-30_2.12:7.13.1 to_es.py
    
    

    2. Writing a DataFrame to Redis

    2.1 Writing to Redis in a custom format

    import functools
    
    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType
    import redis
    
    
    spark = SparkSession.builder.appName("to-redis").getOrCreate()
    
    
    data_schema = StructType([
                StructField("name", StringType(), True),
                StructField("source", StringType(), True),
                StructField("end_format", StringType(), True),
                StructField("operator_time", StringType(), True),
                StructField("operator_user", StringType(), True),
                StructField("sha1", StringType(), True),
            ])
    
    
    def to_redis(part, batch=500):
        # connect inside the function: the connection is created on each executor,
        # since a client built on the driver cannot be serialized
        redis_pool = redis.ConnectionPool(host='127.0.0.1', port=26379, db=10, password='password')
        redis_cli = redis.StrictRedis(connection_pool=redis_pool)
    
        cnt = 0
        pipeline = redis_cli.pipeline()
        for row in part:
            pipeline.set(row.name, "\t".join([row.name, row.source, row.end_format]))
            cnt += 1
            if cnt % batch == 0:  # flush the pipeline every `batch` commands
                pipeline.execute()
        if cnt % batch != 0:  # flush whatever is left over
            pipeline.execute()
        pipeline.close()
        redis_cli.close()
    
    
    sdf = spark.read.csv("/home/testuser/data/csv/", schema=data_schema, header=False, sep="\t")
    sdf.show()
    # write each partition to Redis with a custom key/value format
    sdf.foreachPartition(functools.partial(to_redis, batch=500))
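
    The pipeline cuts network round trips by shipping up to `batch` commands per execute(). To spot-check the result, a read-back against the same instance might look like the sketch below; the key "some_name" is illustrative, keys are taken from the "name" column of your CSV:

    import redis
    
    r = redis.StrictRedis(host='127.0.0.1', port=26379, db=10, password='password')
    print(r.get("some_name"))  # e.g. b'some_name\tsource_value\tend_format_value'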
    
    

    2.2 Writing to Redis with the spark-redis connector JAR

    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType
    
    # connection settings are passed to the spark-redis connector via the session config
    spark = SparkSession.builder. \
        config("spark.redis.host", "127.0.0.1"). \
        config("spark.redis.port", "6379"). \
        config("spark.redis.auth", "password"). \
        config("spark.redis.db", 10). \
        getOrCreate()
    
    data_schema = StructType([
                StructField("name", StringType(), True),
                StructField("source", StringType(), True),
                StructField("end_format", StringType(), True),
                StructField("operator_time", StringType(), True),
                StructField("operator_user", StringType(), True),
                StructField("sha1", StringType(), True),
            ])
    
    sdf = spark.read.csv("/home/testuser/data/csv/", schema=data_schema, header=False, sep="\t")
    
    sdf.write.format("org.apache.spark.sql.redis").option("table", "name_group").option("key.column", "name").save()
    

    2.2.1 Job submission example

    spark-submit --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar to_redis.py
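
    If the cluster can resolve dependencies from Maven, --packages works here as well, assuming a spark-redis coordinate that matches your Scala version:

    spark-submit --packages com.redislabs:spark-redis_2.12:<version> to_redis.py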
    

    3. Writing a DataFrame to MongoDB

    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType
    
    # The URI can include the database and collection; by default Spark reads from and
    # writes to whatever the URI specifies.
    mongo_uri = "mongodb://username:password@127.0.0.1:27017/db_name.collection_name?authSource=admin"
    spark = SparkSession.builder.\
                appName("to_mongo").\
                config("spark.mongodb.input.uri", mongo_uri). \
                config("spark.mongodb.output.uri", mongo_uri). \
                getOrCreate()
    
    data_schema = StructType([
                StructField("name", StringType(), True),
                StructField("source", StringType(), True),
                StructField("end_format", StringType(), True),
                StructField("operator_time", StringType(), True),
                StructField("operator_user", StringType(), True),
                StructField("sha1", StringType(), True),
            ])
    
    # read the collection named in the input URI with an explicit schema
    sdf = spark.read.format("com.mongodb.spark.sql.DefaultSource").load(schema=data_schema)
    
    # write the DataFrame to the collection named in the output URI
    sdf.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
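
    The target database and collection can also be set per write instead of being baked into the URI; a sketch using the connector's database and collection write options (the collection name here is illustrative):

    # write to a collection other than the one in the output URI
    sdf.write.format("com.mongodb.spark.sql.DefaultSource") \
        .mode("append") \
        .option("database", "db_name") \
        .option("collection", "another_collection") \
        .save()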
    

    3.1 Job submission example

    spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.2 to_mongo.py
    
