试用场景:TB级历史数据导入(hdfs2es)
1.添加maven依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
注:①5.1.1添加elasticsearch-hadoop会报错
②es-spark版本与es版本一致
2.编写spark程序
package com.hualala.bi
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
//隐式转换 rdd savetoes
import org.elasticsearch.spark._
object esSparkApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local").setAppName("es-spark-test")
val inputPath = args.apply(0)
val nodes = args.apply(1)
//配置es参数 包括id routing
conf.set("es.nodes", nodes)
conf.set("es.index.auto.create", "true")
conf.set("es.mapping.id", "id")
conf.set("es.mapping.routing", "rout")
conf.set("es.input.json", "yes")
val sc = new SparkContext(conf)
val dataRdd = sc.textFile(inputPath)
//处理字段 id routing
val billRDD = dataRdd.map(...)
billRDD.saveToEs("{index}/{type}")
sc.stop()
}
}
3.es优化设置
①关闭动态索引
PUT {index}/{type}/_mapping -d'{"dynamic":false}'
注:id rout 会保存到source 但是不会被索引
②优化gc算法
默认cms 更改为g1 大内存cms stop word会引起节点脱离
③增加refresh_interval、translog flush size、将durability同步改为异步
④加大zen.discover相关设置
⑤一次程序不建议写入过多的索引(100+) 要合理设计索引
网友评论