美文网首页
2019-01-16

2019-01-16

作者: 早安丶全世界 | 来源:发表于2019-01-16 14:09 被阅读0次

    MongoDB聚合操作

    MongoDB版本:1.8.6.RELEASE
    需求:有一个会员消费记录表需要查询每个会员最后的消费时间

    //会员手机号码列表
            List<String> phoneList = memberList.stream().map(item -> MapUtils.getString(item, "phone")).collect(Collectors.toList());
            //一次处理10万条
            int max = 100000;
            //一共需要处理次数
            int times = phoneList.size() / max;
            times =  phoneList.size() % max == 0 ? times  : times + 1;
            //按照十万一条分页
            List<List<String>> splitList = Stream.iterate(0, n -> n + 1).limit(times).parallel()
                    .map(idx -> phoneList.stream().skip(idx * max).limit(max).parallel().collect(Collectors.toList())).collect(Collectors.toList());
            //分页去积分表查询会员最后一条消费记录
            List<Map> resultList = splitList.parallelStream().map(item -> {
                Criteria criteria = new Criteria();
                criteria.and("phone").in(item);
                MatchOperation match = Aggregation.match(criteria);
                GroupOperation group = Aggregation.group("phone").max("orderTime").as("orderTime");
                ProjectionOperation project = Aggregation.project( "phone","orderTime").and("_id").as("phone").andExclude("_id");
                Aggregation aggregation = Aggregation.newAggregation(match, group, project);
                List<Map> mapList = mongoTemplate.aggregate(aggregation, "member_point_document", Map.class).getMappedResults();
                return mapList;
            }).flatMap(List::stream).collect(Collectors.toList());
            // 按照时间天分组 这样就可以减少插入会员的次数
            Map<String, List<String>> orderTimeList = resultList.parallelStream().filter(item -> StringUtils.isNotEmpty(MapUtils.getString(item,"orderTime",""))).collect(Collectors.groupingBy(item -> MapUtils.getString(item, "orderTime"), Collectors.mapping(item -> MapUtils.getString(item, "phone"), Collectors.toList())));
    

    注意:聚合操作的时候如果结果大于16M时需要使用游标的方式返回结果缓存在硬盘中

    AggregationOptions.builder().outputMode(AggregationOptions.OutputMode.CURSOR).allowDiskUse(true).build());
    

    如果想使用上面的方式必须要保证MongoDB的版本为1.9.0以上

    相关文章

      网友评论

          本文标题:2019-01-16

          本文链接:https://www.haomeiwen.com/subject/buapdqtx.html