Elasticsearch Java API: Index CRUD Operations (Part 2)

Author: 全科 | Published 2017-11-15 21:35

    Elasticsearch Java API - Client connection (TransportClient, PreBuiltXPackTransportClient) (Part 1)

    This section covers the following CRUD APIs:

    Single-document APIs

    Index API
    Get API
    Delete API
    Delete By Query API
    Update API

    Multi-document APIs

    Multi Get API
    Bulk API

    Note: for all single-document CRUD APIs, the index parameter accepts only a single index name, or an alias that points to a single index.

    Index API

    The Index API lets you store a JSON document so that it becomes searchable. A document is uniquely identified by its index, type, and id. You can supply your own id, or let the Index API generate one for you.
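
    For illustration only, here is a minimal sketch (reusing the fendo index and fendodate type from the examples below, with a throwaway one-field document; it assumes the TransportClient set up in Part 1) of supplying an explicit id versus letting Elasticsearch generate one:

    // Explicit id "1": the document is stored (or overwritten) under that id.
    IndexResponse withId = client.prepareIndex("fendo", "fendodate", "1")
            .setSource("{\"user\":\"kimchy\"}")
            .get();

    // No id given: Elasticsearch generates one, available from the response.
    IndexResponse autoId = client.prepareIndex("fendo", "fendodate")
            .setSource("{\"user\":\"kimchy\"}")
            .get();
    System.out.println(withId.getId() + " / " + autoId.getId());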

    There are several ways to produce the JSON document:

    • Manually, using a raw byte[] or String
    • Using a Map, which is automatically converted to its JSON equivalent
    • Using a third-party library such as Jackson to serialize your beans
    • Using the built-in helper XContentFactory.jsonBuilder()

    Manual (raw String)

    Data format:

    String json = "{" +
            "\"user\":\"kimchy\"," +
            "\"postDate\":\"2013-01-30\"," +
            "\"message\":\"trying out Elasticsearch\"" +
        "}";
    
    Example
    /**
     * Manually build the JSON
     */
    @Test
    public void CreateJSON(){

        String json = "{" +
                "\"user\":\"fendo\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"Hello world\"" +
            "}";

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
                .setSource(json)
                .get();
        System.out.println(response.getResult());

    }
    

    Map

    A Map is a key/value structure and maps directly onto a JSON object.

    Map<String, Object> json = new HashMap<String, Object>();
    json.put("user","kimchy");
    json.put("postDate",new Date());
    json.put("message","trying out Elasticsearch");
    
    Example
    /**
     * Using a Map
     */
    @Test  
    public void CreateList(){  
          
        Map<String, Object> json = new HashMap<String, Object>();  
        json.put("user","kimchy");  
        json.put("postDate","2013-01-30");  
        json.put("message","trying out Elasticsearch");  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
    

    Serializing with Jackson

    Elasticsearch already uses Jackson internally, so you can use it directly to convert a Java bean to JSON.

    import com.fasterxml.jackson.databind.*;
    
    // instance a json mapper
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse
    
    // generate json
    byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
    
    Example
    /**
     * Serialize with Jackson
     * @throws Exception
     */
    @Test
    public void CreateJACKSON() throws Exception{

        CsdnBlog csdn=new CsdnBlog();
        csdn.setAuthor("fendo");
        csdn.setContent("This is a Java book");
        csdn.setTag("C");
        csdn.setView("100");
        csdn.setTitile("Programming");
        csdn.setDate(new Date().toString());
          
        // instance a json mapper  
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse  
    
        // generate json  
        byte[] json = mapper.writeValueAsBytes(csdn);  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
    }  
    

    XContentBuilder helper class

    Elasticsearch provides a built-in helper class, XContentBuilder, for generating JSON documents (see the example below). Whichever way you build the document, the IndexResponse returned by the index operation gives you the following information:

    // Index name
    String _index = response.getIndex();
    // Type name
    String _type = response.getType();
    // Document ID (generated or not)
    String _id = response.getId();
    // Version (if it's the first time you index this document, you will get: 1)
    long _version = response.getVersion();
    // Status of the index operation
    RestStatus status = response.status();
    
    Example
    /**
     * Using the Elasticsearch helper class
     * @throws IOException
     */
    @Test  
    public void CreateXContentBuilder() throws IOException{  
          
        XContentBuilder builder = XContentFactory.jsonBuilder()  
                .startObject()  
                    .field("user", "ccse")  
                    .field("postDate", new Date())  
                    .field("message", "this is Elasticsearch")  
                .endObject();  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
        System.out.println("创建成功!");  
          
          
    }  
    

    Complete example

     
    import java.io.IOException;  
    import java.net.InetAddress;  
    import java.net.UnknownHostException;  
    import java.util.Date;  
    import java.util.HashMap;  
    import java.util.Map;  
      
    import org.elasticsearch.action.index.IndexResponse;  
    import org.elasticsearch.client.transport.TransportClient;  
    import org.elasticsearch.common.settings.Settings;  
    import org.elasticsearch.common.transport.InetSocketTransportAddress;  
    import org.elasticsearch.common.xcontent.XContentBuilder;  
    import org.elasticsearch.common.xcontent.XContentFactory;  
    import org.elasticsearch.transport.client.PreBuiltTransportClient;  
    import org.junit.Before;  
    import org.junit.Test;  
      
    import com.fasterxml.jackson.core.JsonProcessingException;  
    import com.fasterxml.jackson.databind.ObjectMapper;  
      
    public class CreateIndex {  
      
        private TransportClient client;  
          
        @Before  
        public void getClient() throws Exception{  
            // set the cluster name
            Settings settings = Settings.builder().put("cluster.name", "my-application").build();
            // create the client
            client  = new PreBuiltTransportClient(settings)  
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));  
        }  
          
        /**  
         * Manually build the JSON
         */  
        @Test  
        public void CreateJSON(){  
              
            String json = "{" +  
                    "\"user\":\"fendo\"," +  
                    "\"postDate\":\"2013-01-30\"," +  
                    "\"message\":\"Hell word\"" +  
                "}";  
              
            IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                    .setSource(json)  
                    .get();  
            System.out.println(response.getResult());  
              
        }  
          
          
        /**  
         * Using a Map
         */  
        @Test  
        public void CreateList(){  
              
            Map<String, Object> json = new HashMap<String, Object>();  
            json.put("user","kimchy");  
            json.put("postDate","2013-01-30");  
            json.put("message","trying out Elasticsearch");  
              
            IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                    .setSource(json)  
                    .get();  
            System.out.println(response.getResult());  
              
        }  
          
        /**  
         * Serialize with Jackson
         * @throws Exception  
         */  
        @Test  
        public void CreateJACKSON() throws Exception{  
              
            CsdnBlog csdn=new CsdnBlog();  
            csdn.setAuthor("fendo");  
            csdn.setContent("This is a Java book");
            csdn.setTag("C");  
            csdn.setView("100");  
            csdn.setTitile("Programming");
            csdn.setDate(new Date().toString());  
              
            // instance a json mapper  
            ObjectMapper mapper = new ObjectMapper(); // create once, reuse  
      
            // generate json  
            byte[] json = mapper.writeValueAsBytes(csdn);  
              
            IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                    .setSource(json)  
                    .get();  
            System.out.println(response.getResult());  
        }  
          
        /**  
         * Using the Elasticsearch helper class
         * @throws IOException   
         */  
        @Test  
        public void CreateXContentBuilder() throws IOException{  
              
            XContentBuilder builder = XContentFactory.jsonBuilder()  
                    .startObject()  
                        .field("user", "ccse")  
                        .field("postDate", new Date())  
                        .field("message", "this is Elasticsearch")  
                    .endObject();  
              
            IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
            System.out.println("创建成功!");  
              
              
        }  
          
    }  
    

    You can also add arrays with startArray(String) and endArray(). The field() method accepts many value types: you can pass it numbers, dates, and even other XContentBuilder objects.
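
    As an illustration, a minimal sketch of a builder that nests an array and mixes value types (the field names are made up for this example):

    XContentBuilder builder = XContentFactory.jsonBuilder()
            .startObject()
                .field("user", "kimchy")
                .field("postDate", new Date())   // a date value
                .field("views", 42)              // a numeric value
                .startArray("tags")              // startArray(String) opens a JSON array
                    .value("elasticsearch")
                    .value("java")
                .endArray()                      // endArray() closes it
            .endObject();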

    Get API

    Retrieve a document by id:

    GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
    
    

    For more details, see the REST get API documentation.
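
    The fields of the returned GetResponse can be read like this (a minimal sketch using the same twitter/tweet/1 document as above):

    GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
    if (response.isExists()) {                                // the document was found
        String index = response.getIndex();                   // index name
        String type = response.getType();                     // type name
        String id = response.getId();                         // document id
        String source = response.getSourceAsString();         // _source as a JSON string
        Map<String, Object> map = response.getSourceAsMap();  // _source as a Map
    }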

    Thread configuration

    When operationThreaded is set to true, the operation is executed on a different thread.

    The following example sets operationThreaded to false:

    GetResponse response = client.prepareGet("twitter", "tweet", "1")
            .setOperationThreaded(false)
            .get();
    

    Delete API

    Delete a document by id:

    DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
    
    

    For more details, see the delete API documentation.

    Thread configuration

    When operationThreaded is set to true, the operation is executed on a different thread.

    The following example sets operationThreaded to false:

    DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
            .setOperationThreaded(false)
            .get();
    

    Delete By Query API

    Delete documents that match a query:

    BulkByScrollResponse response =
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
            .filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
            .source("persons") //index(索引名)
            .get();  //执行
    
    long deleted = response.getDeleted(); // number of documents deleted
    

    If the operation may take a long time, run it asynchronously and get the result in a callback:

    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male"))      //查询            
        .source("persons")                //index(索引名)                                    
        .execute(new ActionListener<BulkByScrollResponse>() {     //回调监听     
            @Override
            public void onResponse(BulkByScrollResponse response) {
                long deleted = response.getDeleted();   // number of documents deleted
            }
            @Override
            public void onFailure(Exception e) {
                // Handle the exception
            }
        });
    

    Update API

    There are two ways to perform an update:

    • Create an UpdateRequest and send it with the client
    • Use the prepareUpdate() method

    Using an UpdateRequest

    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index("index");
    updateRequest.type("type");
    updateRequest.id("1");
    updateRequest.doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
    client.update(updateRequest).get();
    

    Using prepareUpdate()

    The official example here is broken (the arguments to new Script() are wrong), so the code below is my own (2017/11/10):

    client.prepareUpdate("ttl", "doc", "1")
            .setScript(new Script("ctx._source.gender = \"male\""  ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE 
            .get();
    
    client.prepareUpdate("ttl", "doc", "1")
            .setDoc(jsonBuilder()   // merged into the existing document
                .startObject()
                    .field("gender", "male")
                .endObject())
            .get();
    

    Update by script

    Update a document with a script:

    UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
            .script(new Script("ctx._source.gender = \"male\""));
    client.update(updateRequest).get();
    
    

    Update by merging documents

    Merge a partial document into an existing document:

    UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
            .doc(jsonBuilder()
                .startObject()
                    .field("gender", "male")
                .endObject());
    client.update(updateRequest).get();
    

    Upsert

    Upsert: if the document already exists it is updated, otherwise it is inserted:

    IndexRequest indexRequest = new IndexRequest("index", "type", "1")
            .source(jsonBuilder()
                .startObject()
                    .field("name", "Joe Smith")
                    .field("gender", "male")
                .endObject());
    UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
            .doc(jsonBuilder()
                .startObject()
                    .field("gender", "male")
                .endObject())
            .upsert(indexRequest); // if the document does not exist, indexRequest is used to create it
    client.update(updateRequest).get();
    

    If the document index/type/1 already exists, the partial doc is merged into it and the result is a document like:

    {
        "name"  : "Joe Dalton",
        "gender": "male"        
    }
    

    If it does not exist, the document from indexRequest is inserted instead:

    {
        "name" : "Joe Smith",
        "gender": "male"
    }
    

    Multi Get API

    Retrieve several documents in one request:

    MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
        .add("twitter", "tweet", "1") //一个id的方式
        .add("twitter", "tweet", "2", "3", "4") //多个id的方式
        .add("another", "type", "foo")  //可以从另外一个索引获取
        .get();
    
    for (MultiGetItemResponse itemResponse : multiGetItemResponses) { // iterate over the responses
        GetResponse response = itemResponse.getResponse();
        if (response.isExists()) {                      // check that the document exists
            String json = response.getSourceAsString(); // the _source field
        }
    }
    

    For more details, see the REST multi get documentation.

    Bulk API

    The Bulk API lets you index and delete several documents in a single request:

    import static org.elasticsearch.common.xcontent.XContentFactory.*;
    
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    
    // either use client#prepare, or use Requests# to directly build index/delete requests
    bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
            .setSource(jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "trying out Elasticsearch")
                        .endObject()
                      )
            );
    
    bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
            .setSource(jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "another post")
                        .endObject()
                      )
            );
    
    BulkResponse bulkResponse = bulkRequest.get();
    if (bulkResponse.hasFailures()) {
        // process failures by iterating through each bulk response item
    }
    

    Using the Bulk Processor

    The BulkProcessor class offers a simple interface that flushes bulk operations automatically, based on the number of requests, their total size, or a time interval.

    Creating the BulkProcessor

    First, create the BulkProcessor instance:

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    
    BulkProcessor bulkProcessor = BulkProcessor.builder(
            client,  // the Elasticsearch client
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId,
                                       BulkRequest request) { ... } // called before the bulk is executed; e.g. request.numberOfActions() gives the number of actions

                @Override
                public void afterBulk(long executionId,
                                      BulkRequest request,
                                      BulkResponse response) { ... } // called after the bulk has executed; e.g. response.hasFailures() tells you whether some requests failed

                @Override
                public void afterBulk(long executionId,
                                      BulkRequest request,
                                      Throwable failure) { ... } // called when the bulk failed and raised a Throwable
            })
            .setBulkActions(10000) // flush the bulk every 10,000 requests
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush the bulk every 5 MB
            .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush the bulk every 5 seconds, whatever the number of requests
            .setConcurrentRequests(1) // number of concurrent requests: 0 means only a single bulk is executed at a time; 1 means one bulk may execute concurrently while new requests accumulate
            .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // custom backoff policy: start at 100 ms and back off exponentially, retrying up to 3 times; a retry is attempted when one or more bulk item requests fail with an EsRejectedExecutionException (too few compute resources available); use BackoffPolicy.noBackoff() to disable retries
            .build();
    

    BulkProcessor defaults

    • bulkActions: 1000
    • bulkSize: 5 MB
    • no flushInterval
    • concurrentRequests: 1, i.e. the flush is executed asynchronously
    • backoffPolicy: exponential backoff with 8 retries and a 50 ms start delay (a builder relying on these defaults is sketched below)
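
    A minimal sketch of a BulkProcessor that relies entirely on these defaults (only the client and a listener are required; the empty listener bodies are placeholders, and BulkRequest/BulkResponse come from org.elasticsearch.action.bulk):

    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) { }
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
        }).build();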

    Adding requests

    Then add your requests to the BulkProcessor:

    bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
    bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
    

    Closing the Bulk Processor

    When all documents have been loaded, close the BulkProcessor with the awaitClose or close method:

    bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
    
    

    bulkProcessor.close();
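
    Note that awaitClose returns a boolean telling you whether all bulk requests completed before the timeout, and it can throw InterruptedException; a minimal sketch of handling both:

    try {
        // true if every pending bulk request completed and the processor shut down in time
        boolean terminated = bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        if (!terminated) {
            // some bulk requests were still in flight when the timeout elapsed
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }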
    
    

    Using the Bulk Processor in tests

    If you are using the BulkProcessor in tests, you can make it run synchronously by setting concurrentRequests to 0:

    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
            .setBulkActions(10000)
            .setConcurrentRequests(0)
            .build();
    
    // Add your requests
    bulkProcessor.add(/* Your requests */);
    
    // Flush any remaining requests
    bulkProcessor.flush();
    
    // Or close the bulkProcessor if you don't need it anymore
    bulkProcessor.close();
    
    // Refresh your indices
    client.admin().indices().prepareRefresh().get();
    
    // Now you can start searching!
    client.prepareSearch().get();
    

    All of the examples have been uploaded to Git.

    For more, see the spring-boot-starter-es open source project.
