spark加载数据到ES

作者: 若与 | 来源:发表于2021-02-09 17:53 被阅读0次

在日常开发中经常会遇到这样的需求: spark将计算好的数据load到es中,供后端同学查询使用。下面介绍一下spark写es的几种方式。本文使用scala进行演示,对应的java写法请自行google。

spark写es需要用到对应的es依赖包,maven配置如下

        <!-- Spark core, Scala 2.12 build (see the artifactId suffix). -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <!-- Elasticsearch connector that provides the org.elasticsearch.spark package used below.
             NOTE(review): confirm that elasticsearch-hadoop 7.0.0 is built for Spark 3.0 / Scala 2.12;
             later ES-Hadoop releases publish a dedicated elasticsearch-spark-30_2.12 artifact.
             Verify against the ES-Hadoop compatibility documentation before relying on this pair. -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>7.0.0</version>
        </dependency>

使用MAP方式

代码如下

package org.bigdata.es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

/**
 * Writes two Scala `Map`s to the `spark/docs` Elasticsearch resource.
 *
 * `saveToEs` is not a method of `RDD`; it becomes available through the
 * implicit conversion brought into scope by `import org.elasticsearch.spark._`.
 */
object D01 {
  /**
   * Entry point: builds a local SparkContext, writes the sample RDD and
   * always stops the context afterwards.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("d01").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")

    val sc: SparkContext = new SparkContext(conf)
    try {
      // Map-based approach: the RDD elements handed to saveToEs are plain Maps.
      val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
      val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
      sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
    } finally {
      // Release the SparkContext even when the write fails.
      sc.stop()
    }
  }
}

注意: 必须要有 import org.elasticsearch.spark._ 这条导入语句, 不然就没有 saveToEs 方法了

下面介绍一下, org.elasticsearch.spark._导入的隐式函数

包对象中隐式函数

在 org.elasticsearch.spark 的包对象中有一个隐式函数 sparkRDDFunctions,将 RDD 转成 SparkRDDFunctions

反编译成 java代码如下

package org.bigdata.es;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.runtime.BoxesRunTime;

/**
 * Decompiler output for the Scala object D01, shown to make the implicit
 * conversion behind saveToEs visible as an explicit call.
 *
 * NOTE(review): this listing is illustrative decompiler output rather than
 * hand-written Java; do not expect it to compile as-is.
 */
public final class D01$ {
 // Singleton instance of the object; assigned in the private constructor below.
 public static final D01$ MODULE$;
 
 public void main(String[] args) {
   SparkConf conf = (new SparkConf()).setAppName("d01").setMaster("local[*]");
   conf.set("es.index.auto.create", "true");
   SparkContext sc = new SparkContext(conf);
   // Desugared form of Map("one" -> 1, "two" -> 2, "three" -> 3).
   // NOTE(review): every statement below allocates a fresh `new scala.Tuple2[3]`,
   // which looks like a decompiler artifact for a single shared array - confirm
   // against the bytecode if the exact shape matters.
   (new scala.Tuple2[3])[0] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("one"), BoxesRunTime.boxToInteger(1));
   (new scala.Tuple2[3])[1] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("two"), BoxesRunTime.boxToInteger(2));
   (new scala.Tuple2[3])[2] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("three"), BoxesRunTime.boxToInteger(3));
   Map numbers = (Map)scala.Predef$.MODULE$.Map().apply((Seq)scala.Predef$.MODULE$.wrapRefArray((Object[])new scala.Tuple2[3]));
   // Desugared form of Map("arrival" -> "Otopeni", "SFO" -> "San Fran").
   (new scala.Tuple2[2])[0] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("arrival"), "Otopeni");
   (new scala.Tuple2[2])[1] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("SFO"), "San Fran");
   Map airports = (Map)scala.Predef$.MODULE$.Map().apply((Seq)scala.Predef$.MODULE$.wrapRefArray((Object[])new scala.Tuple2[2]));
   // Desugared form of Seq(numbers, airports).
   (new Map[2])[0] = numbers;
   (new Map[2])[1] = airports;
   // The implicit conversion made explicit: the RDD returned by sc.makeRDD(...) is passed to
   // org.elasticsearch.spark.package$.MODULE$.sparkRDDFunctions(...), and saveToEs is called
   // on the value that function returns, not on the RDD itself.
   org.elasticsearch.spark.package$.MODULE$.sparkRDDFunctions(sc.makeRDD((Seq)scala.collection.Seq$.MODULE$.apply((Seq)scala.Predef$.MODULE$.wrapRefArray((Object[])new Map[2])), sc.makeRDD$default$2(), scala.reflect.ClassTag$.MODULE$.apply(Map.class)), scala.reflect.ClassTag$.MODULE$.apply(Map.class)).saveToEs("spark/docs");
 }
 
 // Private constructor: publishes the single instance through MODULE$.
 private D01$() {
   MODULE$ = this;
 }
}

