美文网首页
elasticsearch spark

elasticsearch spark

作者: 無敵兔八哥 | 来源:发表于2017-12-28 23:38 被阅读51次

试用场景:TB级历史数据导入(hdfs2es)

1.添加maven依赖

   <dependency>
     <groupId>org.elasticsearch</groupId>
     <artifactId>elasticsearch-spark-20_2.11</artifactId>
     <version>5.1.1</version>
   </dependency>

   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>2.1.1</version>
   </dependency>

注:①5.1.1添加elasticsearch-hadoop会报错
  ②es-spark版本与es版本一致

2.编写spark程序

package com.hualala.bi

import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
//隐式转换  rdd savetoes
import org.elasticsearch.spark._

object esSparkApp {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
          .setMaster("local").setAppName("es-spark-test")

    val inputPath = args.apply(0)
    val nodes = args.apply(1)
    
    //配置es参数 包括id routing
    conf.set("es.nodes", nodes)
    conf.set("es.index.auto.create", "true")
    conf.set("es.mapping.id", "id")
    conf.set("es.mapping.routing", "rout")
    conf.set("es.input.json", "yes")

    val sc = new SparkContext(conf)

    val dataRdd = sc.textFile(inputPath)

    //处理字段  id routing
    val billRDD = dataRdd.map(...)

    billRDD.saveToEs("{index}/{type}")

    sc.stop()
  }

}

3.es优化设置

①关闭动态索引

PUT  {index}/{type}/_mapping -d'{"dynamic":false}'

注:id rout 会保存到source 但是不会被索引
②优化gc算法
默认cms 更改为g1 大内存cms stop word会引起节点脱离
③增加refresh_interval、translog flush size、将durability同步改为异步
④加大zen.discover相关设置
⑤一次程序不建议写入过多的索引(100+) 要合理设计索引

相关文章

网友评论

      本文标题:elasticsearch spark

      本文链接:https://www.haomeiwen.com/subject/ocubgxtx.html