美文网首页
ES Rest-high-API简单操作(代码存档)

ES Rest-high-API简单操作(代码存档)

作者: K__3f8b | 来源:发表于2021-01-12 16:51 被阅读0次

    相关文档

    https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-put-mapping.html

    https://www.elastic.co/guide/en/elasticsearch/reference/6.8/mapping.html

    https://www.elastic.co/guide/en/elasticsearch/reference/6.8/common-options.html

    https://www.elastic.co/guide/en/elasticsearch/reference/6.8/mapping-types.html

    https://www.elastic.co/guide/en/elasticsearch/reference/6.8/date.html

    依赖

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.8.4</version>
    </dependency>
    

    配置文件 config.properties

    # Elasticsearch配置
    Elasticsearch.host=192.168.1.101
    Elasticsearch.port=9200
    Elasticsearch.scheme=http
    

    工具类 ESUtil

    package com.wuzhou.utils;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.support.master.AcknowledgedResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.*;
    import org.elasticsearch.client.indices.CreateIndexRequest;
    import org.elasticsearch.client.indices.CreateIndexResponse;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.index.reindex.DeleteByQueryRequest;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.Properties;
    
    /**
     * 使用Java High Level REST Client(待补全)
     * <p>
     * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-put-mapping.html
     *
     * @author Y_Kevin
     * @date 2021-01-05 16:06
     */
    public class ESUtil {
    
      /**
       * 获取Elasticsearch配置信息
       */
      private final Properties properties = PropertiesUtil.load("config.properties");
      private final String ES_HOST = properties.getProperty("Elasticsearch.host");
      private final int ES_HTTP_PORT = Integer.parseInt(properties.getProperty("Elasticsearch.port"));
      private final String ES_SCHEME = properties.getProperty("Elasticsearch.scheme");
      private final RestHighLevelClient restClient = null;
    
      /**
       * 自定义 RequestOptions 或者直接使用默认的 RequestOptions.DEFAULT
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-low-usage-requests.html#java-rest-low-usage-request-options"></a>
       */
      private static final RequestOptions COMMON_OPTIONS;
    
      static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        //builder.addHeader("Authorization", "Bearer " + TOKEN);
        //builder.setHttpAsyncResponseConsumerFactory(
        //      new HttpAsyncResponseConsumerFactory
        //              .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
      }
    
      /**
       * 获取客户端
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-getting-started-initialization.html">Initialization</a>
       *
       * @return RestHighLevelClient
       */
      public RestHighLevelClient getRestClient() {
        if (restClient == null) {
          HttpHost httpHost = new HttpHost(ES_HOST, ES_HTTP_PORT, ES_SCHEME);
          RestClientBuilder builder = RestClient.builder(httpHost);
          return new RestHighLevelClient(builder);
        } else {
          return restClient;
        }
      }
    
      /**
       * 关闭客户端
       *
       * @param restClient 客户端
       */
      public void closeClient(RestHighLevelClient restClient) {
        if (!Objects.isNull(restClient)) {
          try {
            restClient.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      }
    
      /**
       * 创建索引
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-create-index.html">Create Index API</a>
       *
       * @param indexName 索引名称
       * @param mapping   映射规则
       * @throws IOException
       */
      public void createMappings(String indexName, Map<String, Object> mapping) throws IOException {
        // 获取客户端
        RestHighLevelClient restClient = getRestClient();
        CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
        createRequest.mapping(mapping);
        //提交
        CreateIndexResponse createIndexResponse = restClient.indices().create(createRequest, COMMON_OPTIONS);
        System.out.println("创建完成!==》" + createIndexResponse.isAcknowledged());
        // 关闭客户端
        closeClient(restClient);
      }
    
      /**
       * 删除索引
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-delete-index.html">Delete Index API</a>
       *
       * @param indexName 索引名称
       * @throws IOException
       */
      public void deleteIndex(String indexName) throws IOException {
        // 获取客户端
        RestHighLevelClient restClient = getRestClient();
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
        // 提交
        AcknowledgedResponse deleteIndexResponse = restClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
        System.out.println("删除完成!==》" + deleteIndexResponse.isAcknowledged());
        // 关闭客户端
        closeClient(restClient);
      }
    
      /**
       * 插入一条
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-index.html">Index API</a>
       *
       * @param indexName 索引名称
       * @param source    数据
       * @throws IOException
       */
      public void addOneDoc(String indexName, Object source) throws IOException {
        // 获取客户端
        RestHighLevelClient restClient = getRestClient();
        IndexRequest indexRequest = new IndexRequest(indexName);
        // 如果只使用 JSON.toJSONString ,如果source存在List,List只有第一个被解析
        String jsonString = JSON.toJSONString(JSON.toJSON(source));
        indexRequest.source(jsonString, XContentType.JSON).type("_doc");
        // 提交
        IndexResponse indexResponse = restClient.index(indexRequest, COMMON_OPTIONS);
        System.out.println("单条插入完成!==》" + indexResponse.getResult());
        // 关闭客户端
        closeClient(restClient);
      }
    
      /**
       * 批量插入
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-bulk.html">Bulk API</a>
       *
       * @param indexName 索引名称
       * @param list      数据
       * @throws IOException
       */
      public void addBatchDoc(String indexName, List<Object> list) throws IOException {
        // 获取客户端
        RestHighLevelClient restClient = getRestClient();
    
        BulkRequest bulkRequest = new BulkRequest();
        for (Object source : list) {
          // 如果只使用 JSON.toJSONString ,如果source存在List,List只有第一个被解析
          String jsonString = JSON.toJSONString(JSON.toJSON(source));
          bulkRequest.add(new IndexRequest(indexName).source(jsonString, XContentType.JSON).type("_doc"));
        }
        // 提交
        BulkResponse bulkResponse = restClient.bulk(bulkRequest, COMMON_OPTIONS);
        System.out.println("多条插入完成!==》" + bulkResponse.status());
        // 关闭客户端
        closeClient(restClient);
      }
    
      /**
       * 单条Doc删除
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-delete.html">Delete API</a>
       *
       * @param indexName 索引名称
       * @param id        DocID
       * @throws IOException
       */
      public void deleteDoc(String indexName, String id) throws IOException {
        // 获取客户端
        RestHighLevelClient restClient = getRestClient();
        DeleteRequest deleteRequest = new DeleteRequest(indexName, "_doc", id);
        // 提交
        DeleteResponse deleteResponse = restClient.delete(deleteRequest, COMMON_OPTIONS);
        System.out.println("单条Doc删除完成!==》" + deleteResponse.status());
        // 关闭客户端
        closeClient(restClient);
      }
    
      /**
       * 按条件删除
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-delete-by-query.html">Delete By Query API</a>
       *
       * @param indexName 索引名称
       * @throws IOException
       */
      public void deleteDocByQuery(String fieldName, String value, String... indexName) throws IOException {
        // 获取客户端
        RestHighLevelClient restClient = getRestClient();
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
        deleteByQueryRequest.setQuery(QueryBuilders.termQuery(fieldName, value));
        // 提交
        BulkByScrollResponse bulkResponse = restClient.deleteByQuery(deleteByQueryRequest, COMMON_OPTIONS);
        System.out.println("条件查询Doc删除完成!==》" + bulkResponse.getStatus());
        // 关闭客户端
        closeClient(restClient);
      }
    
      /**
       * 按删除全部 (在条件删除上修改条件参数)
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-delete-by-query.html">Delete By Query API</a>
       *
       * @param indexName 索引名称
       * @throws IOException
       */
      public void deleteAllDoc(String... indexName) throws IOException {
        // 获取客户端
        RestHighLevelClient restClient = getRestClient();
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
        // 删除所有
        deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
        // 提交
        BulkByScrollResponse bulkResponse = restClient.deleteByQuery(deleteByQueryRequest, COMMON_OPTIONS);
        System.out.println("Doc全部删除完成!==》" + bulkResponse.getTotal());
        // 关闭客户端
        closeClient(restClient);
      }
    
      /**
       * 修改
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-update.html">Update API</a>
       */
      public void updateDoc(String fieldName, String id) throws IOException {
        // 获取客户端
        RestHighLevelClient restClient = getRestClient();
        UpdateRequest updateRequest = new UpdateRequest(fieldName, "_doc", id);
        // 修改内容
          
        // 提交
        UpdateResponse updateResponse = restClient.update(updateRequest, COMMON_OPTIONS);
        System.out.println("Doc修改完成!==》" + updateResponse.status());
        // 关闭客户端
        closeClient(restClient);
      }
    
      /**
       * 查找
       * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-search.html">Search API</a>
       */
      public void search() {
      }
    }
    

    工具类Scala版

    package com.wuzhou.utils
    
    import java.util.{Objects, Properties}
    import com.alibaba.fastjson.JSON
    import com.alibaba.fastjson.serializer.SerializerFeature
    import org.apache.http.HttpHost
    import org.elasticsearch.action.bulk.{BulkRequest, BulkResponse}
    import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
    import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}
    import org.elasticsearch.common.xcontent.XContentType
    
    /**
     * @author Y_Kevin
     * @date 2021-01-06 16:52
     */
    object EsUtil4Scala {
    
        /**
         * 获取Elasticsearch配置信息
         */
        private val properties: Properties = PropertiesUtil.load("config.properties")
        private val ES_HOST: String = properties.getProperty("Elasticsearch.host")
        private val ES_HTTP_PORT: Int = properties.getProperty("Elasticsearch.port").toInt
        private val ES_SCHEME: String = properties.getProperty("Elasticsearch.scheme")
        private val restClient: RestHighLevelClient = null
    
        /**
         * 获取客户端
         *
         * @return RestHighLevelClient
         */
        def getRestClient: RestHighLevelClient = {
            if (restClient == null) {
                val httpHost: HttpHost = new HttpHost(ES_HOST, ES_HTTP_PORT, ES_SCHEME)
                val builder: RestClientBuilder = RestClient.builder(httpHost)
                new RestHighLevelClient(builder)
            } else {
                restClient
            }
        }
    
        /**
         * 关闭客户端
         *
         * @param restClient 客户端
         */
        def closeClient(restClient: RestHighLevelClient): Unit = {
            if (!Objects.isNull(restClient)) {
                try {
                    restClient.close()
                } catch {
                    case e: Exception =>
                        e.printStackTrace()
                }
            }
        }
    
        /**
         * 插入一条
         *
         * @param indexName 索引名称
         * @param source    数据
         */
        def addOneDoc(indexName: String, source: Object): Unit = {
            // 获取客户端
            val restClient: RestHighLevelClient = getRestClient
            val indexRequest: IndexRequest = new IndexRequest(indexName)
            println("source===>" + source)
            // 如果只使用 JSON.toJSONString ,如果source存在List,List只有第一个被解析
            val jsonString: String = JSON.toJSONString(JSON.toJSON(source), SerializerFeature.EMPTY: _*)
            println("jsonString===>" + jsonString)
            indexRequest.source(jsonString, XContentType.JSON).`type`("_doc")
            //      indexRequest.source(source, XContentType.JSON).`type`("_doc")
            // 提交
            val indexResponse: IndexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT)
            println("单条插入完成!==》" + indexResponse.getResult)
            // 关闭客户端
            closeClient(restClient)
        }
    
        /**
         * 批量插入
         *
         * @param indexName 索引名称
         * @param list      数据
         */
        def addBatchDoc(indexName: String, list: List[Object]): Unit = {
            // 获取客户端
            val restClient: RestHighLevelClient = getRestClient
    
            val bulkRequest: BulkRequest = new BulkRequest()
            for (source <- list) {
                // 如果只使用 JSON.toJSONString ,如果source存在List,List只有第一个被解析
                val jsonString: String = JSON.toJSONString(JSON.toJSON(source), SerializerFeature.EMPTY: _*)
                bulkRequest.add(new IndexRequest(indexName).source(jsonString, XContentType.JSON).`type`("_doc"))
            }
            // 提交
            val bulkResponse: BulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT)
            println("多条插入完成!==》" + bulkResponse.status())
            // 关闭客户端
            closeClient(restClient)
        }
    }
    
    

    测试类 (映射)

    package com.wuzhou;
    
    import com.wuzhou.domin.KeyShip3;
    import com.wuzhou.domin.Ship3;
    import com.wuzhou.utils.ESUtil;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.sql.Timestamp;
    import java.util.*;
    
    /**
     * @author Y_Kevin
     * @date 2021-01-05 16:25
     */
    public class ESTest {
    
      /**
       * 测试创建 Ship 实体类映射
       *
       * @throws IOException
       */
      @Test
      public void createOriginalIndex() throws IOException {
        Map<String, Object> mapping = new HashMap<>();
        Map<String, Object> properties = new HashMap<>();
    
        Map<String, String> ts = new HashMap<>();
        ts.put("type", "long");
        properties.put("ts", ts);
        Map<String, String> shipname = new HashMap<>();
        shipname.put("type", "text");
        properties.put("shipname", shipname);
        Map<String, String> lon = new HashMap<>();
        lon.put("type", "float");
        properties.put("lon", lon);
        Map<String, String> lat = new HashMap<>();
        lat.put("type", "float");
        properties.put("lat", lat);
        Map<String, String> turn = new HashMap<>();
        turn.put("type", "float");
        properties.put("turn", turn);
        Map<String, String> speed = new HashMap<>();
        speed.put("type", "float");
        properties.put("speed", speed);
        Map<String, String> course = new HashMap<>();
        course.put("type", "float");
        properties.put("course", course);
        Map<String, String> heading = new HashMap<>();
        heading.put("type", "long");
        properties.put("heading", heading);
        Map<String, String> status_text = new HashMap<>();
        status_text.put("type", "text");
        properties.put("status_text", status_text);
        Map<String, String> accuracy = new HashMap<>();
        accuracy.put("type", "boolean");
        properties.put("accuracy", accuracy);
        Map<String, String> shiptype_text = new HashMap<>();
        shiptype_text.put("type", "text");
        properties.put("shiptype_text", shiptype_text);
        Map<String, String> category = new HashMap<>();
        category.put("type", "text");
        properties.put("category", category);
        Map<String, String> mmsi = new HashMap<>();
        mmsi.put("type", "text");
        properties.put("mmsi", mmsi);
    
        mapping.put("properties", properties);
    
        ESUtil util = new ESUtil();
        util.createMappings("ship_original", mapping);
      }
    
      /**
       * 测试创建 与 mmsi为 key Ship为value的映射
       *
       * @throws IOException
       */
      @Test
      public void createGroupIndex() throws IOException {
        Map<String, Object> mapping = new HashMap<>();
        Map<String, Object> properties = new HashMap<>();
    
        Map<String, String> key_mmsi = new HashMap<>();
        key_mmsi.put("type", "text");
        properties.put("key_mmsi", key_mmsi);
    
        Map<String, Object> ships = new HashMap<>();
        Map<String, Object> shipProperties = new HashMap<>();
    
        Map<String, String> ts = new HashMap<>();
        ts.put("type", "long");
        shipProperties.put("ts", ts);
        Map<String, String> shipname = new HashMap<>();
        shipname.put("type", "text");
        shipProperties.put("shipname", shipname);
        Map<String, String> lon = new HashMap<>();
        lon.put("type", "float");
        shipProperties.put("lon", lon);
        Map<String, String> lat = new HashMap<>();
        lat.put("type", "float");
        shipProperties.put("lat", lat);
        Map<String, String> turn = new HashMap<>();
        turn.put("type", "float");
        shipProperties.put("turn", turn);
        Map<String, String> speed = new HashMap<>();
        speed.put("type", "float");
        shipProperties.put("speed", speed);
        Map<String, String> course = new HashMap<>();
        course.put("type", "float");
        shipProperties.put("course", course);
        Map<String, String> heading = new HashMap<>();
        heading.put("type", "long");
        shipProperties.put("heading", heading);
        Map<String, String> status_text = new HashMap<>();
        status_text.put("type", "text");
        shipProperties.put("status_text", status_text);
        Map<String, String> accuracy = new HashMap<>();
        accuracy.put("type", "boolean");
        shipProperties.put("accuracy", accuracy);
        Map<String, String> shiptype_text = new HashMap<>();
        shiptype_text.put("type", "text");
        shipProperties.put("shiptype_text", shiptype_text);
        Map<String, String> category = new HashMap<>();
        category.put("type", "text");
        shipProperties.put("category", category);
        Map<String, String> mmsi = new HashMap<>();
        mmsi.put("type", "text");
        shipProperties.put("mmsi", mmsi);
    
        ships.put("properties", shipProperties);
        properties.put("shipList", ships);
        mapping.put("properties", properties);
    
        ESUtil util = new ESUtil();
        util.createMappings("ship_group_by_mmsi", mapping);
      }
    
      @Test
      public void deleteIndex() throws IOException {
        ESUtil util = new ESUtil();
        //util.deleteIndex("ship_group_by_mmsi");
        util.deleteIndex("ship_original");
      }
    
      @Test
      public void addOneDocShip() throws IOException {
        ESUtil util = new ESUtil();
        Ship3 ship = new Ship3(new Timestamp(1521053005000000L), "",
                121.540085f, 31.39545f, 0.0f,
                9.2f, 273.0f, 273,
                "机动航行", false, "不明",
                "B", "412351040");
        util.addOneDoc("ship_original", ship);
      }
    
      @Test
      public void addBatchDocShip() throws IOException {
        ESUtil util = new ESUtil();
        List<Object> list = new ArrayList<>();
    
        Ship3 ship = new Ship3(new Timestamp(1521053005000000L), "",
                121.540085f, 31.39545f, 0.0f,
                9.2f, 273.0f, 273,
                "机动航行", false, "不明",
                "B", "412351040");
        list.add(ship);
        list.add(ship);
        list.add(ship);
    
        util.addBatchDoc("ship_original", list);
      }
    
      @Test
      public void addOneDocKeyShip() throws IOException {
        ESUtil util = new ESUtil();
        Ship3 ship = new Ship3(new Timestamp(1521053005000000L), "",
                121.540085f, 31.39545f, 0.0f,
                9.2f, 273.0f, 273,
                "机动航行", false, "不明",
                "B", "412351040");
        List<Ship3> list = new ArrayList<>();
        list.add(ship);
        list.add(ship);
        list.add(ship);
    
        KeyShip3 keyShip = new KeyShip3();
        keyShip.setKey_mmsi("412351040");
        keyShip.setShipList(list);
    
        util.addOneDoc("ship_group_by_mmsi", keyShip);
      }
    
      @Test
      public void addBatchDocKeyShip() throws IOException {
        ESUtil util = new ESUtil();
        List<Object> list = new ArrayList<>();
    
        Ship3 ship = new Ship3(new Timestamp(1521053005000000L), "",
                121.540085f, 31.39545f, 0.0f,
                9.2f, 273.0f, 273,
                "机动航行", false, "不明",
                "B", "412351040");
        List<Ship3> ships = new ArrayList<>();
        ships.add(ship);
        ships.add(ship);
        ships.add(ship);
    
        KeyShip3 keyShip = new KeyShip3();
        keyShip.setKey_mmsi("412351040");
        keyShip.setShipList(ships);
    
        list.add(keyShip);
        keyShip.setKey_mmsi("412351040");
        list.add(keyShip);
    
        util.addBatchDoc("ship_group_by_mmsi", list);
      }
    
      @Test
      public void deleteDoc() throws IOException {
        ESUtil util = new ESUtil();
        //util.deleteDoc("ship_group_by_mmsi", "j0ZW23YBfaU9GHb6SWvX");
        util.deleteDoc("ship_original", "lEbj2nYBfaU9GHb6zD8C");
      }
    
      @Test
      public void deleteDocByQuery() throws IOException {
        ESUtil util = new ESUtil();
        util.deleteDocByQuery("key_mmsi", "412351041", "ship_original");
      }
    
      @Test
      public void deleteAllDoc() throws IOException {
        ESUtil util = new ESUtil();
        util.deleteAllDoc("ship_original");
      }
    }
    

    192.168.1.101:9200/ship_original/_search

    192.168.1.101:9200/ship_original

    192.168.1.101:9200/_cat/indices

    192.168.1.101:5601/app/kibana#/dev_tools/console?_g=()

    ES与Spark交互 (Spark2.3.2)

    文档

    https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html

    https://www.elastic.co/guide/en/elasticsearch/hadoop/6.8/spark.html

    https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#configuration-serialization

    依赖

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.8.4</version>
    </dependency>
    

    配置 + 使用

    // 创建配置对象
    val sparkConf: SparkConf = new SparkConf()
    sparkConf.setMaster("local[*]")
    sparkConf.setAppName("test")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.nodes", "192.168.5.53")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.batch.size.entries", "100000")
    sparkConf.set("es.batch.size.bytes", "300mb")
    sparkConf.set("es.batch.write.refresh", "fasle")
    // 创建SparkSQL的环境对象
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    
    val dataFrame: DataFrame = sparkSession.sql(sql)
    
    import org.elasticsearch.spark.sql._
    // 使用ES-SparkAPI将原数据存进ES  (ship_original)为index
    dataFrame.saveToEs("ship_original/_doc")
    

    相关文章

      网友评论

          本文标题:ES Rest-high-API简单操作(代码存档)

          本文链接:https://www.haomeiwen.com/subject/fexpaktx.html