美文网首页
elasticsearch spark

elasticsearch spark

作者: 無敵兔八哥 | 来源:发表于2017-12-28 23:38 被阅读51次

    试用场景:TB级历史数据导入(hdfs2es)

    1.添加maven依赖

       <dependency>
         <groupId>org.elasticsearch</groupId>
         <artifactId>elasticsearch-spark-20_2.11</artifactId>
         <version>5.1.1</version>
       </dependency>
    
       <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.11</artifactId>
         <version>2.1.1</version>
       </dependency>
    
    

    注:①5.1.1添加elasticsearch-hadoop会报错
      ②es-spark版本与es版本一致

    2.编写spark程序

    package com.hualala.bi
    
    import com.alibaba.fastjson.JSON
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.{SparkConf, SparkContext}
    //隐式转换  rdd savetoes
    import org.elasticsearch.spark._
    
    object esSparkApp {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf()
              .setMaster("local").setAppName("es-spark-test")
    
        val inputPath = args.apply(0)
        val nodes = args.apply(1)
        
        //配置es参数 包括id routing
        conf.set("es.nodes", nodes)
        conf.set("es.index.auto.create", "true")
        conf.set("es.mapping.id", "id")
        conf.set("es.mapping.routing", "rout")
        conf.set("es.input.json", "yes")
    
        val sc = new SparkContext(conf)
    
        val dataRdd = sc.textFile(inputPath)
    
        //处理字段  id routing
        val billRDD = dataRdd.map(...)
    
        billRDD.saveToEs("{index}/{type}")
    
        sc.stop()
      }
    
    }
    
    

    3.es优化设置

    ①关闭动态索引

    PUT  {index}/{type}/_mapping -d'{"dynamic":false}'
    

    注:id rout 会保存到source 但是不会被索引
    ②优化gc算法
    默认cms 更改为g1 大内存cms stop word会引起节点脱离
    ③增加refresh_interval、translog flush size、将durability同步改为异步
    ④加大zen.discover相关设置
    ⑤一次程序不建议写入过多的索引(100+) 要合理设计索引

    相关文章

      网友评论

          本文标题:elasticsearch spark

          本文链接:https://www.haomeiwen.com/subject/ocubgxtx.html