Reading and Writing ES with Spark

Author: 阿粒_lxf | Published 2018-07-10 01:11

    All test code below is written in Scala and built with sbt.

    build.sbt dependencies

    name := "spark-demo"
    version := "1.0"
    scalaVersion := "2.11.8"
    val sparkVersion = "2.1.1"
    
    libraryDependencies ++= Seq(
      //spark
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion,
      ("org.elasticsearch" %% "elasticsearch-spark-20" % "6.0.0").excludeAll(
        ExclusionRule(organization = "org.apache.spark")
      )
    )
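
    The ExclusionRule strips the connector's transitive Spark artifacts, so the Spark version is pinned by the two explicit dependencies above. For submitting to a real cluster with spark-submit, a common variant (an assumption, not part of the original post) is to mark the Spark dependencies as provided:

    // Hypothetical variant for cluster deployment: the cluster supplies Spark at runtime
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"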
    

    Reading and writing ES with Spark SQL

    /**
      * @author created by LXF on 2018/6/1 10:03
      */
    
    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark._
    
    object App {
    
      case class Person(age: Long, name: String)
    
      def main(args: Array[String]): Unit = {
    
        println("Hello World!")
    
        System.setProperty("hadoop.home.dir", "G:\\hadoop_home")
    
        val spark = SparkSession.builder()
          .appName("SparkTest")
          .master("local[*]")
          .config("es.index.auto.create", "true")
          .config("pushdown", "true")
          .config("es.nodes", "192.168.7.130")
          .config("es.port", "9200")
          .config("es.nodes.wan.only", "true")
          .getOrCreate()
    
        import spark.implicits._
        // Read documents of the form {age: xxx, name: xxx} from ES
        val sparkDF = spark.read
          .format("org.elasticsearch.spark.sql")
          .option("inferSchema", "true")
          .load("test_lxf")
          .as[Person]
    
        //    sparkDF.take(10).foreach(println(_))
    
        //    val data = spark.read.textFile("g:\\mydata\\*")
    
        // Write to ES. The records must follow this format, because only this
        // format carries the field metadata; the case-class field names become
        // the field names in the ES documents.
        val rdd = sparkDF.rdd
        //    println(s"rdd = ${rdd}")
    
        rdd.saveToEs("index/external")
    
        spark.stop()
    
      }
    
    }
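
    The example above drops to the RDD API for the write, but the connector also exposes a DataFrame/Dataset write path via org.elasticsearch.spark.sql. A minimal sketch (the target index test_lxf_copy/doc is hypothetical, chosen for illustration):

    import org.elasticsearch.spark.sql._

    // Write the Dataset read above straight back to ES without going through an RDD.
    sparkDF.toDF().saveToEs("test_lxf_copy/doc")

    // Equivalent DataFrameWriter form:
    sparkDF.write
      .format("org.elasticsearch.spark.sql")
      .save("test_lxf_copy/doc")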
    

    Reading ES with the Spark RDD API

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._

    object LoadElasticsearchData {

      def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
          new SparkConf()
            .setAppName("e2e.computing.test")
            .setMaster("local[*]")
            .set("spark.cassandra.connection.host", "192.168.14.141")
            //.set("es.nodes", "192.168.14.140")
            //192.168.7.130:9200
            .set("es.nodes", "192.168.7.130")
            .set("es.port", "9200")
            .set("es.index.auto.create", "true")
            .set("es.mapping.date.rich", "false")
        )

        // Build an RDD from the ES index test_lxf; query is the query string.
        // Without a query, elasticsearch.spark fetches all documents by default.
        val query =
          """
            |{
            |  "query": {
            |    "match_all": {}
            |  }
            |}
          """.stripMargin

        val esRdd = sc.esRDD("test_lxf", query)
      }
    }
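
    esRDD returns an RDD of (documentId, fieldMap) pairs; a minimal sketch of consuming the result (the field names match the test_lxf documents above):

    // Each element is (_id, Map[String, AnyRef] of the document's _source fields).
    esRdd.take(10).foreach { case (id, fields) =>
      println(s"id=$id name=${fields.getOrElse("name", "")} age=${fields.getOrElse("age", "")}")
    }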
    

    Writing to ES with the Spark RDD API

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._
    
    /**
      * @author LXF
      *         2018/3/28 15:29
      */
    object SaveElasticsearch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf()
            .setAppName("e2e.computing.test")
            .setMaster("local[*]")
            .set("spark.cassandra.connection.host", "192.168.14.141")
            //.set("es.nodes", "192.168.14.140")
            //192.168.7.130:9200
            .set("es.nodes", "192.168.7.130")
            .set("es.port", "9200")
            .set("es.index.auto.create", "true")
            .set("es.mapping.date.rich", "false")
        )
        
        val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
        // The target index is created automatically if it does not exist (es.index.auto.create=true)
        sc.makeRDD(Seq(airports)).saveToEs("test_lxf2")
      }
    }
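
    saveToEs lets ES assign the document IDs. When the _id should come from the data, the connector also provides saveToEsWithMeta on an RDD of (id, document) pairs; a minimal sketch reusing the airports data (the airport codes serve as IDs):

    // Key = document _id, value = document body.
    val docsWithIds = Seq(
      ("OTP", Map("name" -> "Otopeni")),
      ("SFO", Map("name" -> "San Fran"))
    )
    sc.makeRDD(docsWithIds).saveToEsWithMeta("test_lxf2")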
    
