Any RDD can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the RDD element type needs to be a Map (Scala or Java), a JavaBean, or a Scala case class. When that is not the case, the data can easily be transformed in Spark, or you can plug in your own custom ValueWriter.
import org.apache.spark.SparkContext    // Spark Scala imports
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._        // elasticsearch-hadoop Scala imports

...

val conf = ...
val sc = new SparkContext(conf)         // start Spark through its Scala API

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

// makeRDD creates an ad-hoc RDD from the given collection; any other RDD (Java or Scala) could be passed in
sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

The two documents are indexed in Elasticsearch under spark/docs.
Writing Map objects to Elasticsearch
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Write Map objects to Elasticsearch
// https://www.iteblog.com/archives/1728.html#id
object Spark2Es {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // ES-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)
    //sc.setLogLevel("warn")
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "ibex", "SFO" -> "San Fran")
    sc.makeRDD(Seq(numbers, airports)).saveToEs("itzkx/docs")
    sc.stop()
  }
}
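To check that the two Map documents actually landed in the index, they can be read back into an RDD. Below is a minimal sketch, assuming the esRDD method that the org.elasticsearch.spark._ implicits add to SparkContext; the object name and the single-node address are illustrative, taken from the examples above.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object Spark2EsRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("iteblog").setMaster("local")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)
    // esRDD returns an RDD of (documentId, fieldMap) pairs for the given index/type
    val docs = sc.esRDD("itzkx/docs")
    docs.collect().foreach { case (id, fields) => println(s"$id -> $fields") }
    sc.stop()
  }
}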
Writing case class objects to Elasticsearch
package cn.itzkx.spark_es

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

// Write case class objects to Elasticsearch
object Spark2Esl {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // ES-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)
    case class Trip(departure: String, arrival: String)
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val rdd1: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    rdd1.saveToEs("itzkx/class")
    /* The snippet above writes upcomingTrip and lastWeekTrip to the _index named itzkx
       with the type class. So far the implicit conversions are what give the RDD its
       saveToEs method. elasticsearch-hadoop also offers an explicit API for writing an
       RDD to Elasticsearch; it requires importing org.elasticsearch.spark.rdd.EsSpark: */
    val rdd2: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(rdd2, "spark/docs")
    sc.stop()
  }
}
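The explicit EsSpark API also accepts a configuration map, so the document _id can be taken from a field of the case class instead of being auto-generated. Below is a minimal sketch under that assumption; the object name and the choice of departure as the id field are purely illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

// defining the case class at the top level avoids closure-serialization surprises
case class Trip(departure: String, arrival: String)

object Spark2EsCaseClassId {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("iteblog").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)
    val trips = sc.makeRDD(Seq(Trip("OTP", "SFO"), Trip("MUC", "OTP")))
    // es.mapping.id picks the departure field as the document _id (illustrative choice)
    EsSpark.saveToEs(trips, "itzkx/class", Map("es.mapping.id" -> "departure"))
    sc.stop()
  }
}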
Writing JSON strings to Elasticsearch
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Write JSON strings to Elasticsearch
object Spark2Esla {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // ES-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val json1 = """{"id" : 1, "zkx" : "www.ibex.com", "weylin" : "ibex_hadoop"}"""
    val json2 = """{"id" : 2, "zkx" : "books.ibex.com", "weylin" : "ibex_hadoop"}"""
    // saveJsonToEs ships the strings to Elasticsearch as-is, without parsing them in Spark
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("itzkx/json")
    sc.stop()
  }
}
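JSON strings can also be written through the explicit EsSpark API instead of the implicit saveJsonToEs method. Below is a minimal sketch, assuming EsSpark.saveJsonToEs behaves like the implicit variant above; the object name is illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object Spark2EsJsonExplicit {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("iteblog").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)
    val json1 = """{"id" : 1, "zkx" : "www.ibex.com"}"""
    val json2 = """{"id" : 2, "zkx" : "books.ibex.com"}"""
    // explicit API; the strings are still passed to Elasticsearch without being parsed in Spark
    EsSpark.saveJsonToEs(sc.makeRDD(Seq(json1, json2)), "itzkx/json")
    sc.stop()
  }
}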
Dynamically setting the insert type
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Dynamically set the type each document is inserted into
object Spark2Eslas {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // ES-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
    // {media_type} is resolved per document, so each record goes to its own type
    sc.makeRDD(Seq(game, book, cd)).saveToEs("itzkx/{media_type}")
    sc.stop()
  }
}
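The same {field} placeholder can also drive the index name, not just the type, so documents can be routed to per-category indices. Below is a minimal sketch under that assumption; the itzkx- index prefix and the object name are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object Spark2EsDynamicIndex {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("iteblog").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)
    val game = Map("media_type" -> "game", "title" -> "FF VI")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter")
    // {media_type} is resolved per document, so each record lands in its own index
    // (itzkx-game/docs, itzkx-book/docs, ...)
    sc.makeRDD(Seq(game, book)).saveToEs("itzkx-{media_type}/docs")
    sc.stop()
  }
}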
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

object Spark2Eslast {
  /* Custom ids
     In Elasticsearch the combination of _index/_type/_id uniquely identifies a document.
     If we do not specify an id, Elasticsearch generates a globally unique one for us;
     an auto-generated id is 20 characters long, for example:
     {
       "_index": "iteblog",
       "_type": "docs",
       "_id": "AVZy3d5sJfxPRwCjtWM-",
       "_score": 1,
       "_source": {
         "arrival": "Otopeni",
         "SFO": "San Fran"
       }
     }
     Such a long string carries no meaning and is hard to remember or use. Instead, we can
     specify the id ourselves when inserting the data, as follows: */
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // ES-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    airportsRDD.saveToEsWithMeta("itzkx/2015")
    /* The Seq((1, otp), (2, muc), (3, sfo)) above assigns ids 1, 2 and 3 to the three
       objects, so the otp document can then be fetched at the /itzkx/2015/1 URL.
       The id can also be specified as follows: */
    // the approach below, mapping an existing field to the id, is closer to real-world usage
    val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
    val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
    val rdd = sc.makeRDD(Seq(json1, json2))
    EsSpark.saveToEs(rdd, "itzkx/docs", Map("es.mapping.id" -> "id"))
    // es.mapping.id maps the id field of each object to the record's document _id
    sc.stop()
  }
}
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.Metadata._

object Spark2Eslasti {
  /* Custom per-record metadata
     Each record's metadata can also be customized when writing, as follows: */
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("itbex").setMaster(master)
    // ES-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)
    // the three documents
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    // metadata for each document: _id plus optional TTL / version
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2, VERSION -> "23")
    val sfoMeta = Map(ID -> 3)
    // pair RDD of (metadata, document), written with saveToEsWithMeta
    val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    airportsRDD.saveToEsWithMeta("itzkx/2015")
    sc.stop()
  }
}
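Like saveToEs, the metadata-aware write also has an explicit counterpart on EsSpark. Below is a minimal sketch, assuming EsSpark.saveToEsWithMeta takes the same (metadata, document) pair RDD as the implicit method above; the object name is illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.Metadata._

object Spark2EsMetaExplicit {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("itbex").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200")
    val sc = new SparkContext(conf)
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    // pair RDD of (metadata, document); here the metadata only carries the _id
    val rdd = sc.makeRDD(Seq((Map(ID -> 1), otp), (Map(ID -> 2), sfo)))
    EsSpark.saveToEsWithMeta(rdd, "itzkx/2015")
    sc.stop()
  }
}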
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql._

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

object Spark2Eslastic {
  def main(args: Array[String]): Unit = {
    // ES-related settings
    val master = "local"
    val conf = new SparkConf().setAppName("it").setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.220.75:9200,192.168.220.76:9200,192.168.220.77:9200")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // get the SparkContext from the session
    val sc: SparkContext = spark.sparkContext

    // create the DataFrame, reusing the example from the Spark SQL documentation
    // val personDF: DataFrame = sc.textFile("people.txt")
    //   .map(_.split(","))
    //   .map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()
    import spark.implicits._
    // each line of people.txt looks like: zhangsan,zhangsanfeng,108
    val value = sc.textFile("people.txt").map(line => line.split(","))
    val people = value.map(p => (p(0), p(2)))
    val personDF = people.toDF("tax_rate_code", "percentage_rate")
    personDF.saveToEs("itzkx/personDF")
    personDF.printSchema()
    personDF.show()
    sc.stop()
  }
}
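The DataFrame written above can be loaded back for verification. Below is a minimal sketch, assuming the esDF method that the org.elasticsearch.spark.sql._ implicits add to SQLContext; the object name is illustrative and the node address is the one used in the example above.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object Spark2EsReadDF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("it").setMaster("local")
    conf.set("es.nodes", "192.168.220.75:9200")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // esDF maps the index back to a DataFrame whose schema is inferred from the ES mapping
    val df = spark.sqlContext.esDF("itzkx/personDF")
    df.printSchema()
    df.show()
    spark.stop()
  }
}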
The pom.xml used by the examples above
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Spark_all</artifactId>
        <groupId>cn.itcast.Spark_all</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>Spark_Es</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Writing a DataFrame to ES with Spark SQL
// reusing the example from Spark SQL documentation
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.elasticsearch.spark.sql._

...

// sc = existing SparkContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // needed for the toDF() conversion below

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

// create DataFrame
val people = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1), p(2).trim.toInt))
  .toDF()

people.saveToEs("spark/people")