相关文档
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-put-mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/6.8/mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/6.8/common-options.html
https://www.elastic.co/guide/en/elasticsearch/reference/6.8/mapping-types.html
https://www.elastic.co/guide/en/elasticsearch/reference/6.8/date.html
依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.4</version>
</dependency>
配置文件 config.properties
# Elasticsearch配置
Elasticsearch.host=192.168.1.101
Elasticsearch.port=9200
Elasticsearch.scheme=http
工具类 ESUtil
package com.wuzhou.utils;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
/**
 * Elasticsearch helper built on the Java High Level REST Client (work in progress).
 * <p>
 * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-put-mapping.html
 * <p>
 * Every operation obtains its own client from {@link #getRestClient()} and closes it in a
 * {@code finally} block, so the client is released even when the request fails.
 *
 * @author Y_Kevin
 * @date 2021-01-05 16:06
 */
public class ESUtil {

    /**
     * Elasticsearch connection settings read from {@code config.properties}.
     */
    private final Properties properties = PropertiesUtil.load("config.properties");
    private final String ES_HOST = properties.getProperty("Elasticsearch.host");
    private final int ES_HTTP_PORT = Integer.parseInt(properties.getProperty("Elasticsearch.port"));
    private final String ES_SCHEME = properties.getProperty("Elasticsearch.scheme");

