1. The data returned by the aggregation pipeline is too large: by default MongoDB caps each document at 16MB
2019-01-15 14:50:30.051 WARN 2431 --- [nio-9090-exec-1] o.s.data.mongodb.core.MongoTemplate : Command execution of
{ "aggregate" : "mobTaskTrack_20190110" , "pipeline" : [ { "$match" : { "bizType" : "taobao"}} , { "$match" : { "time" : { "$gte" : 1547049600000 , "$lte" : 1547135999000}}} , { "$sort" : { "time" : -1}} , { "$group" : { "_id" : "$requestId" , "total" : { "$sum" : 1} , "bizType" : { "$first" : "$bizType"} , "tenantId" : { "$first" : "$tenantId"} , "account" : { "$first" : "$account"} , "fpUid" : { "$first" : "$fpUid"} , "event" : { "$first" : "$event"} , "eventStatus" : { "$first" : "$eventStatus"} , "userId" : { "$first" : "$userId"} , "taskId" : { "$first" : "$taskId"} , "requestId" : { "$first" : "$requestId"} , "origin" : { "$first" : "$origin"} , "model" : { "$first" : "$model"} , "system" : { "$first" : "$system"} , "systemVersion" : { "$first" : "$systemVersion"} , "network" : { "$first" : "$network"} , "sdkVersion" : { "$first" : "$sdkVersion"} , "bundleId" : { "$first" : "$bundleId"} , "message" : { "$first" : "$message"} , "time" : { "$first" : "$time"}}}] , "allowDiskUse" : true}
failed: aggregation result exceeds maximum document size (16MB)
Each document passes through a pipeline made up of multiple stages, and each stage has its own specific function (grouping, filtering, and so on); after the documents have been processed by the pipeline, the corresponding result is output.
From what I could find, setting allowDiskUse is supposed to solve this, but setting it had no effect at all.
Aggregation aggregation = newAggregation(aggregationOperationList);
aggregation = aggregation.withOptions(new AggregationOptions.Builder().allowDiskUse(true).build());
The same aggregation could still be run in Robo 3T, though; it was very slow when there were a lot of records, but it did execute.
The code was using spring-data-mongodb-1.8.4.RELEASE and mongodb-java-driver-2.13.3. It only worked after I upgraded mongo-java-driver to 3.x and additionally requested a cursor (allowDiskUse only lifts the per-stage memory limit; the 16MB cap applies to returning the whole result as a single inline document, which is exactly what a cursor avoids). Even then, the first query below returned no results; only the second one returned data.
List<AggregationOperation> aggregationOperationList = new ArrayList<>();
// where conditions
if (EmptyUtil.isNotEmpty(propName) && EmptyUtil.isNotEmpty(propValue)) {
    for (int i = 0; i < propName.length; i++) {
        if (propValue[i] != null) {
            aggregationOperationList.add(match(where(propName[i]).is(propValue[i])));
        }
    }
}
if (from != null && to != null) {
    aggregationOperationList.add(match(where("time").gte(from).lte(to)));
}
aggregationOperationList.add(sort(Sort.Direction.DESC, "time"));
aggregationOperationList.add(group("requestId")
        .first("bizType").as("bizType")
        .first("tenantId").as("tenantId")
        .first("account").as("account")
        .first("fpUid").as("fpUid")
        .first("event").as("event")
        .first("eventStatus").as("eventStatus")
        .first("userId").as("userId")
        .first("taskId").as("taskId")
        .first("requestId").as("requestId")
        .first("origin").as("origin")
        .first("model").as("model")
        .first("system").as("system")
        .first("systemVersion").as("systemVersion")
        .first("network").as("network")
        .first("sdkVersion").as("sdkVersion")
        .first("bundleId").as("bundleId")
        .first("message").as("message")
        .first("time").as("time"));
// 1. Get the total count
Aggregation aggregation1 = newAggregation(aggregationOperationList);
aggregation1 = aggregation1.withOptions(new AggregationOptions.Builder().allowDiskUse(true).cursor(new BasicDBObject()).build());
// This query returns 0 results (the result set is empty)
int count = mgt.aggregate(aggregation1, collectionName, MobTrackDto.class).getMappedResults().size();
// 2. Fetch the requested page
aggregationOperationList.add(sort(Sort.Direction.DESC, "time"));
aggregationOperationList.add(skip(pageNo * 10 - 10));
aggregationOperationList.add(limit(10));
Aggregation aggregation = newAggregation(aggregationOperationList);
aggregation = aggregation.withOptions(new AggregationOptions.Builder().allowDiskUse(true).build());
// Only this second query returns data
AggregationResults<MobTrackDto> aggregationResults = mgt.aggregate(aggregation, collectionName, MobTrackDto.class);
List<MobTrackDto> mobTrackDtoList = aggregationResults.getMappedResults();
Rewriting it against the native MongoDB driver API solved the problem:
List<MobTrackDto> mobTrackDtoList = Lists.newArrayList();
List<DBObject> aggregateQuery = new ArrayList<>();
BasicDBObjectBuilder matchBuilder = new BasicDBObjectBuilder();
if (EmptyUtil.isNotEmpty(propName) && EmptyUtil.isNotEmpty(propValue)) {
    for (int i = 0; i < propName.length; i++) {
        if (propValue[i] != null) {
            matchBuilder.add(propName[i], propValue[i]);
        }
    }
}
if (from != null && to != null) {
    BasicDBObjectBuilder dateBuilder = new BasicDBObjectBuilder();
    matchBuilder.add("time", dateBuilder.add("$gte", from).add("$lte", to).get());
}
com.mongodb.AggregationOptions aggregationOptions = com.mongodb.AggregationOptions.builder()
        .outputMode(com.mongodb.AggregationOptions.OutputMode.CURSOR)
        .allowDiskUse(true).build();
DBCollection collection = mgt.getDb().getCollection(collectionName);
BasicDBObject groupBy = new BasicDBObject("requestId", "$requestId");
BasicDBObjectBuilder groupBuilder = new BasicDBObjectBuilder();
groupBuilder.add("_id", groupBy);
groupBuilder.add("bizType", new BasicDBObject("$first", "$bizType"));
groupBuilder.add("tenantId", new BasicDBObject("$first", "$tenantId"));
groupBuilder.add("account", new BasicDBObject("$first", "$account"));
groupBuilder.add("fpUid", new BasicDBObject("$first", "$fpUid"));
groupBuilder.add("event", new BasicDBObject("$first", "$event"));
groupBuilder.add("eventStatus", new BasicDBObject("$first", "$eventStatus"));
groupBuilder.add("userId", new BasicDBObject("$first", "$userId"));
groupBuilder.add("taskId", new BasicDBObject("$first", "$taskId"));
groupBuilder.add("requestId", new BasicDBObject("$first", "$requestId"));
groupBuilder.add("origin", new BasicDBObject("$first", "$origin"));
groupBuilder.add("model", new BasicDBObject("$first", "$model"));
groupBuilder.add("system", new BasicDBObject("$first", "$system"));
groupBuilder.add("systemVersion", new BasicDBObject("$first", "$systemVersion"));
groupBuilder.add("network", new BasicDBObject("$first", "$network"));
groupBuilder.add("sdkVersion", new BasicDBObject("$first", "$sdkVersion"));
groupBuilder.add("bundleId", new BasicDBObject("$first", "$bundleId"));
groupBuilder.add("message", new BasicDBObject("$first", "$message"));
groupBuilder.add("time", new BasicDBObject("$first", "$time"));
aggregateQuery.add(new BasicDBObject("$match", matchBuilder.get()));
aggregateQuery.add(new BasicDBObject("$sort", new BasicDBObject("time", -1)));
aggregateQuery.add(new BasicDBObject("$group", groupBuilder.get()));
// First pass: iterate the cursor to get the total count
Cursor cursor = collection.aggregate(aggregateQuery, aggregationOptions);
int count = 0;
while (cursor.hasNext()) {
    cursor.next();
    count++;
}
// Second pass: append $sort/$skip/$limit for the requested page
aggregateQuery.add(new BasicDBObject("$sort", new BasicDBObject("time", -1)));
aggregateQuery.add(new BasicDBObject("$skip", 10 * (pageNo - 1)));
aggregateQuery.add(new BasicDBObject("$limit", 10));
cursor = collection.aggregate(aggregateQuery, aggregationOptions);
while (cursor.hasNext()) {
    DBObject next = cursor.next();
    mobTrackDtoList.add(new Gson().fromJson(next.toString(), MobTrackDto.class));
}
Pair<Integer, List<MobTrackDto>> pair = Pair.of(count, mobTrackDtoList);
2. Can't find a codec for class com.mongodb.BasicDBObjectBuilder.
It turned out that the value stored via BasicDBObjectBuilder.add() has to be something the driver can encode (a BasicDBObject here); I had forgotten to call get() on the nested BasicDBObjectBuilder.
BasicDBObjectBuilder dateBuilder = new BasicDBObjectBuilder();
matchBuilder.add("time", dateBuilder.add("$gte",from).add("$lte",to).get());
3. Sort failure on large data sets: by default an in-memory sort in MongoDB may not exceed 32MB:
32MB = 32 * 1024 * 1024 = 33554432 bytes
com.mongodb.MongoQueryException: Query failed with error code 17144 and error message 'Plan executor error during find: Overflow sort stage buffered data usage
of 33568115 bytes exceeds internal limit of 33554432 bytes' on server
Adding an index on the sort field avoids this problem.
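A minimal sketch of creating such an index (assuming, as in the queries above, that the sort key is time and that mgt is the MongoTemplate used elsewhere in this post); with an index on the sort key, MongoDB can serve the sort from the index instead of buffering the whole result set in memory:
// Create a descending index on "time" so sort({time: -1}) can be served from the index
// instead of an in-memory sort (which is capped at 32MB by default).
DBCollection collection = mgt.getCollection(collectionName);
collection.createIndex(new BasicDBObject("time", -1));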
4. Processing millions of records with Spring Data MongoDB is very slow
Query query = new Query();
if (EmptyUtil.isNotEmpty(propName) && EmptyUtil.isNotEmpty(propValue)) {
    for (int i = 0; i < propName.length; i++) {
        if (propValue[i] != null) {
            query.addCriteria(Criteria.where(propName[i]).is(propValue[i]));
        }
    }
}
if (from != null && to != null) {
    query.addCriteria(Criteria.where("time").gte(from).lte(to));
    query.with(new Sort(Sort.Direction.DESC, "time"));
}
// int count = mgt.count(query, MobTrackDto.class, collectionName);
List<MobTrackDto> mobTrackDtoList = mgt.find(query, MobTrackDto.class, collectionName);
int count = mobTrackDtoList.size();
Pair<Integer, List<MobTrackDto>> pair = Pair.of(count, mobTrackDtoList);
Rewriting it against the native MongoDB driver API is still fairly slow:
List<MobTrackDto> mobTrackDtoList = new ArrayList<>();
BasicDBObject query = new BasicDBObject();
if (EmptyUtil.isNotEmpty(propName) && EmptyUtil.isNotEmpty(propValue)) {
    for (int i = 0; i < propName.length; i++) {
        if (propValue[i] != null) {
            query.append(propName[i], propValue[i]);
        }
    }
}
if (from != null && to != null) {
    BasicDBObjectBuilder dateBuilder = new BasicDBObjectBuilder();
    query.append("time", dateBuilder.add("$gte", from).add("$lte", to).get());
}
Cursor cursor = mgt.getCollection(collectionName).find(query).sort(new BasicDBObject("time", -1));
while (cursor.hasNext()) {
    DBObject next = cursor.next();
    mobTrackDtoList.add(new Gson().fromJson(next.toString(), MobTrackDto.class));
}
int count = mobTrackDtoList.size();
Pair<Integer, List<MobTrackDto>> pair = Pair.of(count, mobTrackDtoList);