All of the test code below is written in Scala and built with sbt.
build.sbt dependencies
name := "spark-demo"
version := "1.0"
scalaVersion := "2.11.8"

val sparkVersion = "2.1.1"

libraryDependencies ++= Seq(
  // Spark
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // elasticsearch-hadoop connector for Spark 2.x; exclude its transitive
  // Spark dependency so the versions pinned above win
  ("org.elasticsearch" %% "elasticsearch-spark-20" % "6.0.0").excludeAll(
    ExclusionRule(organization = "org.apache.spark")
  )
)
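A note on the connector dependency: the %% operator appends the Scala binary version to the artifact name, so with scalaVersion 2.11.8 the artifact actually resolved is elasticsearch-spark-20_2.11. Spelled out with a plain %, the same line would read:

// Equivalent dependency with the Scala suffix written explicitly
"org.elasticsearch" % "elasticsearch-spark-20_2.11" % "6.0.0"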
Reading and writing ES with Spark SQL
/**
  * @author created by LXF on 2018/6/1 10:03
  */
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._

object App {

  case class Person(age: Long, name: String)

  def main(args: Array[String]): Unit = {
    println("Hello World!")
    // Needed on Windows so Spark can locate the Hadoop binaries (winutils)
    System.setProperty("hadoop.home.dir", "G:\\hadoop_home")

    val spark = SparkSession.builder()
      .appName("SparkTest")
      .master("local[*]")
      .config("es.index.auto.create", "true")
      .config("pushdown", "true")
      .config("es.nodes", "192.168.7.130")
      .config("es.port", "9200")
      .config("es.nodes.wan.only", "true")
      .getOrCreate()

    import spark.implicits._

    // Read documents of the shape {age: ..., name: ...} from ES
    val sparkDF = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("inferSchema", "true")
      .load("test_lxf")
      .as[Person]
    // sparkDF.take(10).foreach(println(_))
    // val data = spark.read.textFile("g:\\mydata\\*")

    // Write back to ES. Save records in this case-class form: it carries
    // the schema metadata, and each field name becomes a field name in ES.
    val rdd = sparkDF.rdd
    // println(s"rdd = ${rdd}")
    rdd.saveToEs("index/external")

    spark.stop()
  }
}
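For reference, the connector can also write a DataFrame directly, without dropping down to an RDD, via the saveToEs extension method from org.elasticsearch.spark.sql._. A minimal sketch, reusing the same test node and the index/external resource from the example above:

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object SaveDataFrame {
  case class Person(age: Long, name: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SaveDataFrame")
      .master("local[*]")
      .config("es.nodes", "192.168.7.130")  // same test node as above
      .config("es.port", "9200")
      .config("es.index.auto.create", "true")
      .getOrCreate()
    import spark.implicits._

    // Build a small Dataset and write it straight to ES; the case-class
    // field names (age, name) become the field names in the index
    val ds = Seq(Person(30, "Tom"), Person(25, "Jerry")).toDS()
    ds.toDF().saveToEs("index/external")

    spark.stop()
  }
}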
Reading ES with a Spark RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object LoadElasticsearchData {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("e2e.computing.test")
        .setMaster("local[*]")
        .set("spark.cassandra.connection.host", "192.168.14.141")
        //.set("es.nodes", "192.168.14.140")
        // 192.168.7.130:9200
        .set("es.nodes", "192.168.7.130")
        .set("es.port", "9200")
        .set("es.index.auto.create", "true")
        // Return date fields as plain strings rather than rich Date objects
        .set("es.mapping.date.rich", "false")
    )

    // esRDD(index, query): the query string filters the documents;
    // with no query, elasticsearch-spark fetches the whole index by default
    val query =
      s"""
         |{
         |  "query": {
         |    "match_all": {}
         |  }
         |}
       """.stripMargin
    val esRdd = sc.esRDD("test_lxf", query)
  }
}
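Note that esRDD returns an RDD of (document id, field map) pairs, so the result can be inspected like any other pair RDD. A minimal sketch, meant to be appended inside main above after the esRdd line:

// esRDD yields (documentId, fieldMap) pairs
esRdd.take(10).foreach { case (id, fields) =>
  println(s"$id -> $fields")
}
println(s"total hits: ${esRdd.count()}")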
Writing to ES with a Spark RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

/**
  * @author LXF
  * 2018/3/28 15:29
  */
object SaveElasticsearch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("e2e.computing.test")
        .setMaster("local[*]")
        .set("spark.cassandra.connection.host", "192.168.14.141")
        //.set("es.nodes", "192.168.14.140")
        // 192.168.7.130:9200
        .set("es.nodes", "192.168.7.130")
        .set("es.port", "9200")
        .set("es.index.auto.create", "true")
        .set("es.mapping.date.rich", "false")
    )

    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
    // With es.index.auto.create=true, the index is created if it doesn't exist
    sc.makeRDD(Seq(airports)).saveToEs("test_lxf2")
  }
}
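Besides Maps and case classes, elasticsearch-spark can also index pre-serialized JSON strings via saveJsonToEs, which writes each string as a document as-is instead of serializing a Scala object. A minimal sketch under the same cluster settings; the index name test_lxf3 is made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object SaveJsonToElasticsearch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("e2e.computing.test")
        .setMaster("local[*]")
        .set("es.nodes", "192.168.7.130")
        .set("es.port", "9200")
        .set("es.index.auto.create", "true")
    )

    // Each element is already a complete JSON document,
    // so the connector indexes the strings unchanged
    val json1 = """{"airport": "OTP", "city": "Otopeni"}"""
    val json2 = """{"airport": "SFO", "city": "San Fran"}"""
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("test_lxf3")

    sc.stop()
  }
}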