    /**
     * Request options shared by every call; currently a plain copy of {@code RequestOptions.DEFAULT}.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-low-usage-requests.html#java-rest-low-usage-request-options"></a>
     */
    private static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        // Customisation point: add headers (e.g. an Authorization token) or a custom
        // HttpAsyncResponseConsumerFactory on the builder here before it is built.
        COMMON_OPTIONS = builder.build();
    }

    /**
     * Creates a new client for the configured host, port and scheme.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-getting-started-initialization.html">Initialization</a>
     * <p>
     * A fresh client is returned on every call; the caller is responsible for closing it,
     * for example through {@link #closeClient(RestHighLevelClient)}.
     *
     * @return a new RestHighLevelClient
     */
    public RestHighLevelClient getRestClient() {
        HttpHost httpHost = new HttpHost(ES_HOST, ES_HTTP_PORT, ES_SCHEME);
        RestClientBuilder builder = RestClient.builder(httpHost);
        return new RestHighLevelClient(builder);
    }

    /**
     * Closes the given client. A {@code null} client is ignored, and an {@link IOException}
     * raised while closing is printed rather than rethrown (best-effort cleanup).
     *
     * @param restClient the client to close; may be {@code null}
     */
    public void closeClient(RestHighLevelClient restClient) {
        if (Objects.isNull(restClient)) {
            return;
        }
        try {
            restClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates an index with the given mapping.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-create-index.html">Create Index API</a>
     *
     * @param indexName name of the index to create
     * @param mapping   mapping definition passed to the create-index request
     * @throws IOException if the request fails
     */
    public void createMappings(String indexName, Map<String, Object> mapping) throws IOException {
        RestHighLevelClient restClient = getRestClient();
        try {
            CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
            createRequest.mapping(mapping);
            // Submit the request
            CreateIndexResponse createIndexResponse = restClient.indices().create(createRequest, COMMON_OPTIONS);
            System.out.println("创建完成!==》" + createIndexResponse.isAcknowledged());
        } finally {
            // Release the client even when the request throws
            closeClient(restClient);
        }
    }

    /**
     * Deletes an index.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-delete-index.html">Delete Index API</a>
     *
     * @param indexName name of the index to delete
     * @throws IOException if the request fails
     */
    public void deleteIndex(String indexName) throws IOException {
        RestHighLevelClient restClient = getRestClient();
        try {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            // Submit the request
            AcknowledgedResponse deleteIndexResponse = restClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
            System.out.println("删除完成!==》" + deleteIndexResponse.isAcknowledged());
        } finally {
            closeClient(restClient);
        }
    }

    /**
     * Indexes a single document.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-index.html">Index API</a>
     *
     * @param indexName name of the target index
     * @param source    object serialised to JSON and used as the document body
     * @throws IOException if the request fails
     */
    public void addOneDoc(String indexName, Object source) throws IOException {
        RestHighLevelClient restClient = getRestClient();
        try {
            IndexRequest indexRequest = new IndexRequest(indexName);
            // Author's note: with JSON.toJSONString alone, only the first element of a List
            // inside source was serialised, hence the extra JSON.toJSON step.
            String jsonString = JSON.toJSONString(JSON.toJSON(source));
            indexRequest.source(jsonString, XContentType.JSON).type("_doc");
            // Submit the request
            IndexResponse indexResponse = restClient.index(indexRequest, COMMON_OPTIONS);
            System.out.println("单条插入完成!==》" + indexResponse.getResult());
        } finally {
            closeClient(restClient);
        }
    }

    /**
     * Indexes several documents with one bulk request.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-bulk.html">Bulk API</a>
     * <p>
     * A {@code null} or empty list is a no-op: no client is opened and nothing is sent.
     *
     * @param indexName name of the target index
     * @param list      objects serialised to JSON, one document each
     * @throws IOException if the request fails
     */
    public void addBatchDoc(String indexName, List<Object> list) throws IOException {
        if (list == null || list.isEmpty()) {
            // Nothing to index, so skip opening a client and submitting an empty bulk request
            return;
        }
        RestHighLevelClient restClient = getRestClient();
        try {
            BulkRequest bulkRequest = new BulkRequest();
            for (Object source : list) {
                // Same JSON.toJSON round trip as addOneDoc, for nested List fields
                String jsonString = JSON.toJSONString(JSON.toJSON(source));
                bulkRequest.add(new IndexRequest(indexName).source(jsonString, XContentType.JSON).type("_doc"));
            }
            // Submit the request
            BulkResponse bulkResponse = restClient.bulk(bulkRequest, COMMON_OPTIONS);
            System.out.println("多条插入完成!==》" + bulkResponse.status());
        } finally {
            closeClient(restClient);
        }
    }

    /**
     * Deletes a single document by id.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-delete.html">Delete API</a>
     *
     * @param indexName name of the index holding the document
     * @param id        document id
     * @throws IOException if the request fails
     */
    public void deleteDoc(String indexName, String id) throws IOException {
        RestHighLevelClient restClient = getRestClient();
        try {
            DeleteRequest deleteRequest = new DeleteRequest(indexName, "_doc", id);
            // Submit the request
            DeleteResponse deleteResponse = restClient.delete(deleteRequest, COMMON_OPTIONS);
            System.out.println("单条Doc删除完成!==》" + deleteResponse.status());
        } finally {
            closeClient(restClient);
        }
    }

    /**
     * Deletes every document matched by a term query on one field.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-delete-by-query.html">Delete By Query API</a>
     *
     * @param fieldName field used in the term query
     * @param value     value the field must match
     * @param indexName one or more indices to delete from
     * @throws IOException if the request fails
     */
    public void deleteDocByQuery(String fieldName, String value, String... indexName) throws IOException {
        RestHighLevelClient restClient = getRestClient();
        try {
            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
            deleteByQueryRequest.setQuery(QueryBuilders.termQuery(fieldName, value));
            // Submit the request
            BulkByScrollResponse bulkResponse = restClient.deleteByQuery(deleteByQueryRequest, COMMON_OPTIONS);
            System.out.println("条件查询Doc删除完成!==》" + bulkResponse.getStatus());
        } finally {
            closeClient(restClient);
        }
    }

    /**
     * Deletes all documents of the given indices (delete-by-query with a match-all query).
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-delete-by-query.html">Delete By Query API</a>
     *
     * @param indexName one or more indices to empty
     * @throws IOException if the request fails
     */
    public void deleteAllDoc(String... indexName) throws IOException {
        RestHighLevelClient restClient = getRestClient();
        try {
            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
            // Match every document
            deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
            // Submit the request
            BulkByScrollResponse bulkResponse = restClient.deleteByQuery(deleteByQueryRequest, COMMON_OPTIONS);
            System.out.println("Doc全部删除完成!==》" + bulkResponse.getTotal());
        } finally {
            closeClient(restClient);
        }
    }

    /**
     * Submits an update request for one document (unfinished).
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-document-update.html">Update API</a>
     * <p>
     * NOTE(review): despite its name, {@code fieldName} is passed to {@code UpdateRequest} in the
     * index position. No document or script is set on the request yet, so it presumably cannot
     * succeed as written; confirm and supply the update content before relying on this method.
     *
     * @param fieldName value used as the index name of the update request
     * @param id        document id
     * @throws IOException if the request fails
     */
    public void updateDoc(String fieldName, String id) throws IOException {
        RestHighLevelClient restClient = getRestClient();
        try {
            UpdateRequest updateRequest = new UpdateRequest(fieldName, "_doc", id);
            // TODO: set the content to update on updateRequest
            // Submit the request
            UpdateResponse updateResponse = restClient.update(updateRequest, COMMON_OPTIONS);
            System.out.println("Doc修改完成!==》" + updateResponse.status());
        } finally {
            closeClient(restClient);
        }
    }

    /**
     * Search placeholder; not implemented yet.
     * See <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.8/java-rest-high-search.html">Search API</a>
     */
    public void search() {
    }
}
工具类Scala版
package com.wuzhou.utils
import java.util.{Objects, Properties}
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature
import org.apache.http.HttpHost
import org.elasticsearch.action.bulk.{BulkRequest, BulkResponse}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType
/**
 * Scala version of the Elasticsearch helper, built on the Java High Level REST Client.
 *
 * Every operation obtains its own client from `getRestClient` and closes it in a
 * `finally` block, so the client is released even when the request fails.
 *
 * @author Y_Kevin
 * @date 2021-01-06 16:52
 */
object EsUtil4Scala {

  /**
   * Elasticsearch connection settings read from config.properties.
   */
  private val properties: Properties = PropertiesUtil.load("config.properties")
  private val ES_HOST: String = properties.getProperty("Elasticsearch.host")
  private val ES_HTTP_PORT: Int = properties.getProperty("Elasticsearch.port").toInt
  private val ES_SCHEME: String = properties.getProperty("Elasticsearch.scheme")

  /**
   * Creates a new client for the configured host, port and scheme.
   * A fresh client is returned on every call; the caller is responsible for closing it.
   *
   * @return a new RestHighLevelClient
   */
  def getRestClient: RestHighLevelClient = {
    val httpHost: HttpHost = new HttpHost(ES_HOST, ES_HTTP_PORT, ES_SCHEME)
    val builder: RestClientBuilder = RestClient.builder(httpHost)
    new RestHighLevelClient(builder)
  }

  /**
   * Closes the given client. A null client is ignored, and an exception raised while
   * closing is printed rather than rethrown (best-effort cleanup).
   *
   * @param restClient the client to close; may be null
   */
  def closeClient(restClient: RestHighLevelClient): Unit = {
    if (!Objects.isNull(restClient)) {
      try {
        restClient.close()
      } catch {
        case e: Exception =>
          e.printStackTrace()
      }
    }
  }

  /**
   * Indexes a single document.
   *
   * @param indexName name of the target index
   * @param source    object serialised to JSON and used as the document body
   */
  def addOneDoc(indexName: String, source: Object): Unit = {
    val restClient: RestHighLevelClient = getRestClient
    try {
      val indexRequest: IndexRequest = new IndexRequest(indexName)
      println("source===>" + source)
      // Author's note: with JSON.toJSONString alone, only the first element of a List
      // inside source was serialised, hence the extra JSON.toJSON step.
      val jsonString: String = JSON.toJSONString(JSON.toJSON(source), SerializerFeature.EMPTY: _*)
      println("jsonString===>" + jsonString)
      indexRequest.source(jsonString, XContentType.JSON).`type`("_doc")
      // Submit the request
      val indexResponse: IndexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT)
      println("单条插入完成!==》" + indexResponse.getResult)
    } finally {
      // Release the client even when the request throws
      closeClient(restClient)
    }
  }

  /**
   * Indexes several documents with one bulk request.
   * A null or empty list is a no-op: no client is opened and nothing is sent.
   *
   * @param indexName name of the target index
   * @param list      objects serialised to JSON, one document each
   */
  def addBatchDoc(indexName: String, list: List[Object]): Unit = {
    if (list != null && list.nonEmpty) {
      val restClient: RestHighLevelClient = getRestClient
      try {
        val bulkRequest: BulkRequest = new BulkRequest()
        for (source <- list) {
          // Same JSON.toJSON round trip as addOneDoc, for nested List fields
          val jsonString: String = JSON.toJSONString(JSON.toJSON(source), SerializerFeature.EMPTY: _*)
          bulkRequest.add(new IndexRequest(indexName).source(jsonString, XContentType.JSON).`type`("_doc"))
        }
        // Submit the request
        val bulkResponse: BulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT)
        println("多条插入完成!==》" + bulkResponse.status())
      } finally {
        closeClient(restClient)
      }
    }
  }
}
测试类 (映射)
package com.wuzhou;
import com.wuzhou.domin.KeyShip3;
import com.wuzhou.domin.Ship3;
import com.wuzhou.utils.ESUtil;
import org.junit.Test;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.*;
/**
 * JUnit tests that drive {@link ESUtil}: index creation and deletion, plus document
 * insert and delete calls against the ship indices.
 *
 * @author Y_Kevin
 * @date 2021-01-05 16:25
 */