下面再给出其他几种写es的代码

使用样例类方式


package org.bigdata.es

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

/**
 * Writes two [[Trip]] case-class instances to the `spark/docs` Elasticsearch
 * resource through the explicit `EsSpark` helper (no implicit import needed).
 */
object D02 {
  /**
   * Entry point: builds a local SparkContext, writes the sample trips and
   * always stops the context afterwards.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the app name is "d01" although this object is D02 - looks like copy-paste; confirm.
    val conf: SparkConf = new SparkConf().setAppName("d01").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")

    val sc: SparkContext = new SparkContext(conf)
    try {
      val upcomingTrip: Trip = Trip("OTP", "SFO")
      val lastWeekTrip: Trip = Trip("MUC", "OTP")

      val rdd: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
      // No "es.mapping.id" -> "id" setting here: Trip only declares `departure`
      // and `arrival`, so there is no `id` field to take the document id from.
      // To control document ids, add such a field to Trip and pass the mapping again.
      EsSpark.saveToEs(rdd, "spark/docs")
    } finally {
      // Release the SparkContext even when the write fails.
      sc.stop()
    }
  }
}


/**
 * A single trip, used as the document type written to Elasticsearch.
 *
 * Declared `final` because case classes are not meant to be extended.
 *
 * @param departure code of the departure airport, e.g. "OTP"
 * @param arrival   code of the arrival airport, e.g. "SFO"
 */
final case class Trip(departure: String, arrival: String)

使用字符串json方式

package org.bigdata.es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._


/**
 * Writes two pre-serialized JSON strings to the `spark/json-trips`
 * Elasticsearch resource.
 */
object D03 {
  /**
   * Entry point: builds a local SparkContext, writes the JSON documents and
   * always stops the context afterwards.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the app name is "d01" although this object is D03 - looks like copy-paste; confirm.
    val conf: SparkConf = new SparkConf().setAppName("d01").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")

    val sc: SparkContext = new SparkContext(conf)
    try {
      val json1 = """{"reason" : "business", "airport" : "SFO"}"""
      val json2 = """{"participants" : 5, "airport" : "OTP"}"""
      // The elements are already JSON text, so they must go through saveJsonToEs;
      // saveToEs would treat each String as an object to serialize instead of
      // sending it as a ready-made document.
      sc.makeRDD(Seq(json1, json2)).saveJsonToEs("spark/json-trips")
    } finally {
      // Release the SparkContext even when the write fails.
      sc.stop()
    }
  }
}

动态index

package org.bigdata.es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

/**
 * Writes three media documents using a resource pattern that contains the
 * `{media_type}` placeholder, so the target index name is derived per document.
 */
object D04 {
  /**
   * Entry point: builds a local SparkContext, writes the sample documents and
   * always stops the context afterwards.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the app name is "d01" although this object is D04 - looks like copy-paste; confirm.
    val conf: SparkConf = new SparkConf().setAppName("d01").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")

    val sc: SparkContext = new SparkContext(conf)
    try {
      val game = Map(
        "media_type" -> "game",
        "title" -> "FF VI",
        "year" -> "1994")
      val book = Map(
        "media_type" -> "book",
        "title" -> "Harry Potter",
        "year" -> "2010")
      // This document has no "year" key; only "media_type" appears in the resource pattern.
      val cd = Map(
        "media_type" -> "music",
        "title" -> "Surfing With The Alien")

      // "{media_type}" in the resource string is the placeholder for the per-document field value.
      sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")
    } finally {
      // Release the SparkContext even when the write fails.
      sc.stop()
    }
  }
}

相关文章

网友评论

    本文标题:spark加载数据到ES

    本文链接:https://www.haomeiwen.com/subject/gojcxltx.html