Elasticsearch 7.x Deep Dive [9]: Pagination

Author: 孙瑞锴 | Published 2020-05-27 12:33

    1. References

    Geek Time course "Elasticsearch Core Technologies and Hands-On Practice" by Ruan Yiming (阮一鸣)
    Elasticsearch Primer, Part 4: Index Settings Parameters Explained
    Cursor Queries with Scroll
    Elasticsearch: A Deep Look at How Search Works
    Elasticsearch 5.x Source Code Analysis (3): from/size, scroll, and search_after
    Four Solutions for Paged Queries in Elasticsearch and How They Work
    Traversing Elasticsearch Data and Deep Pagination with scroll

    2. Getting Started

    In Elasticsearch, as the data set and the cluster grow, an index's documents end up spread across multiple shards on different machines. How, then, does Elasticsearch sort results (by score) and paginate across all of those shards?

    Data preparation: <Elasticsearch 7.x Deep Dive: Data Preparation>

    How pagination works, with an example

    The principle:

    An Elasticsearch search runs in two phases (a small sketch of the merge step follows the list below):

      1. Query
        a. On receiving the request, the coordinating node picks one copy of each shard at random from the index's primaries and replicas (for example, with 3 primary and 3 replica shards it picks 3 shard copies in total) and broadcasts the query to them
        b. Each selected shard runs the query locally and returns the document IDs and sort values of its top from + size hits
        c. The coordinating node collects the from + size document IDs from every shard, re-sorts them globally by sort value, and keeps the IDs in positions from through from + size
      2. Fetch
        a. The coordinating node takes the document IDs selected in the Query phase and sends multi_get requests to the shards that hold them
        b. Those shards return the full documents
        c. The coordinating node assembles the final response and returns it to the client
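
    A minimal sketch of the Query-phase merge on the coordinating node, written in plain Java with no Elasticsearch dependencies (the class and method names here are hypothetical, purely for illustration):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class QueryThenFetchSketch {

        // One (docId, score) entry returned by a shard during the Query phase.
        record ShardHit(String docId, float score) {}

        // Each shard returns its local top (from + size) hits; the coordinating node
        // merges them, re-sorts globally by sort value, and keeps only the requested window.
        static List<ShardHit> mergeQueryPhase(List<List<ShardHit>> perShardTopHits, int from, int size) {
            List<ShardHit> all = new ArrayList<>();
            for (List<ShardHit> shardHits : perShardTopHits) {
                all.addAll(shardHits);                      // at most (from + size) entries per shard
            }
            all.sort(Comparator.comparingDouble(ShardHit::score).reversed());
            int end = Math.min(from + size, all.size());
            // The Fetch phase then issues multi_get requests for exactly these document IDs.
            return from >= end ? List.of() : new ArrayList<>(all.subList(from, end));
        }
    }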

    An example

    Suppose an index has 4 primary shards and 4 replicas, and we want page 500 with 20 documents per page (from = 499 * 20 = 9,980). Elasticsearch will:

    1. Ask one randomly chosen copy of each of the 4 shards for its local top from + size = 10,000 documents
    2. Merge the 4 × 10,000 = 40,000 entries on the coordinating node
    3. Sort them and keep only the 20 documents for page 500 (positions 9,980 through 10,000)

    The deeper the page, the more memory a single query consumes, and on a large index this can end in an OutOfMemoryError. To guard against deep pagination, Elasticsearch by default rejects requests beyond the first 10,000 documents. The limit can be changed via index.max_result_window, but raising it is strongly discouraged.
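
    If you really must raise the limit for one index, a hedged sketch using the Java High Level REST Client's index-settings API might look like this (the index name and window value are placeholders; as noted above, raising the window only moves the memory problem further out):

    import java.io.IOException;

    import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.settings.Settings;

    public boolean raiseMaxResultWindow(RestHighLevelClient client, String index, int window) throws IOException
    {
        // Dynamically update index.max_result_window for a single index.
        UpdateSettingsRequest request = new UpdateSettingsRequest(index);
        request.settings(Settings.builder().put("index.max_result_window", window));
        return client.indices().putSettings(request, RequestOptions.DEFAULT).isAcknowledged();
    }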

    Pagination methods

    Elasticsearch offers three ways to paginate:

    • from,size
    • search after
    • scroll

    Let's look at each in turn.

    from,size

    This is the most basic form of pagination: from defaults to 0 and size defaults to 10 documents per page.
    Note that from is not a page number but an offset; to get the second page of 10 documents, from should be 10, much like SQL's limit/offset.

    GET /tmdb_movies/_search
    {
      "from": 1,
      "size": 10, 
      "query": {
        "match_all": {}
      }
    }
    
    • If from + size exceeds 10,000, the request fails with an illegal_argument_exception error
    # This throws: from + size = 10,001 exceeds the 10,000 window
    GET /tmdb_movies/_search
    {
      "from": 10000,
      "size": 1, 
      "query": {
        "match_all": {}
      }
    }
    
    # This also throws: a large size counts against the same limit (from + size = 10,001 here)
    GET /tmdb_movies/_search
    {
      "from": 1,
      "size": 10000, 
      "query": {
        "match_all": {}
      }
    }
    

    Java implementation

    /**
         * @Author 【孙瑞锴】
         * @Description run an Elasticsearch query and map the hits to the given type
         * @Date 2:23 PM 2019/12/23
         * @Param [indexOrAlias index name or alias, searchSourceBuilder query conditions, clazz target type]
         * @return com.qingzhu.crs.core.tuple.Tuple2<java.util.List<T>,java.lang.Long>
         **/
        public <T> Tuple2<List<T>, Long> searchWithSize(String indexOrAlias, SearchSourceBuilder searchSourceBuilder, Class<T> clazz) throws IOException
        {
            String conditions = searchSourceBuilder.query().toString().replaceAll("\\s*|\t|\r|\n", "");
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(indexOrAlias);
            searchRequest.source(searchSourceBuilder);
            searchRequest.preference(MD5.cryptToHexString(conditions));
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = response.getHits();
            // response.getTook(), response.getSuccessfulShards() and response.getFailedShards() are available here for logging
            List<T> sourceAsMap = Arrays.stream(hits.getHits())
                    .map(item -> {
                        T t = GsonHelper.getGson().fromJson(item.getSourceAsString(), clazz);
                        if (t instanceof IdSearchForEs) {
                            IdSearchForEs search = (IdSearchForEs) t;
                            search.setId(Long.valueOf(item.getId()));
                        }
                        return t;
                    })
                    .collect(Collectors.toList());
            return Tuples.of(sourceAsMap, hits.getTotalHits().value);
        }
    
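
    A hedged usage sketch for the method above: build the from/size request with a SearchSourceBuilder and hand it to searchWithSize. The Movie class and the Tuple2 accessor names are assumptions made purely for illustration.

    import java.util.List;

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    // Page 2 with 10 documents per page: from = (page - 1) * size = 10.
    SearchSourceBuilder builder = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .from(10)
            .size(10);

    // Movie is a hypothetical mapping class for the tmdb_movies index.
    Tuple2<List<Movie>, Long> page = searchWithSize("tmdb_movies", builder, Movie.class);
    List<Movie> movies = page.getT1();   // hypothetical Tuple2 accessors
    Long total = page.getT2();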

    search after

    search_after avoids deep pagination by paging relative to the previous result set in real time, but it comes with two restrictions:

    1. You cannot jump to a specific page (no from support)
    2. You can only page forward

    Using it takes two steps:

    1. Specify a sort and make sure it includes a field with unique values as a tie-breaker (here we also sort by _id)
    2. Use the sort values of the last hit of the previous page as the search_after of the next request

    The query below is an ordinary search. To keep the listing short, _source only returns title, the page size is 2, and results are sorted by _score and then by _id.
    We start from the first page here, but you can start paging forward from any point.

    GET /tmdb_movies/_search
    {
      "_source": ["title"], 
      "size": 2,
      "query": {
        "match_all": {}
      },
      "sort": [
        {
          "_score": {
            "order": "desc"
          },
          "_id": {
            "order": "asc"
          }
        }
      ]
    }
    
    • Let's look at the result
    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 4802,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "100",
            "_score" : 1.0,
            "_source" : {
              "title" : "Lock, Stock and Two Smoking Barrels"
            },
            "sort" : [
              1.0,
              "100"
            ]
          },
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "10003",
            "_score" : 1.0,
            "_source" : {
              "title" : "The Saint"
            },
            "sort" : [
              1.0,
              "10003"
            ]
          }
        ]
      }
    }
    
    • Next we issue the follow-up request using the search_after syntax
    GET /tmdb_movies/_search
    {
      "_source": ["title"], 
      "size": 2,
      "query": {
        "match_all": {}
      },
      "search_after": [ // 这里使用的是上次分页结果,最后一条数据的sort值
        1.0,
        "10003"
      ],
      "sort": [
        {
          "_score": {
            "order": "desc"
          },
          "_id": {
            "order": "asc"
          }
        }
      ]
    }
    
    • The result:
    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 4802,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "100042",
            "_score" : 1.0,
            "_source" : {
              "title" : "Dumb and Dumber To"
            },
            "sort" : [
              1.0,
              "100042"
            ]
          },
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "10008",
            "_score" : 1.0,
            "_source" : {
              "title" : "An American Haunting"
            },
            "sort" : [
              1.0,
              "10008"
            ]
          }
        ]
      }
    }
    
    • To check that the search_after results are correct, we can run the equivalent from/size query
    GET /tmdb_movies/_search
    {
      "_source": ["title"], 
      "size": 2,
      "from": 2, 
      "query": {
        "match_all": {}
      },
      "sort": [
        {
          "_score": {
            "order": "desc"
          },
          "_id": {
            "order": "asc"
          }
        }
      ]
    }
    
    • As the response shows, the results match
    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 4802,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "100042",
            "_score" : 1.0,
            "_source" : {
              "title" : "Dumb and Dumber To"
            },
            "sort" : [
              1.0,
              "100042"
            ]
          },
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "10008",
            "_score" : 1.0,
            "_source" : {
              "title" : "An American Haunting"
            },
            "sort" : [
              1.0,
              "10008"
            ]
          }
        ]
      }
    }
    

    Java implementation

    /**
         * search_after deep pagination
         * @param indexOrAlias
         * @param searchSourceBuilder
         * @param clazz
         * @param <T>
         * @return
         * @throws IOException
         */
        public <T extends BaseSearchForEs> Tuple2<List<T>, Long> searchWithSizeAndScoreAndSearchAfter(String indexOrAlias, SearchSourceBuilder searchSourceBuilder, Class<T> clazz) throws IOException
        {
            String conditions = searchSourceBuilder.query().toString().replaceAll("\\s*|\t|\r|\n", "");
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(indexOrAlias);
            searchRequest.source(searchSourceBuilder);
            searchRequest.preference(MD5.cryptToHexString(conditions));
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = response.getHits();
            List<T> sourceAsMap = Arrays.stream(hits.getHits())
                    .map(item ->
                    {
                        T t = GsonHelper.getGson().fromJson(item.getSourceAsString(), clazz);
                        t.setSearchAfter(Arrays.asList(item.getSortValues()));
                        t.setScore(item.getScore());
                        return t;
                    })
                    .collect(Collectors.toList());
            return Tuples.of(sourceAsMap, hits.getTotalHits().value);
        }
    
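
    The method above expects the caller to set both the sort and the search_after values on the SearchSourceBuilder. A hedged sketch of how one page feeds the next (Movie, its getSearchAfter() accessor populated via setSearchAfter(...) above, and the Tuple2 accessor names are assumptions):

    import java.util.List;

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.sort.SortOrder;

    // First page: sort by _score desc, then _id asc as the unique tie-breaker.
    SearchSourceBuilder builder = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .size(2)
            .sort("_score", SortOrder.DESC)
            .sort("_id", SortOrder.ASC);

    Tuple2<List<Movie>, Long> firstPage =
            searchWithSizeAndScoreAndSearchAfter("tmdb_movies", builder, Movie.class);

    // Next page: feed the sort values of the last hit back in as search_after.
    List<Movie> hits = firstPage.getT1();                  // hypothetical Tuple2 accessor
    Movie last = hits.get(hits.size() - 1);
    builder.searchAfter(last.getSearchAfter().toArray());  // the values captured by setSearchAfter(...)

    Tuple2<List<Movie>, Long> secondPage =
            searchWithSizeAndScoreAndSearchAfter("tmdb_movies", builder, Movie.class);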

    scroll

    A few things to note when using scroll:

    1. scroll creates a snapshot at the time of the first query; documents written afterwards will not appear in the results
    2. Each subsequent request passes in the scroll id returned by the previous query

    Let's look at the first query

    # keep the scroll context alive for five minutes
    GET /tmdb_movies/_search?scroll=5m
    {
      "_source": ["title"], 
      "size": 2,
      "query": {
        "match_all": {}
      }
    }
    
    • The response includes a _scroll_id:
    {
      "_scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAGUsWTTRMeVRwdWVULS00MC1vSmFYS3ZmQQ==",
      "took" : 0,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 4802,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "19995",
            "_score" : 1.0,
            "_source" : {
              "title" : "Avatar"
            }
          },
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "285",
            "_score" : 1.0,
            "_source" : {
              "title" : "Pirates of the Caribbean: At World's End"
            }
          }
        ]
      }
    }
    
    • We then run the next query, passing the scroll id returned by the previous request
    # keep the scroll context alive for one minute
    GET /_search/scroll
    {
      "scroll": "1m",
      "scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAGUsWTTRMeVRwdWVULS00MC1vSmFYS3ZmQQ=="
    }
    
    • The result:
    {
      "_scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAGUsWTTRMeVRwdWVULS00MC1vSmFYS3ZmQQ==",
      "took" : 6,
      "timed_out" : false,
      "terminated_early" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 4802,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "206647",
            "_score" : 1.0,
            "_source" : {
              "title" : "Spectre"
            }
          },
          {
            "_index" : "tmdb_movies",
            "_type" : "_doc",
            "_id" : "49026",
            "_score" : 1.0,
            "_source" : {
              "title" : "The Dark Knight Rises"
            }
          }
        ]
      }
    }
    

    Clearing a scroll

    • We can also clear a scroll explicitly
    DELETE _search/scroll
    {
      "scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAABMAWTTRMeVRwdWVULS00MC1vSmFYS3ZmQQ=="
    }
    
    • Once cleared, reusing DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAABMAWTTRMeVRwdWVULS00MC1vSmFYS3ZmQQ== returns a search_context_missing_exception error

    Java implementation

        /**
         * @Author 【孙瑞锴】
         * @Description batch retrieval; not meant for interactive search, only for bulk traversal
         * @Date 9:02 PM 2019/12/18
         * @Param [indexName, typeName, searchBuilder, clazz]
         * @return com.qingzhu.crs.core.tuple.Tuple3<java.util.List<T> hits, java.lang.Long total count, java.lang.String scroll id>
         **/
        public <T> Tuple3<List<T>, Long, String> scroll(String indexName, String typeName, SearchSourceBuilder searchBuilder, Class<T> clazz) throws IOException
        {
            // Set the scroll keep-alive to 60 seconds. This is not the time needed to process all results;
            // it is refreshed on every scroll request, so it only needs to cover processing one batch.
            Scroll scroll = new Scroll(TimeValue.timeValueSeconds(60));
            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.types(typeName);
            searchRequest.source(searchBuilder);
            searchRequest.scroll(scroll);
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = response.getHits();
            List<T> sourceAsMap = Arrays.stream(hits.getHits())
                    .map(item -> GsonHelper.getGson().fromJson(item.getSourceAsString(), clazz))
                    .collect(Collectors.toList());
            return Tuples.of(sourceAsMap, hits.getTotalHits().value, response.getScrollId());
        }
    
        /**
         * @Author 【孙瑞锴】
         * @Description fetch the next batch for an existing scroll id
         * @Date 9:09 PM 2019/12/18
         * @Param [scrollId, clazz]
         * @return com.qingzhu.crs.core.tuple.Tuple3<java.lang.Boolean whether there is another batch,java.util.List<T> hits,java.lang.String scroll id for the next request>
         **/
        public <T> Tuple3<Boolean, List<T>, String> scroll(String scrollId, Class<T> clazz) throws IOException
        {
            final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(60));
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
            scrollRequest.scroll(scroll);
            SearchResponse response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
            SearchHits hits = response.getHits();
            List<T> sourceAsMap = Arrays.stream(hits.getHits())
                    .map(item -> GsonHelper.getGson().fromJson(item.getSourceAsString(), clazz))
                    .collect(Collectors.toList());
            return Tuples.of(response.getHits().getHits().length != 0, sourceAsMap, response.getScrollId());
        }
    
        /**
         * @Author 【孙瑞锴】
         * @Description clear a scroll context
         * @Date 9:13 PM 2019/12/18
         * @Param [scrollId]
         * @return boolean
         **/
        public boolean clearScroll(String scrollId) throws IOException
        {
            // Clear the scroll context
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            // setScrollIds() can be used instead to clear several scroll ids at once
            clearScrollRequest.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
    
            if (clearScrollResponse != null)
            {
                return clearScrollResponse.isSucceeded();
            }
            return false;
        }
    
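
    A hedged sketch of how the three helpers above could be combined to traverse a whole index batch by batch (Movie, process() and the Tuple accessor names are assumptions made for illustration):

    import java.util.List;

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    // Walk every document of tmdb_movies in batches of 1000.
    SearchSourceBuilder builder = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .size(1000);

    Tuple3<List<Movie>, Long, String> first = scroll("tmdb_movies", "_doc", builder, Movie.class);
    List<Movie> batch = first.getT1();              // hypothetical Tuple3 accessors
    String scrollId = first.getT3();

    while (!batch.isEmpty()) {
        process(batch);                             // hypothetical per-batch handler
        Tuple3<Boolean, List<Movie>, String> next = scroll(scrollId, Movie.class);
        batch = next.getT2();
        scrollId = next.getT3();
    }

    // Release the server-side scroll context as soon as the traversal is done.
    clearScroll(scrollId);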

    3. Done!