public class ESTest {

    /**
     * Builds a single field definition of the form {@code {"type": <esType>}}.
     */
    private static Map<String, String> fieldOfType(String esType) {
        Map<String, String> field = new HashMap<>();
        field.put("type", esType);
        return field;
    }

    /**
     * Builds the field definitions shared by both ship mappings.
     */
    private static Map<String, Object> shipFields() {
        Map<String, Object> fields = new HashMap<>();
        fields.put("ts", fieldOfType("long"));
        fields.put("shipname", fieldOfType("text"));
        fields.put("lon", fieldOfType("float"));
        fields.put("lat", fieldOfType("float"));
        fields.put("turn", fieldOfType("float"));
        fields.put("speed", fieldOfType("float"));
        fields.put("course", fieldOfType("float"));
        fields.put("heading", fieldOfType("long"));
        fields.put("status_text", fieldOfType("text"));
        fields.put("accuracy", fieldOfType("boolean"));
        fields.put("shiptype_text", fieldOfType("text"));
        fields.put("category", fieldOfType("text"));
        fields.put("mmsi", fieldOfType("text"));
        return fields;
    }

    /**
     * Builds the sample ship record used by all insert tests.
     */
    private static Ship3 sampleShip() {
        return new Ship3(new Timestamp(1521053005000000L), "",
                121.540085f, 31.39545f, 0.0f,
                9.2f, 273.0f, 273,
                "机动航行", false, "不明",
                "B", "412351040");
    }

