美文网首页
ElasticSearch中的一些聚合操作

ElasticSearch中的一些聚合操作

作者: 香山上的麻雀 | 来源:发表于2020-05-18 19:49 被阅读0次

今天有一个小工作,原来数据是在Hive中存着,每次查询的时候就是后端拼接SQL然后用JDBC去数据库中查询,速度很慢很慢,于是我决定把数据源改为ElasticSearch中,下面就是整个流程:

注:ElasticSearch使用的是6.3.1版本

原拼接的SQL:

select sum(cast(bdp_user_rfm.order_num as int)) as order_num,
        bdp_user_rfm.rfm_label as rfm,
        count(bdp_user_rfm.user_id) as user_count, 
        sum(cast(bdp_user_rfm.monetary as double)) as money_sum
from bdp_user_rfm inner join user_basic_info
on user_basic_info.user_id = bdp_user_rfm.user_id 
where 1=1 and bdp_user_rfm.etl_date = (select etl_date from bdp_user_rfm group by etl_date order by etl_date desc limit 1) 
and bdp_user_rfm.time = 'all' 
group by bdp_user_rfm.rfm_label;

上面用到了2张表,而且这两张表join条件是user_id,这两张表都有一个特点就是同一个user_id只在表中有一条记录(就是在关系型数据库中user_id完全可以作为主键),所以我用Hive On Es把两张表的数据跑到同一个Index中,用user_id作为Index的id,这样在往Es中写数据的时候已经实现了join关联。
上述SQL可以转换为如下:

ElasticSearch命令行格式:

POST /xxxxxxxxxxxxxx
{
  "size": 1,
  "aggs": {
    "group_by_rfm_label": {
      "terms": {
        "field": "rfm_label.keyword",
        "size": 10,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          {
            "_count": "desc"
          },
          {
            "_key": "asc"
          }
        ]
      },
      "aggregations": {
        "order_num": {
          "sum": {
            "field": "order_num",
            "format": "#"
          }
        },
        "monetary": {
          "sum": {
            "field": "monetary"
          }
        }
      }
    }
  }
}

在接口中用Java-High-Level-Client实现:

 public static JSONArray getRFMInfo(String time, String clusterId, String key, String code) {
        JSONArray jsonArray = new JSONArray();

        RestHighLevelClient client = null;

        try {

            client = getClient();
            SearchRequest searchRequest = new SearchRequest("bdp_dev_profile_user_basic_label");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            // 时间限制 必须的字段
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
                    .must(QueryBuilders.termQuery("time", time));

            // clusterId 限制
            if (StringUtils.isNotBlank(clusterId)) {
                boolQueryBuilder = boolQueryBuilder.must(QueryBuilders.termQuery("clusterid.keyword", clusterId));
            }

            // 用户组织机构过滤 使用模糊查询
            if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(code)) {
                boolQueryBuilder = boolQueryBuilder.must(QueryBuilders.matchQuery(key, code));
            }

            MaxAggregationBuilder maxEtlDate = AggregationBuilders
                    .max("max_etl_date").field("etl_date").format("yyyy-MM-dd");

            String maxDate = "";
            try {
                ParsedMax max_etl_date = (ParsedMax) client.search(searchRequest.source(
                        new SearchSourceBuilder().aggregation(maxEtlDate)))
                        .getAggregations().asMap().get("max_etl_date");
                maxDate = max_etl_date.getValueAsString();
                System.out.println("maxDate = " + maxDate);
            } catch (Exception e) {
                e.printStackTrace();
            }

            if (StringUtils.isNotBlank(maxDate)) {
                boolQueryBuilder = boolQueryBuilder.must(QueryBuilders.termQuery("etl_date", maxDate));
            }

            TermsAggregationBuilder group_by = AggregationBuilders.terms("group_by_rfm_label").field("rfm_label.keyword");
            SumAggregationBuilder sum_order_num = AggregationBuilders.sum("order_num").field("order_num");
            //取整数
            sum_order_num.format("#");
            SumAggregationBuilder sum_monetary = AggregationBuilders.sum("monetary").field("monetary");
            //小数点后保留N位
            sum_monetary.format("0.0000000");

            group_by.subAggregation(sum_order_num);
            group_by.subAggregation(sum_monetary);

            sourceBuilder.query(boolQueryBuilder).aggregation(group_by);
            searchRequest.source(sourceBuilder);
            SearchResponse response = client.search(searchRequest);
            Map<String, Aggregation> aggregationMap = response.getAggregations().asMap();
            ParsedStringTerms user_count = (ParsedStringTerms) aggregationMap.get("group_by_rfm_label");
            for (Terms.Bucket bucket : user_count.getBuckets()) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("rfm", bucket.getKey());
                jsonObject.put("user_count", bucket.getDocCount());
                /*一般保持客户-->96844 206150932.24 order_num  ===>96887*/
//                System.out.println(bucket.getKey() + "-->" + bucket.getDocCount());
                Aggregations aggregations = bucket.getAggregations();
                Map<String, Aggregation> map = aggregations.asMap();
                jsonObject.put("money_sum", ((ParsedSum) map.get("monetary")).getValue());
//                System.out.println(((ParsedSum) map.get("monetary")).getValueAsString());
                jsonObject.put("order_num", ((ParsedSum) map.get("order_num")).getValue());
//                System.out.println("order_num  ===>" + ((ParsedSum) map.get("order_num")).getValueAsString());
                jsonArray.add(jsonObject);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            returnClient(client);
        }

        return jsonArray;
    }

关于数据格式的转换可以看我的另外一篇文章:
https://blog.csdn.net/qq_26502245/article/details/106197146

相关文章

  • ElasticSearch中的一些聚合操作

    今天有一个小工作,原来数据是在Hive中存着,每次查询的时候就是后端拼接SQL然后用JDBC去数据库中查询,速度很...

  • elasticsearch java聚合api

    elasticsearch java聚合api使用(多字段分组统计、聚合,最大最小值) elasticsearch...

  • ElasticSearch聚合查询

    ElasticSearch聚合查询

  • elasticsearch 聚合统计

    elasticsearch 聚合统计 创建普通聚合器 字段分组聚合 时间聚合 聚合统计并且显示统计的原始数据

  • ELK 聚合查询

    在elasticsearch中es支持对存储文档进行复杂的统计.简称聚合。 ES中的聚合被分为两大类。 1、Met...

  • Elasticsearch(七)聚合分析

    聚合分析对应数据库中的聚合函数。在 Elasticsearch 中使用 aggs 标签表示。 计算所有商品的价格总...

  • Java自学-集合框架 聚合操作

    聚合操作 步骤 1 : 聚合操作 JDK8之后,引入了对集合的聚合操作,可以非常容易的遍历,筛选,比较集合中的元素...

  • 聚合查询

    数据聚合 聚合[https://www.elastic.co/guide/en/elasticsearch/ref...

  • MongoDB Compass Aggregation的几个应用

    聚合管道,是对查询的数据进行聚合等操作,在MongoDB Compass中,此页面可以创建多个聚合操作进行数据处理...

  • Elasticsearch - 聚合

    注:此文档仅适用于 Elasticsearch > 5.0 版本 通过结构化搜索和全文搜索,如果我们有一个查询并且...

网友评论

      本文标题:ElasticSearch中的一些聚合操作

      本文链接:https://www.haomeiwen.com/subject/hvmpohtx.html