Spark: Reading Local Data and Storing It in MongoDB and Elasticsearch

Author: 枫隐_5f5f | Published 2019-05-03 15:45
    import sys
    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql.types import *
    # Python 2 only: reload() and sys.setdefaultencoding() no longer exist in
    # Python 3, where strings are Unicode by default and these two lines can be dropped.
    reload(sys)
    sys.setdefaultencoding("utf-8")
    # URI format: "mongodb://host:port/database"
    # (the target collection is supplied per write via .option("collection", ...))
    mongo_read_uri = "mongodb://127.0.0.1:27017/spark_db"
    mongo_write_uri = "mongodb://127.0.0.1:27017/raw_data"
    
    #conf = SparkConf().setAppName("Dataloader").setMaster("local")
    # spark.jars.packages must be set before the session is created so the
    # MongoDB connector jar is resolved and put on the classpath.
    spark = SparkSession.builder \
        .master("local") \
        .appName("Dataloader") \
        .config("spark.mongodb.input.uri", mongo_read_uri) \
        .config("spark.mongodb.output.uri", mongo_write_uri) \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2") \
        .getOrCreate()
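
The connector can equivalently be supplied at launch time, e.g. `spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.2 your_script.py` (script name hypothetical); either way the package must be declared before `getOrCreate()` so the jar is downloaded when the session starts.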
    
    infile_movies = "/home/njliu/prc/MovieRecSys/datas/movies_clean.csv"
    infile_ratings = "/home/njliu/prc/MovieRecSys/datas/ratings.csv"
    infile_tags = "/home/njliu/prc/MovieRecSys/datas/tags.csv"
    
    
    movie_labels = [
        ('MOVIE_ID',StringType()),
        ('MOVIE_NAME',StringType()),
        ('MOVIE_DESC',StringType()),
        ('MOVIE_LENGTH',StringType()),
        ('LAUNCH_TIME',StringType()),
        ('MAKE_TIME',StringType()),
        ('LANGUAGE',StringType()),
        ('FILM_TYPE',StringType()),
        ('ACTORS',StringType()),
        ('DIRECTOR',StringType())
    ]
    movie_schema = StructType([StructField(e[0],e[1],False) for e in movie_labels])
    
    rating_labels = [
        ('USERID',StringType()),
        ('MOVIE_ID',StringType()),
        ('RATING_SCORE',StringType()),
        ('TIMESTAMP',StringType())
    ]
    
    rating_schema = StructType([StructField(e[0],e[1],False) for e in rating_labels])
    
    
    tag_labels = [
        ('USERID',StringType()),
        ('MOVIE_ID',StringType()),
        ('TAG_CONTENT',StringType()),
        ('TIMESTAMP',StringType())
    ]
    
    tag_schema = StructType([StructField(e[0],e[1],False) for e in tag_labels])
    
    
    # movies_clean.csv is caret ("^") delimited, unlike the comma-delimited ratings and tags
    movies = spark.sparkContext.textFile("file://" + infile_movies) \
            .map(lambda row: row.split("^"))
    movies_df = spark.createDataFrame(movies, movie_schema)
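
One caveat with this RDD route: `createDataFrame` checks rows against the schema only when the data is actually evaluated, so a single malformed line (a missing or extra `^`) fails the whole job at execution time. A minimal defensive sketch, not in the original post, that keeps only rows matching the 10-column movie schema:

    # Drop rows whose field count does not match the movie schema;
    # such rows would otherwise make createDataFrame fail at runtime.
    n_fields = len(movie_labels)
    movies_ok = movies.filter(lambda fields: len(fields) == n_fields)
    movies_df = spark.createDataFrame(movies_ok, movie_schema)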
    
    
    rating = spark.sparkContext.textFile("file://" + infile_ratings) \
            .map(lambda row: row.split(","))
    ratings_df = spark.createDataFrame(rating, rating_schema)


    # Note: a bare split(",") assumes no field contains an embedded comma;
    # for fully quoted CSV, prefer spark.read.csv (see the commented block below).
    tag = spark.sparkContext.textFile("file://" + infile_tags) \
            .map(lambda row: row.split(","))
    tags_df = spark.createDataFrame(tag, tag_schema)
    
    
    # ===== Alternative: parse the files with Spark's CSV reader =====
    # (the movies file needs sep="^" since it is caret-delimited)
    #movies = spark.read.csv("file://" + infile_movies, sep = "^", header = False, schema = movie_schema)
    #ratings = spark.read.csv("file://" + infile_ratings, header = False, schema = rating_schema)
    #tags = spark.read.csv("file://" + infile_tags, header = False, schema = tag_schema)
    
    # Write the three DataFrames into MongoDB. Available save modes:
    # * `append`: Append contents of this :class:`DataFrame` to existing data.
    # * `overwrite`: Overwrite existing data.
    # * `error` or `errorifexists`: Throw an exception if data already exists.
    # * `ignore`: Silently ignore this operation if data already exists.
    
    movies_df.write.format("com.mongodb.spark.sql.DefaultSource") \
        .option("collection","movies") \
        .mode("overwrite") \
        .save()
    
    ratings_df.write.format("com.mongodb.spark.sql.DefaultSource") \
        .option("collection","ratings") \
        .mode("overwrite") \
        .save()
    
    
    tags_df.write.format("com.mongodb.spark.sql.DefaultSource") \
        .option("collection","tags") \
        .mode("overwrite") \
        .save()
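
To sanity-check the writes, the same connector can read a collection back. A minimal sketch, assuming the connector's `uri`/`collection` read options; note the writes above went to the `raw_data` database from `mongo_write_uri`, not the `spark_db` database configured as the default input:

    # Read the freshly written "movies" collection back and count it.
    check_df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("uri", mongo_write_uri) \
        .option("collection", "movies") \
        .load()
    print(check_df.count())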
    
    
    # Write data into Elasticsearch.

    # Merge the movies with their tags. This is a left join, so a movie
    # with N tag rows shows up N times in merged_df.
    merged_df = movies_df.join(tags_df.select("MOVIE_ID", "TAG_CONTENT"), on="MOVIE_ID", how="left")
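
Because `es.mapping.id` below uses MOVIE_ID as the document id, the duplicated rows from this join would overwrite one another in the index, leaving one arbitrary tag per movie. If every movie should carry all of its tags, one option (a sketch, not from the original post) is to collapse the tags into a single string before joining:

    from pyspark.sql import functions as F

    # One row per movie: all tag strings concatenated with "|".
    tags_agg = tags_df.groupBy("MOVIE_ID") \
        .agg(F.concat_ws("|", F.collect_list("TAG_CONTENT")).alias("TAG_CONTENT"))
    merged_df = movies_df.join(tags_agg, on="MOVIE_ID", how="left")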
    
    
    
    # es.mapping.id makes MOVIE_ID the Elasticsearch document _id, so writing
    # the same id again replaces the document instead of duplicating it.
    merged_df.write \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "192.168.186.111") \
        .option("es.resource", "recsys/movies") \
        .option("es.mapping.id", "MOVIE_ID") \
        .mode("overwrite") \
        .save()
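
These Elasticsearch calls assume the elasticsearch-hadoop (elasticsearch-spark) connector is on the classpath, e.g. an `org.elasticsearch:elasticsearch-spark-20_2.11` artifact matching the cluster version, added to `spark.jars.packages` next to the MongoDB connector. Also note the `index/type` form of `es.resource` fits Elasticsearch 6.x and earlier; mapping types were removed in 7.x, where the resource would be just the index name.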
    
    
    
    
    # Read data back from Elasticsearch:
    #df = spark.read.format("org.elasticsearch.spark.sql") \
    #   .option("es.nodes", "192.168.186.111") \
    #   .option("es.resource", "recsys/movies") \
    #   .load()
    #
    #df.createOrReplaceTempView("movies")    # registerTempTable is deprecated since Spark 2.0
    #test_df = spark.sql("select * from movies limit 10")
    #test_df.show()
    #print(test_df.schema)
    #
    spark.stop()
        
    
