Writing data from Spark to Elasticsearch

Author: bigdata张凯翔 | Published 2020-05-23 15:42

Any RDD can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the RDD's element type needs to be a Map (Scala or Java), a JavaBean, or a Scala case class. When that is not the case, the data can easily be transformed inside Spark first, or you can plug in your own custom ValueWriter.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // Spark Scala imports

import org.elasticsearch.spark._         // elasticsearch-hadoop Scala imports
...

val conf = ...
val sc = new SparkContext(conf)          // start Spark through its Scala API

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

// makeRDD creates an ad-hoc RDD from the given collection;
// any other RDD (Java or Scala) could be passed in instead
sc.makeRDD(Seq(numbers, airports))
  .saveToEs("spark/docs")   // indexes the content under spark/docs in Elasticsearch
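If the RDD's element type is not one of the supported ones, the simplest option in practice is usually to transform the data in Spark before calling saveToEs, rather than writing a custom ValueWriter. A minimal sketch (the spark/tuples resource and the sample data are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object TuplesToEs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("tuples-to-es").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)

    // an RDD of plain tuples is not directly indexable...
    val pairs = sc.makeRDD(Seq(("OTP", "Otopeni"), ("SFO", "San Fran")))

    // ...so convert each element into a Map before calling saveToEs
    pairs
      .map { case (code, name) => Map("code" -> code, "name" -> name) }
      .saveToEs("spark/tuples")

    sc.stop()
  }
}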

Writing Map objects to Elasticsearch

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

//Write Map objects to Elasticsearch
//Reference: https://www.iteblog.com/archives/1728.html#id
object Spark2Es {
  def main(args: Array[String]): Unit = {
    val master ="local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

//    // Alternative: set the ES parameters on a SparkSession instead
//    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//
//    // 2. Obtain the SparkContext from the SparkSession
//    val sc: SparkContext = spark.sparkContext

    val sc = new SparkContext(conf)

    //sc.setLogLevel("warn")

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "ibex", "SFO" -> "San Fran")
    sc.makeRDD(Seq(numbers, airports)).saveToEs("itzkx/docs")

    sc.stop()
  }

}
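When writing larger volumes, the connector's bulk-write behaviour can be tuned on the same SparkConf. Below is a sketch of a few commonly used elasticsearch-hadoop write settings; the setting names are standard, but the values are only illustrative, not recommendations:

import org.apache.spark.SparkConf

object EsWriteConf {
  def build(nodes: String): SparkConf =
    new SparkConf()
      .setAppName("es-write-tuning")
      .set("es.nodes", nodes)
      .set("es.index.auto.create", "true")
      .set("es.batch.size.entries", "1000")     // flush the bulk buffer after this many docs
      .set("es.batch.size.bytes", "1mb")        // ...or after this much data, whichever comes first
      .set("es.batch.write.retry.count", "3")   // retries when ES rejects a bulk request
      .set("es.batch.write.retry.wait", "10s")  // wait between retries
}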

Writing case class objects to Elasticsearch

package cn.itzkx.spark_es

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._

//Write case class objects to Elasticsearch
object Spark2Esl {


  def main(args: Array[String]): Unit = {
    val master ="local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    //    // Alternative: set the ES parameters on a SparkSession instead
    //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    //
    //    // 2. Obtain the SparkContext from the SparkSession
    //    val sc: SparkContext = spark.sparkContext

    val sc = new SparkContext(conf)

    case class Trip(departure: String, arrival: String)
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")

    val rdd1: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    rdd1.saveToEs("itzkx/class")


   /* The snippet above writes upcomingTrip and lastWeekTrip into the _index named
      itzkx with the type class. saveToEs becomes available on the RDD through an
      implicit conversion. elasticsearch-hadoop also offers an explicit API for
      writing an RDD to Elasticsearch; it requires importing
      org.elasticsearch.spark.rdd.EsSpark, as shown below: */

    val rdd2: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(rdd2, "spark/docs")
    sc.stop()
  }
}
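saveToEs also accepts a per-write settings Map, which is handy when a case class carries fields that should not be indexed. A sketch under assumptions: the TripWithNotes class and the itzkx/trips resource are hypothetical; es.mapping.exclude (and its counterpart es.mapping.include) controls which fields end up in the document:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// defined at the top level so Spark can serialize instances without an outer reference
case class TripWithNotes(departure: String, arrival: String, internalNotes: String)

object CaseClassFieldFiltering {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("case-class-filter").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)

    val trips = sc.makeRDD(Seq(
      TripWithNotes("OTP", "SFO", "visa ok"),
      TripWithNotes("MUC", "OTP", "rebooked twice")
    ))

    // es.mapping.exclude drops the listed fields from the indexed document
    trips.saveToEs("itzkx/trips", Map("es.mapping.exclude" -> "internalNotes"))

    sc.stop()
  }
}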

Writing JSON strings to Elasticsearch

package cn.itzkx.spark_es

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

//Write JSON strings to Elasticsearch
object Spark2Esla {
  def main(args: Array[String]): Unit = {
    val master ="local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    //    // Alternative: set the ES parameters on a SparkSession instead
    //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    //
    //    // 2. Obtain the SparkContext from the SparkSession
    //    val sc: SparkContext = spark.sparkContext

    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val json1 = """{"id" : 1, "zkx" : "www.ibex.com", "weylin" : "ibex_hadoop"}"""
    val json2 = """{"id" : 2, "zkx" : "books.ibex.com", "weylin" : "ibex_hadoop"}"""
    // the strings are already JSON, so saveJsonToEs indexes them as-is
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("itzkx/json")

    sc.stop()
  }
}
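If the data starts out as objects rather than JSON strings, they can be serialized on the executors first and then written with saveJsonToEs. A sketch assuming json4s (which ships with Spark) is on the classpath; the Post case class and the itzkx/json resource are just for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

case class Post(id: Int, blog: String, weixin: String)

object ObjectsAsJsonToEs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("json-to-es").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)

    val posts = sc.makeRDD(Seq(
      Post(1, "www.ibex.com", "ibex_hadoop"),
      Post(2, "books.ibex.com", "ibex_hadoop")
    ))

    // serialize each object to a JSON string on the executors, then index the strings as-is
    val jsonDocs = posts.mapPartitions { it =>
      implicit val formats: DefaultFormats.type = DefaultFormats
      it.map(Serialization.write(_))
    }
    jsonDocs.saveJsonToEs("itzkx/json")

    sc.stop()
  }
}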

Dynamically setting the type on insert

package cn.itzkx.spark_es

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
//Dynamically set the type on insert
object Spark2Eslas {
  def main(args: Array[String]): Unit = {
    val master ="local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    //    // Alternative: set the ES parameters on a SparkSession instead
    //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    //
    //    // 2. Obtain the SparkContext from the SparkSession
    //    val sc: SparkContext = spark.sparkContext

    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    // {media_type} is resolved per document, so each record is routed to its own type
    sc.makeRDD(Seq(game, book, cd)).saveToEs("itzkx/{media_type}")

    sc.stop()
  }
}
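The same placeholder mechanism also supports a date format, which is handy for time-based indices. A sketch assuming the documents carry an ISO-8601 @timestamp field; the logs-... index name is made up:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object DynamicIndexByDate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dynamic-index").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)

    val e1 = Map("@timestamp" -> "2020-05-23T15:42:00", "level" -> "INFO", "msg" -> "started")
    val e2 = Map("@timestamp" -> "2020-05-24T08:00:00", "level" -> "ERROR", "msg" -> "failed")

    // {field|format} is resolved per document, so each event lands in a daily
    // index such as logs-2020.05.23/docs
    sc.makeRDD(Seq(e1, e2)).saveToEs("logs-{@timestamp|yyyy.MM.dd}/docs")

    sc.stop()
  }
}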
Specifying a custom document id

package cn.itzkx.spark_es

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark
object Spark2Eslast {
 /* Custom document id
    In Elasticsearch, the _index/_type/_id combination uniquely identifies a document.
    If no id is supplied, Elasticsearch generates a globally unique one, roughly 20
    characters long, for example:

    {
      "_index": "iteblog",
      "_type": "docs",
      "_id": "AVZy3d5sJfxPRwCjtWM-",
      "_score": 1,
      "_source": {
        "arrival": "Otopeni",
        "SFO": "San Fran"
      }
    }

    Such a string carries no meaning and is hard to remember, but we can set the id
    ourselves when inserting the data, as follows: */
 def main(args: Array[String]): Unit = {
   val master ="local"
   val conf = new SparkConf().setAppName("iteblog").setMaster(master)
   conf.set("es.index.auto.create", "true")
   conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

   //    // Alternative: set the ES parameters on a SparkSession instead
   //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
   //
   //    // 2. Obtain the SparkContext from the SparkSession
   //    val sc: SparkContext = spark.sparkContext

   val sc = new SparkContext(conf)

   val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
   val muc = Map("iata" -> "MUC", "name" -> "Munich")
   val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

   val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
   airportsRDD.saveToEsWithMeta("itzkx/2015")

   /* The Seq((1, otp), (2, muc), (3, sfo)) above assigns the ids 1, 2 and 3 to the
      three documents, so the otp document can later be fetched at /itzkx/2015/1.
      The id can also be taken from a field of the document itself: */
    // Mapping the id from a field is usually the more practical approach
    val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
    val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
    val rdd = sc.makeRDD(Seq(json1, json2))

    // es.mapping.id maps the id field of each document to its Elasticsearch _id
    EsSpark.saveToEs(rdd, "itzkx/docs", Map("es.mapping.id" -> "id"))

    sc.stop()
  }
}
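Once the _id comes from the data itself, re-running a job no longer has to create duplicates: combined with es.write.operation, the same documents can be upserted instead. A sketch; the itzkx/docs resource and the visits field are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object UpsertById {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("upsert-by-id").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)

    val doc1 = Map("id" -> 1, "blog" -> "www.iteblog.com", "visits" -> 10)
    val doc2 = Map("id" -> 2, "blog" -> "books.iteblog.com", "visits" -> 3)

    // with es.mapping.id set, es.write.operation=upsert updates existing documents
    // on re-runs instead of indexing them again under fresh ids
    sc.makeRDD(Seq(doc1, doc2)).saveToEs("itzkx/docs", Map(
      "es.mapping.id"      -> "id",
      "es.write.operation" -> "upsert"
    ))

    sc.stop()
  }
}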
Customizing record metadata

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.Metadata._
object Spark2Eslasti {
/*  Custom record metadata
    Per-record metadata (id, ttl, version, ...) can be supplied at write time, as follows: */
def main(args: Array[String]): Unit = {
  val master = "local"
  val conf = new SparkConf().setAppName("itbex").setMaster(master)
  conf.set("es.index.auto.create", "true")
  conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

  //    // Alternative: set the ES parameters on a SparkSession instead
  //    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  //
  //    // 2. Obtain the SparkContext from the SparkSession
  //    val sc: SparkContext = spark.sparkContext

  val sc = new SparkContext(conf)


  // documents
  val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
  val muc = Map("iata" -> "MUC", "name" -> "Munich")
  val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

  // per-record metadata: an id plus, optionally, a ttl or a version
  val otpMeta = Map(ID -> 1, TTL -> "3h")
  val mucMeta = Map(ID -> 2, VERSION -> "23")
  val sfoMeta = Map(ID -> 3)

  val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))

  // saveToEsWithMeta pairs each metadata Map with its document
  airportsRDD.saveToEsWithMeta("itzkx/2015")

  sc.stop()
}
}
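Besides ID, TTL and VERSION, the Metadata enum also includes ROUTING, which sends documents with the same routing value to the same shard. A minimal sketch; the itzkx/routing resource and the routing values are made up:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.Metadata._

object MetadataRoutingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("es-metadata-routing").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // pair each document with a metadata Map carrying its id and routing value
    val withMeta = sc.makeRDD(Seq(
      (Map(ID -> 1, ROUTING -> "europe"), otp),
      (Map(ID -> 2, ROUTING -> "america"), sfo)
    ))
    withMeta.saveToEsWithMeta("itzkx/routing")

    sc.stop()
  }
}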

Writing a DataFrame to Elasticsearch

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql._

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)
object Spark2Eslastic {
  def main(args: Array[String]): Unit = {

    // ES-related settings
    val master = "local"
    val conf = new SparkConf().setAppName("it").setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.220.75:9200,192.168.220.76:9200,192.168.220.77:9200")

    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    // 2. Obtain the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    //val sc = new SparkContext(conf)
    // reusing the example from the Spark SQL documentation;
    // with SparkSession there is no need for a separate SQLContext

    // create the DataFrame; the commented-out variant builds it from the Person case class
    //    import spark.implicits._
    //    val personDF: DataFrame = sc.textFile("people.txt")
    //      .map(_.split(","))
    //      .map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()
    import spark.implicits._
    // each line of people.txt looks like: zhangsan,zhangsanfeng,108
    val value = sc.textFile("people.txt").map(line => line.split(","))
    val people = value.map(p => (p(0), p(2)))

    val personDF = people.toDF("tax_rate_code", "percentage_rate")
    personDF.saveToEs("itzkx/personDF")
    personDF.printSchema()
    personDF.show()
    sc.stop()
  }
}
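The connector is also registered as a Spark SQL data source, so the same write can be expressed through the DataFrameWriter API instead of saveToEs. A sketch; the column names, the es.mapping.id choice and the itzkx/personDF resource are illustrative:

import org.apache.spark.sql.SparkSession

object DataFrameWriterToEs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("df-writer-es")
      .master("local")
      .config("es.nodes", "192.168.220.75:9200")
      .config("es.index.auto.create", "true")
      .getOrCreate()

    import spark.implicits._
    val df = Seq(("zhangsan", "zhangsanfeng", 108)).toDF("name", "surname", "age")

    // the elasticsearch-spark data source accepts the usual connector options
    df.write
      .format("org.elasticsearch.spark.sql")
      .option("es.mapping.id", "name")   // use the name column as the document _id
      .mode("append")
      .save("itzkx/personDF")

    spark.stop()
  }
}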

The pom.xml used by the code above

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Spark_all</artifactId>
        <groupId>cn.itcast.Spark_all</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Spark_Es</artifactId>

        <dependencies>
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
             <version>6.3.1</version>
         </dependency>

         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.11</artifactId>
             <version>2.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
             <version>2.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch-spark-20_2.11</artifactId>
             <version>6.4.1</version>
         </dependency>
     </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Writing a DataFrame to ES with Spark SQL

// reusing the example from Spark SQL documentation

import org.apache.spark.sql.SQLContext    
import org.apache.spark.sql.SQLContext._

import org.elasticsearch.spark.sql._      

...

// sc = existing SparkContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._             // needed for toDF below

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = sc.textFile("people.txt")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))
        .toDF()

people.saveToEs("spark/people")           
