美文网首页elasticsearch
elasticsearch-spark更新文档

elasticsearch-spark更新文档

作者: 愚公300代 | 来源:发表于2016-09-05 16:16 被阅读898次

    先看源码:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._
    
    /**
      * Created by magneto on 16-6-29.
      */
    object Upsert4 extends App{
      val index = "test"
      val target = s"$index/docs"
    
      val props = Map("es.write.operation" -> "upsert",
        //"es.input.json" -> "true",
        "es.mapping.id" -> "id",
        "es.update.script.lang" -> "groovy"
      )
    
      val conf = new SparkConf().setAppName("read for elasticsearch").setMaster("local")
      conf.set("es.nodes", "172.24.63.14")//修改成自己使用的es服务器地址
    
      val sc = new SparkContext(conf)
      val name= Map("id" -> 3, "name" -> Set("sxl1989","sxl9199"))
      val lines = sc.makeRDD(Seq(name))
      val up_params = "new_name:name"
      //val up_script =  "if (ctx._source.containsKey(\"name\")) {ctx._source.name += new_name;} else {ctx._source.name = [new_name];}"
      val up_script =  "if (ctx._source.containsKey('name')) {ctx._source.name += new_name;} else {ctx._source.name = [new_name];}"
      lines.saveToEs(target, props + ("es.update.script.params" -> up_params) + ("es.update.script" -> up_script))
    }
    

    其中:es.input.json控制你输入的是否是json格式的,这里的lines用的是Map所以注释掉。
    另外一处注释是注释掉的脚本,此脚本在es客户端是可以运行的,但是使用es-spark不能运行,必须将双引号改成单引号才行。
    name这个其实代表一个文档,id这列因为“es.mapping.id”->"id"被选做meta数据。
    注意,此处的index可以使用别名。

    es.write.operation -> upsert
    es.update.script.lang -> groovy
    es.update.script.params -> up_params
    es.update.script -> up_script
    是核心配置。

    更多请参考:
    es官方文档
    elasticsearch-hadoop源码

    相关文章

      网友评论

        本文标题:elasticsearch-spark更新文档

        本文链接:https://www.haomeiwen.com/subject/hjvrettx.html