数据量一大就引发了各种各样的问题:性能,效率,存储等等。
下面就讲讲遇到的一些细节,并且当前的解决方案。
如何通过Rest API得到所有的数据
举个例子,对于一个论坛网站,我想要得到里面所有的topic内容,数据量在50万以上。API为/api/core/v3/contents
,这是Jive的Rest API。
对于需要返回很多数据的API来说,一般都做了分页和排序处理。那么第一直觉的做法就是遍历每一页,一页一页拿到所有的数据。
/api/core/v3/contents?sort=dateCreatedAsc&count=100&startIndex=0
看起来没有问题,但是实践过程中发现,随着页数往后,API返回结果的时间越来越长,当页数达到1000页,几乎80%的可能都是超时,但是目前也就10万左右的数据获取到了。
当然也可能有人会问,如果直接把每页的数量(count)设置的大一些,那么页数就少一些,会不会就可以拿到所有数据了。
这里我们分析一下为什么API在1000左右会超时?
首先论坛上的数据都存在数据库中,包括我们想要的topic的内容。虽然我们不知道它是以什么机构存储的,有哪些字段,有没有索引,但是如果我们要得到最近发布的第10000到10100的topic,就还是会用到order by、 limit和offset,我们知道当offset很大时,会出现效率问题。所以API超时一部分原因是从数据库获取数据的时间变长了。
所以就算把每页数量(count)增加了,还是会有一样的瓶颈。
做的处理
1.只获取需要的字段。例如,假设不需要这个topic的更新日期,有哪些人点赞,有哪些人收藏等等信息,那么可以在API上加上fields参数来减少数据。查询数据库的时候返回20个字段的时间总是大于1个字段的时间的。而且结果中的一些不需要的内容可能是关联了其他表级联查询出来的,少了这些内容,极大的减少了API的返回时间。
/api/core/v3/contents?sort=dateCreatedAsc&count=100&startIndex=0&fields=id,author.emails,author.id,categories,content.text
2.解析数据,分段获取
- 根据Group获取所有组内的topic。在API中加上filter字段,获取一个组的topic,数量就大大减少了,通过API一页一页的获取,不会出现超时的问题。
- 根据People获取每个人的topic。有一些topic不是发到组里的,还有一些topic是系统发出的,不是有效的用户,不需要迁移。
/api/core/v3/contents?filter=author(user_link)&sort=dateCreatedAsc&count=100&startIndex=0
如何加快数据处理的速度
下载数据的时候,请求一个Rest API平均3s,解析数据并且存到临时数据库中是毫秒级的,暂且忽略不计,所以10万数据需要的时间是3*100000/60/60≈83小时,将近3天半。
太慢了,所以需要提升效率。选择的做法是加入大量的并发处理。
- 请求API获取数据时并发。因为用的Node语言,所以直接用Promise.all同时发送多个URL请求,如果请求失败,再重复发送一次(这是因为有的是超时,也有可能server不稳定,所以最多重试5次去请求结果)。假设一次同时请求20个URL,等到20个请求都返回结果后,对于结果集responses再进行分析处理,这个过程大大缩短了请求API的总时间。
let responses = [];
responses = await Promise.all(urls.map(url => {
return retryOnError(() => {
console.log(`[${new Date().toISOString()}][GET] ${url}`)
return axios.get(url, jiveRequestHeader)
})
}))
responses = responses.filter(x => !!x);
await Promise.all(responses.map(async response => {
try {
let contents = await buildContents(response);
await saveContents(contents);
}
catch (e) {
console.error(`[ERROR] ${new Date().toISOString()} | ${e.stack}`)
}
}))
const retryOnError = async (fn) => {
const max = 5;
for (let i = 0; i < max; i++) {
try {
return await fn();
} catch (e) {
console.log(`[RETRY] ${new Date().toISOString()} | ${e.message}`);
console.error(`[RETRY] ${new Date().toISOString()} | ${e.message}`);
await sleep(5000);
}
}
};
- 存数据库并发。把数据存到GCP上的Spanner DB中,这个过程也消耗时间。因为Spanner DB对于插入数据有并发数的限制,所以并发量要控制。
插入表的数据可以是一个数组,但是这里会有一个问题就是当中途出错了,数组后面的数据就不会再继续插入而是终止了,所以为了确认每一条数据插入的状态,选择了一次插入一条数据。
const _ = require('lodash')
const Spanner = require('@google-cloud/spanner');
const insertData = async (tableName, dataArray) => {
const databaseId = 'XXXXX';
const spanner = Spanner();
const instance = spanner.instance(process.env.SPANNER_INSTANCE_ID);
const database = instance.database(databaseId);
const table = database.table(tableName);
if (dataArray[0]) {
const columnCount = Object.keys(dataArray[0]).length;
const bulks = _.chunk(dataArray, 2000 / columnCount);
for (let i = 0; i < bulks.length; i++) {
const bulk = bulks[i];
await Promise.all(bulk.map(async object => {
try {
return await table.insert(object);
} catch (e) {
console.log(`[ERROR] [Save Data] ${e.message}`);
}
}))
}
}
}
如何扩展GCP上的VM磁盘空间的问题
因为要下载很多数据:视频、图片、音频、文件等等。所以在使用VM的时候发现磁盘空间慢慢就不够用了,那么如何扩展磁盘空间就是一个问题了。
网友评论