    /**
     * Returns a mutable list holding the same element three times.
     */
    private static <T> List<T> threeOf(T item) {
        List<T> items = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            items.add(item);
        }
        return items;
    }

    /**
     * Creates the {@code ship_original} index with the flat ship mapping.
     *
     * @throws IOException if the request fails
     */
    @Test
    public void createOriginalIndex() throws IOException {
        Map<String, Object> mapping = new HashMap<>();
        mapping.put("properties", shipFields());
        new ESUtil().createMappings("ship_original", mapping);
    }

    /**
     * Creates the {@code ship_group_by_mmsi} index: a {@code key_mmsi} field plus a
     * {@code shipList} object whose properties are the ship fields.
     *
     * @throws IOException if the request fails
     */
    @Test
    public void createGroupIndex() throws IOException {
        Map<String, Object> shipList = new HashMap<>();
        shipList.put("properties", shipFields());

        Map<String, Object> topLevel = new HashMap<>();
        topLevel.put("key_mmsi", fieldOfType("text"));
        topLevel.put("shipList", shipList);

        Map<String, Object> mapping = new HashMap<>();
        mapping.put("properties", topLevel);
        new ESUtil().createMappings("ship_group_by_mmsi", mapping);
    }

    /**
     * Deletes the {@code ship_original} index (pass "ship_group_by_mmsi" to drop the grouped one).
     */
    @Test
    public void deleteIndex() throws IOException {
        ESUtil esUtil = new ESUtil();
        esUtil.deleteIndex("ship_original");
    }

    /**
     * Inserts one ship document into {@code ship_original}.
     */
    @Test
    public void addOneDocShip() throws IOException {
        ESUtil esUtil = new ESUtil();
        esUtil.addOneDoc("ship_original", sampleShip());
    }

    /**
     * Bulk-inserts the same ship three times into {@code ship_original}.
     */
    @Test
    public void addBatchDocShip() throws IOException {
        ESUtil esUtil = new ESUtil();
        List<Object> docs = ESTest.<Object>threeOf(sampleShip());
        esUtil.addBatchDoc("ship_original", docs);
    }

    /**
     * Inserts one grouped document (key plus a list of three ships) into {@code ship_group_by_mmsi}.
     */
    @Test
    public void addOneDocKeyShip() throws IOException {
        ESUtil esUtil = new ESUtil();
        List<Ship3> shipList = threeOf(sampleShip());
        KeyShip3 grouped = new KeyShip3();
        grouped.setKey_mmsi("412351040");
        grouped.setShipList(shipList);
        esUtil.addOneDoc("ship_group_by_mmsi", grouped);
    }

    /**
     * Bulk-inserts the grouped document twice into {@code ship_group_by_mmsi}.
     */
    @Test
    public void addBatchDocKeyShip() throws IOException {
        ESUtil esUtil = new ESUtil();
        List<Object> docs = new ArrayList<>();
        List<Ship3> shipList = threeOf(sampleShip());
        KeyShip3 grouped = new KeyShip3();
        grouped.setKey_mmsi("412351040");
        grouped.setShipList(shipList);
        docs.add(grouped);
        // NOTE(review): the same instance is re-keyed with the same value and added again,
        // so both bulk entries are identical; confirm whether a second key was intended.
        grouped.setKey_mmsi("412351040");
        docs.add(grouped);
        esUtil.addBatchDoc("ship_group_by_mmsi", docs);
    }

    /**
     * Deletes one document by id from {@code ship_original}
     * (the grouped index can be targeted the same way with one of its ids).
     */
    @Test
    public void deleteDoc() throws IOException {
        ESUtil esUtil = new ESUtil();
        esUtil.deleteDoc("ship_original", "lEbj2nYBfaU9GHb6zD8C");
    }

    /**
     * Deletes documents of {@code ship_original} whose {@code key_mmsi} term equals the given value.
     */
    @Test
    public void deleteDocByQuery() throws IOException {
        ESUtil esUtil = new ESUtil();
        esUtil.deleteDocByQuery("key_mmsi", "412351041", "ship_original");
    }

    /**
     * Deletes every document of {@code ship_original}.
     */
    @Test
    public void deleteAllDoc() throws IOException {
        ESUtil esUtil = new ESUtil();
        esUtil.deleteAllDoc("ship_original");
    }
}
192.168.1.101:9200/ship_original/_search
192.168.1.101:9200/ship_original
192.168.1.101:9200/_cat/indices
192.168.1.101:5601/app/kibana#/dev_tools/console?_g=()
ES与Spark交互 (Spark2.3.2)
文档
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
https://www.elastic.co/guide/en/elasticsearch/hadoop/6.8/spark.html
依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.8.4</version>
</dependency>
配置 + 使用
// Build the Spark configuration, including the elasticsearch-hadoop (es.*) connector settings
val sparkConf: SparkConf = new SparkConf()
sparkConf.setMaster("local[*]")
sparkConf.setAppName("test")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("es.index.auto.create", "true")
sparkConf.set("es.nodes", "192.168.5.53")
sparkConf.set("es.port", "9200")
sparkConf.set("es.batch.size.entries", "100000")
sparkConf.set("es.batch.size.bytes", "300mb")
// Was misspelled "fasle", which is not a valid boolean value, so the intended
// "do not refresh after each bulk write" setting was not applied as written.
sparkConf.set("es.batch.write.refresh", "false")
// Create the Spark SQL session and context
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
// `sql` is the query string, defined elsewhere by the caller
val dataFrame: DataFrame = sparkSession.sql(sql)
import org.elasticsearch.spark.sql._
// Write the DataFrame to Elasticsearch through the ES-Spark API (index ship_original, type _doc)
dataFrame.saveToEs("ship_original/_doc")
网友评论