Writing data from Spark to Elasticsearch


Author: bigdata张凯翔 | Published on 2020-05-23 15:42

    Any RDD can be saved to Elasticsearch as long as its content can be translated into documents. In practice, this means the RDD elements need to be a Map (Scala or Java), a JavaBean, or a Scala case class. If that is not the case, the data can easily be transformed inside Spark, or you can plug in your own custom ValueWriter.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // Spark Scala imports
    
    import org.elasticsearch.spark._        // elasticsearch-hadoop Scala imports
    ...
    
    val conf = ...
    val sc = new SparkContext(conf)         // start Spark through its Scala API
    
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    
    // makeRDD creates an ad-hoc RDD from the given collection; any other RDD (Java or Scala) can be passed in
    sc.makeRDD(
      Seq(numbers, airports)
    ).saveToEs("spark/docs")                // index the content under spark/docs in Elasticsearch
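
    The val conf = ... above is left elided. As a rough sketch (host, port and app name below are placeholders, not values from this article), it can be built with the same es.index.auto.create and es.nodes settings that the full examples below rely on:

    import org.apache.spark.SparkConf

    // Hypothetical minimal configuration; point es.nodes/es.port at your own cluster
    val conf = new SparkConf()
      .setAppName("spark-to-es")              // illustrative app name
      .setMaster("local")
      .set("es.nodes", "localhost")           // Elasticsearch node(s)
      .set("es.port", "9200")                 // REST port (9200 is the default)
      .set("es.index.auto.create", "true")    // create the target index if it does not exist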
    

    Writing a Map to Elasticsearch

    package cn.itzkx.spark_es
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._
    
    // Write Map objects to Elasticsearch
    //https://www.iteblog.com/archives/1728.html#id
    object Spark2Es {
      def main(args: Array[String]): Unit = {
        val master ="local"
        val conf = new SparkConf().setAppName("iteblog").setMaster(master)
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    
    //    // Set the ES-related parameters on a SparkSession instead
    //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    //
    //    // 2. Build the SparkContext object from it
    //    val sc: SparkContext = spark.sparkContext
    
        val sc = new SparkContext(conf)
    
        //sc.setLogLevel("warn")
    
        val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
        val airports = Map("OTP" -> "ibex", "SFO" -> "San Fran")
        sc.makeRDD(Seq(numbers, airports)).saveToEs("itzkx/docs")
    
        sc.stop()
      }
    
    }
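
    To check that the two Map documents actually reached itzkx/docs, the same import org.elasticsearch.spark._ also adds an esRDD method to the SparkContext for reading an index back. A small read-back sketch, assuming the cluster configured above is reachable:

    // esRDD returns an RDD[(String, Map[String, AnyRef])] of (document id, source fields)
    val readBack = sc.esRDD("itzkx/docs")
    readBack.collect().foreach { case (id, doc) => println(s"$id -> $doc") }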
    

    Writing a case class to Elasticsearch

    package cn.itzkx.spark_es
    
    import org.apache.spark.rdd.RDD
    import org.elasticsearch.spark.rdd.EsSpark
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark._
    
    // Write case class objects to Elasticsearch
    object Spark2Esl {
    
    
      def main(args: Array[String]): Unit = {
        val master ="local"
        val conf = new SparkConf().setAppName("iteblog").setMaster(master)
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    
        //    // Set the ES-related parameters on a SparkSession instead
        //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        //
        //    // 2. Build the SparkContext object from it
        //    val sc: SparkContext = spark.sparkContext
    
        val sc = new SparkContext(conf)
    
        case class Trip(departure: String, arrival: String)
        val upcomingTrip = Trip("OTP", "SFO")
        val lastWeekTrip = Trip("MUC", "OTP")
    
        val rdd1: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
        rdd1.saveToEs("itzkx/class")
    
    
       /* The snippet above writes upcomingTrip and lastWeekTrip into the _index named itzkx,
        under the type class. So far saveToEs has been made available on the RDD through implicit
        conversions; elasticsearch-hadoop also provides an explicit API, EsSpark, for writing an
        RDD to Elasticsearch. It requires the import org.elasticsearch.spark.rdd.EsSpark, as shown below: */
    
         val rdd2: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
          EsSpark.saveToEs(rdd2, "spark/docs")
          sc.stop()
      }
    }
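
    EsSpark also exposes an explicit read API. To verify the Trip documents without relying on the implicit conversions, a sketch along these lines should work (it assumes the same cluster and the itzkx/class resource written above):

    // Each value returned by EsSpark.esRDD is a Map of the document's fields
    val trips = EsSpark.esRDD(sc, "itzkx/class")
    trips.values.collect().foreach(println)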
    

    Writing JSON strings to Elasticsearch

    package cn.itzkx.spark_es
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._
    
    // Write JSON strings to Elasticsearch
    object  Spark2Esla{
      def main(args: Array[String]): Unit = {
        val master ="local"
        val conf = new SparkConf().setAppName("iteblog").setMaster(master)
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    
        //    // Set the ES-related parameters on a SparkSession instead
        //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        //
        //    // 2. Build the SparkContext object from it
        //    val sc: SparkContext = spark.sparkContext
    
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val json1 = """{"id" : 1, "zkx" : "www.ibex.com", "weylin" : "ibex_hadoop"}"""
        val json2 = """{"id" : 2, "zkx" : "books.ibex.com", "weylin" : "ibex_hadoop"}"""
        sc.makeRDD(Seq(json1, json2)).saveJsonToEs("itzkx/json")
    }
    }
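
    For JSON documents written with saveJsonToEs, the natural counterpart for reading is esJsonRDD, which hands back the raw JSON strings instead of Maps. A read-back sketch, assuming the itzkx/json resource written above:

    // esJsonRDD returns an RDD[(String, String)] of (document id, JSON source)
    val jsonDocs = sc.esJsonRDD("itzkx/json")
    jsonDocs.collect().foreach { case (id, json) => println(s"$id: $json") }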
    

    Setting the type dynamically at insert time

    package cn.itzkx.spark_es
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._
    // Set the type to insert into dynamically
    object Spark2Eslas {
      def main(args: Array[String]): Unit = {
        val master ="local"
        val conf = new SparkConf().setAppName("iteblog").setMaster(master)
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    
        //    // Set the ES-related parameters on a SparkSession instead
        //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        //
        //    // 2. Build the SparkContext object from it
        //    val sc: SparkContext = spark.sparkContext
    
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
    
        val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    
    
        val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    
        val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
    
    
        sc.makeRDD(Seq(game, book, cd)).saveToEs("itzkx/{media_type}")
      }
    }
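
    The {media_type} placeholder is resolved per document, so each Map lands under the type that matches its own media_type field. The same multi-resource pattern also works in the index name; a sketch (the itzkx-... index names here are only illustrative):

    // Route each document to its own index as well: itzkx-game, itzkx-book, itzkx-music
    sc.makeRDD(Seq(game, book, cd)).saveToEs("itzkx-{media_type}/docs")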
    
    package cn.itzkx.spark_es
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._
    import org.elasticsearch.spark.rdd.EsSpark
    object Spark2Eslast {
     /* Custom document id
      In Elasticsearch, the combination of _index/_type/_id uniquely identifies a document.
      If we do not specify an id ourselves, Elasticsearch automatically generates a globally
      unique one for us; such an auto-generated id is 20 characters long, for example:

      {
        "_index": "iteblog",
        "_type": "docs",
        "_id": "AVZy3d5sJfxPRwCjtWM-",
        "_score": 1,
        "_source": {
          "arrival": "Otopeni",
          "SFO": "San Fran"
        }
      }

      Clearly such a long string carries little meaning and is not convenient to remember or use.
      We can, however, specify the id manually when inserting the data, as follows: */
     def main(args: Array[String]): Unit = {
       val master ="local"
       val conf = new SparkConf().setAppName("iteblog").setMaster(master)
       conf.set("es.index.auto.create", "true")
       conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    
       //    // Set the ES-related parameters on a SparkSession instead
       //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
       //
       //    // 2. Build the SparkContext object from it
       //    val sc: SparkContext = spark.sparkContext
    
       val sc = new SparkContext(conf)
    
      val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    
    
      val muc = Map("iata" -> "MUC", "name" -> "Munich")
    
    
      val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    
    
     val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    
      airportsRDD.saveToEsWithMeta("itzkx/2015")
    
     /* The Seq((1, otp), (2, muc), (3, sfo)) above assigns an id to each object,
      namely 1, 2 and 3, so the otp document can afterwards be retrieved at the /itzkx/2015/1 URL.
      The id can also be specified in the following way: */
    
    
    
        // The approach below is better suited to real-world scenarios (dynamic mapping)
        val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
        val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
        val rdd = sc.makeRDD(Seq(json1, json2))
    
          EsSpark.saveToEs(rdd, "itzkx/docs", Map("es.mapping.id" -> "id"))
      // The es.mapping.id parameter above maps the object's id field to each record's _id.
     }
    }
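
    The es.mapping.id setting is not limited to JSON strings; it works the same way for Map or case class RDDs, as long as each document carries the field being promoted to _id. A sketch with illustrative field values:

    // Assumption: each document has an id field that we promote to the Elasticsearch _id
    val doc1 = Map("id" -> 1, "iata" -> "OTP", "name" -> "Otopeni")
    val doc2 = Map("id" -> 2, "iata" -> "SFO", "name" -> "San Fran")
    EsSpark.saveToEs(sc.makeRDD(Seq(doc1, doc2)), "itzkx/docs", Map("es.mapping.id" -> "id"))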
    
    package cn.itzkx.spark_es
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark._                 // brings saveToEsWithMeta (and esRDD) into scope
    import org.elasticsearch.spark.rdd.Metadata._
    object Spark2Eslasti {
    /* Custom record metadata
       The metadata of each record can be customised when writing the data, as follows: */
    def main(args: Array[String]): Unit = {
      val master = "local"
      val conf = new SparkConf().setAppName("itbex").setMaster(master)
      conf.set("es.index.auto.create", "true")
      conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    
      //    // Set the ES-related parameters on a SparkSession instead
      //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
      //
      //    // 2. Build the SparkContext object from it
      //    val sc: SparkContext = spark.sparkContext
    
      val sc = new SparkContext(conf)
    
    
      // document 1 (its metadata is otpMeta below)
      val otp = Map("iata" -> "OTP", "name" -> "Otopeni")

      // document 2 (its metadata is mucMeta below)
      val muc = Map("iata" -> "MUC", "name" -> "Munich")

      // document 3 (its metadata is sfoMeta below)
      val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    
    
      val otpMeta = Map(ID -> 1, TTL -> "3h")
    
      val mucMeta = Map(ID -> 2, VERSION -> "23")
    
      val sfoMeta = Map(ID -> 3)
    
      val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    
      // write the (metadata, document) pairs; ID/TTL/VERSION are applied per record
      airportsRDD.saveToEsWithMeta("itzkx/2015")
    
    
    }
    }
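
    To confirm that the explicit metadata was applied, the documents can be read back with esRDD (available through the org.elasticsearch.spark._ import added above); the key of each returned pair is the document _id, so the ids 1, 2 and 3 supplied in the metadata should show up directly:

    // Verification sketch: esRDD yields (id, source) pairs
    sc.esRDD("itzkx/2015").collect().foreach { case (id, doc) => println(s"_id=$id source=$doc") }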
    
    
    package cn.itzkx.spark_es
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.elasticsearch.spark.sql._
    
    // case class used to define the DataFrame
    case class Person(name: String, surname: String, age: Int)
    object Spark2Eslastic {
      def main(args: Array[String]): Unit = {
    
        // Set the ES-related parameters
        val master = "local"
        val conf = new SparkConf().setAppName("it").setMaster(master)
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "192.168.220.75:9200,192.168.220.76:9200,192.168.220.77:9200")
    
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    
        // 2. Build the SparkContext object
        val sc: SparkContext = spark.sparkContext
        //val sc = new SparkContext(conf)
        // reusing the example from Spark SQL documentation
    
        // sc = existing SparkContext
        val sqlContext = new SQLContext(sc)
    
        //  create DataFrame
        //    import spark.implicits._
        //    val personDF: DataFrame = sc.textFile("people.txt")
        //      .map(_.split(","))
        //      .map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()
        import spark.implicits._
        val value = sc.textFile("people.txt").map(line => line.split(","))
        val people = value.map(p => {
          (p(0), p(2))
        })
          // sample line in people.txt: zhangsan,zhangsanfeng,108
    
        val personDF = people.toDF("tax_rate_code", "percentage_rate")
        personDF.saveToEs("itzkx/personDF")
        personDF.printSchema()
        personDF.show()
        sc.stop()
      }
    }
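
    The DataFrame can be read back through the Spark SQL integration as well; org.elasticsearch.spark.sql also adds an esDF method to the SQLContext as an alternative to the generic reader. A read-back sketch, assuming the itzkx/personDF resource written above:

    // Load the index back into a DataFrame via the elasticsearch-spark data source
    val fromEs = spark.read
      .format("org.elasticsearch.spark.sql")
      .load("itzkx/personDF")
    fromEs.show()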
    
    

    The pom.xml used for the examples above

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>Spark_all</artifactId>
            <groupId>cn.itcast.Spark_all</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>Spark_Es</artifactId>
    
            <dependencies>
             <dependency>
                 <groupId>org.elasticsearch</groupId>
                 <artifactId>elasticsearch</artifactId>
                 <version>6.3.1</version>
             </dependency>
    
             <dependency>
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-core_2.11</artifactId>
                 <version>2.2.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-sql_2.11</artifactId>
                 <version>2.2.0</version>
             </dependency>
             <dependency>
                 <groupId>org.elasticsearch</groupId>
                 <artifactId>elasticsearch-spark-20_2.11</artifactId>
                 <version>6.4.1</version>
             </dependency>
         </dependencies>
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

    Writing a DataFrame to ES with Spark SQL

    // reusing the example from Spark SQL documentation
    
    import org.apache.spark.sql.SQLContext    
    import org.apache.spark.sql.SQLContext._
    
    import org.elasticsearch.spark.sql._      
    
    ...
    
    // sc = existing SparkContext
    val sqlContext = new SQLContext(sc)
    
    // case class used to define the DataFrame
    case class Person(name: String, surname: String, age: Int)
    
    //  create DataFrame
    val people = sc.textFile("people.txt")    
            .map(_.split(","))
            .map(p => Person(p(0), p(1), p(2).trim.toInt))
            .toDF()
    
    people.saveToEs("spark/people")           
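
    saveToEs on a DataFrame also accepts a per-call configuration map, which is handy when a single write needs settings that differ from the global SparkConf. A sketch that reuses the name column as the document _id (the assumption that names are unique here is only for illustration):

    // Per-call configuration: promote the name field to the document _id
    people.saveToEs("spark/people", Map("es.mapping.id" -> "name"))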
    
