现在我们了解到的设计模式能够应对很多情况,不过针对一些特定的问题,我们还有一些方案可以使⽤。就像做菜的菜谱⼀样,它提供了一个⼤概的步骤,我们还可以⾃己创造一些,在这个章节,我们主要提供以下情形的解决方案:
- 需要进行异步初始化的模块
- 以较小的开销,在忙碌应⽤用中实现⾼表现⼒的批量量异步缓存
- 运⾏可能阻塞事件循环或者可能削弱Node.js处理并发请求能⼒的,同步计算密集(CPUbound)操作
处理需要异步初始化模块的方案
我们在第⼆章提到过,require()和module.exports是同步工作的,这是同步API存在于核⼼模块和很多npm的库中的⼀个主要原因。这样的方案提供了了更方便的转换机制,这也是最初使用同步而不是异步方案开初始化模块的原型。
不幸的是,有时候同步API并不不好用,特别是在需要连接⽹网络进⾏初始化的时候,需要花费精力去配置参数、使⽤握⼿进⾏连接等等。这种场景存在于许多数据库驱动和类似消息队列这样的中间件里。
标准解决方案
举例,我们需要通过请求,连接⼀个叫db的远程数据库,我们通常有两种选择:
- 在使⽤前先确保它已经被初始化,否则就等它初始化完毕后再处理接下来的异步操作
const db = require('aDb'); // The async module
module.exports = function findAll(tyoe, callback) {
if (db.connected) {
runFild();
} else {
db.once('connected', runFild);
}
function runFild() {
db.findAll(type, callback);
}
};
- 使⽤依赖注⼊(Dependency Injection)⽽不是直接加载异步模块。延迟其他模块的初始化直到它的依赖都加载完成,这样就转移了了重点到别的地⽅
// in module app.js
const db = require('aDb');
const findAllFactory = require('./findAll');
db.on('connected', function(){
const findAll = findAllFactory(db);
});
// in module findAll.js
module.exports = db => {
return function findAll(type, callback) {
db.findAll(type, callback);
}
};
显然第一种⽅方案就很不受欢迎,因为包含很多引⽤模板(boilerplate)
第⼆种⽅案有时候也不太好,在大型项目中使用依赖注入会很复杂,特别是需要手动去初始化一些模块的时候。不过使⽤用一些⽀持异步初始化的依赖注⼊容器会有些帮助。
下⾯的第三种⽅案,能够轻易的帮助我们从初始化依赖里分离出需要的模块
预初始化队列方案
方案的主要思想:把含未完成的初始化的模块操作保存下来,一旦初始化步骤完成,就执行行这些操作。下⾯是⼀个例⼦:
- asyncModule.js(⼀个需要异步初始化的模块)
const asyncModule = module.exports;
asyncModule.initialized = false;
asyncModule.initialize = callback => { // 一个延迟10s的初始化操作
setTimeout(() => {
asyncModule.initialized = true;
callback();
}, 10000);
};
asyncModule.tellMeSomething = callback => {
process.nextTick(() => {
if(!asyncModule.initialized) {
return callback(
new Error('I don\'t have anything to say right now') //如果还没初始化
);
}
callback(null, 'Current time is: ' + new Date());
});
};
- routes.js(一个HTTP请求的处理模块)
const asyncModule = require('./asyncModule');
module.exports.say = (req, res) => {
asyncModule.tellMeSomething((err, something) => {
if(err) {
res.writeHead(500);
return res.end('Error:' + err.message);
}
res.writeHead(200);
res.end('I say: ' + something);
});
};
这个HTTP处理模块(handler)主要是触发刚刚的tellMeSomething()方法并将结果写入HTTP响应头。可以看到这⾥,我们require()这个异步的asyncModule时并没有检测它是不是已经初始化了,这就可能导致问题。
让我们⽤基本的http核⼼模块,创建⼀个HTTP server
- app.js
const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule.js');
asyncModule.initialize(() => {
console.log('Async module initialized');
});
http.createServer((req, res) => {
if (req.method === 'GET' && req.url === '/say') {
return routes.say(req, res);
}
res.writeHead(404);
res.end('Not found');
}).listen(8000, () => console.log('Started'));
这个app.js是程序的入⼝,它触发了AsyncModule的初始化、利用routes.say()创建了一个
HTTPServer。运⾏一下,正如想象,如果我们在运行server时,快一点打开http://localhost:8000/say (也就是调⽤了say()的时候,AsyncModule还没有异步初始化完成),我们会在浏览器上看到:
本例的显示取决于异步初始化的细节,如果慢⼀点等初始化结束,我们就能看到正确的结果了。我们应该尽量避免这种问题,因为这可能导致丢失信息、程序崩溃等更严重的问题。虽然初始化一般很快,我们⼀般不会去注意失败的请求,但是对于加载频繁和自动设置(autoscale)的云服务情况来说,这可能导致阻塞等更多问题。
⽤用预加载队列列去包装模块
我们⽤队列去存放未初始化的操作,这有点像状态模式(State Pattern)。存在两种状态:⼀一种是把未初始化的操作排队,另一种是当初始化完成时,再把最后的方法委托给原模块(也就是AsyncModule)
- AsyncModuleWarpper.js
const asyncModule = require('./asyncModule');
//The wrapper 给 activeState 设置分发操作
const asyncModuleWrapper = module.exports;
asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = function() {
activeState.initialize.apply(activeState, arguments);
};
asyncModuleWrapper.tellMeSomething = function() {
activeState.tellMeSomething.apply(activeState, arguments);
};
//Module没有初始化成功的时候的state
let pending = [];
let notInitializedState = {
initialize: function(callback) {
asyncModule.initialize(function() {\
asyncModuleWrapper.initalized = true;
activeState = initializedState; // [1]更更新activeState的变量量
pending.forEach(function(req) { // [2]运⾏行行存储队列列的操作
asyncModule[req.method].apply(null, req.args);
});
pending = [];
callback(); // [3]触发原来的回调
});
},
tellMeSomething: function(callback) {
return pending.push({
method: 'tellMeSomething',
args: arguments
});
}
};
// The state to use when the module is initialized
let initializedState = asyncModule;
// Set the initial state to the notInitializedState
let activeState = notInitializedState; // 设置更新activeState
当开始初始化的时候,我们提供了一个回调代理,它让我们的warpper知道什么时候已经初始化好了,应该进⾏接下来的操作了。
可⻅代码,如果没有初始化完成,tellMeSomething会放在pending队列里等完成初始化再调⽤。
而且,通过这个模式我们可以清晰的知道AsyncModule这个模块有没有被初始化完成,因为完成之后会切换状态为initializedState(倒数第二⾏代码)
运行一下:
通过队列我们可以把为可能含初始化模块的操作给挂起,这是⼀个更稳健的行为。
核⼼在于:如果要初始化异步模块,我们让需要的操作排队等待直到初始化完成。
在自然情况下
刚刚这种模式在很多数据库驱动和ORM库中经常使用,最著名的就是MongoDB的ORM库
Moogoose了,这样就不需要等待数据库的连接情况,因为这都⾃动被放入队列里,并且在初始化建立完成后执⾏,这种方案增强了了API的可用性。
处理批量异步缓存的方案
在高负载的应用中,缓存在web里里起到很重要的作用。无论是静态的文件资源还是数据库的查询结果。在这个章节我们了解高吞吐量下的异步操作缓存。
暂时没有缓存和批量量处理理的Server
这是一个管理理销售情况的情景,JSON数据格式:transactionId{ amount, item}
- totalSales.js
const level = require('level');
const sublevel = require('level-sublevel');
const db = sublevel(level('example-db', {valueEncoding: 'json'}));
const salesDb = db.sublevel('sales');
module.exports = function totalSales(item, callback) {
console.log('totalSales() invoked');
let sum = 0;
salesDb.createValueStream() // [1] 从数据库流式输⼊入数据
.on('data', data => {
if(!item || data.item === item) { // [2] 如果同样商品,求和
sum += data.amount;
}
})
.on('end', () => {
callback(null, sum); // [3]返回刚刚的求和结果
});
};
这种方案显然表现不大好,我们没有用index去区分transaction,求和计算也没有⽤更⽅便的map或者reduce。接下来暴露接⼝给HTTP server:
- app.js
const http = require('http');
const url = require('url');
const totalSales = require('./totalSales');
//const totalSales = require('./totalSalesBatch');
//const totalSales = require('./totalSalesCache');
http.createServer((req, res) => {
const query = url.parse(req.url, true).query;
totalSales(query.item, (err, sum) => {
res.writeHead(200);
res.end(`Total sales for item ${query.item} is ${sum}`);
});
}).listen(8000, () => console.log('Started'));
接下来访问http://localhost:8000/?item=book,顺⼿造一点假数据更方便查看
批量处理异步请求
最基本的方案是使用相同的API处理⼀系列的调用,通过回调的方式:
image.png
图示两个客户端(可能是两个不同的对象或者两个不同的web请求)触发了相同输入的的异步操作,在这第一个图中,它们单独发起⾃己的异步操作执⾏完毕后通过回调返回。第⼆张图针对这种相同输⼊的异步操作进⾏了批量处理,当操作完成时,通知两个客户端,这种方式优化了应⽤的加载,避免了了使⽤复杂的缓存机制,也避免了这种机制带来的内存管理和失效问题。
image.png
在这个web server使用批量处理请求
主要采⽤队列的方案,把回调加⼊队列,等异步操作完成之后⼀块儿触发
- totalSalesBatch.js
const totalSales = require('./totalSales');
const queues = {};
module.exports = function totalSalesBatch(item, callback) {
if(queues[item]) { // [1] 队列列存在,请求正在执⾏行行,回调加⼊入队列列
console.log('Batching operation');
return queues[item].push(callback);
}
queues[item] = [callback]; // [2]我们需建⽴立新请求因为队列列有内容了了
totalSales(item, (err, res) => {
const queue = queues[item]; // [3]获得操作结果后⼀一个个触发回调
queues[item] = null;
queue.forEach(cb => cb(err, res));
});
};
缓存异步请求
刚刚的批量处理方案在着快速API和少量批处理请求的情况下会有点问题。在这种情况下,采⽤缓存模式是比较好的方案。
缓存模式的核心思想是:请求完成后我们就把结果保存在缓存里(可以是一个变量、⼀个数据库入口等),下一次就不请求了,直接从缓存里取出
缓存模式很常见,但真正厉害的是把缓存模式和批处理模式结合起来使用:
image.png
由图可知,没有缓存时,和批处理模式是差不多的,不过当请求完成后,结果会被放入缓存里,方便下次直接从缓存里提取。
在这个web server使用缓存请求
- totalSalesCache.js (在批处理的基础上加上缓存层)
const totalSales = require('./totalSales');
const queues = {};
const cache = {};
module.exports = function totalSalesBatch(item, callback) {
const cached = cache[item];
if (cached) { //如果缓存,⽤用回调返回,注意⽤用的nextTick哦
console.log('Cache hit');
return process.nextTick(callback.bind(null, null, cached));
}
if (queues[item]) {
console.log('Batching operation');
return queues[item].push(callback);
}
queues[item] = [callback];
totalSales(item, (err, res) => {
if (!err) {
cache[item] = res;
setTimeout(() => {
delete cache[item];
}, 30 * 1000); //30 seconds expiry 设置了了超时时间
}
const queue = queues[item];
queues[item] = null;
queue.forEach(cb => cb(err, res));
});
};
缓存函数调⽤结果是实现记忆的⼀种方式,在npm中你可以找到很多方便用于异步存储记忆的⼯具,比如memoizee
缓存机制的具体实现
在实际应用中,我们需要更高级的存储和释放机制,有以下的原因:
- 缓存大量的结果会消耗大量内存,我们可以采用LRU(Least Recently Used)算法来只缓存最近的缓存的结果
- 当应用通过多进程执⾏时,不同server实例的缓存结果可能不一样,解决方案是共享缓存,流
⾏的方案有Redis和Memcached - 可以手动的清理缓存,或者进⾏时间限制,从而避免时间过期缓存。虽然这会有点不易管理。
使用Promise进行批处理和缓存
在这种情况下,使用Promise有以下几种优势:
- 一个Promise可以用多个then()方法,在这里⽅便执行批处理请求
- then()方法最多触发一次,在这里刚好只缓存一次结果
- resolved的Promise也可以使用then(),在这里⽅便持久的获取缓存值
- then()方法是异步调用的,着这里⽅便异步返回结果
在这里,我们使用Promise包装totalSale()接口: - totalSalesPromises.js
// [1]引⼊入promisification模块,使返回Promise⽽而不不是回调
const pify = require('pify');
const totalSales = pify(require('./totalSales'));
const cache = {};
module.exports = function totalSalesPromises(item) {
if (cache[item]) { // [2]检测缓存情况
return cache[item];
}
cache[item] = totalSales(item) // [3]新建Promise
.then(res => {
// [4]resolve这个Promise之后,设计缓存清理理
setTimeout(() => {delete cache[item]}, 30 * 1000);
//30 seconds expiry
return res;
})
.catch(err => { // [5]Promise被reject,删除缓存抛出错误
delete cache[item];
throw err;
});
return cache[item]; // [6]返回缓存结果
};
可见使用Promise之后代码变得优雅简洁,并且同时使用了批处理和缓存
处理计算密集型(CPU-bound)任务的方案
在第⼆章我们知道,通过触发异步操作我们让堆栈回到事件循环,从而可以自由处理其它的请求。
但是如果运⾏一个同步请求时间很⻓(比如计算密集型任务),它可能久久不返回给事件循环,因为它在重度利用CPU而不是在频繁使⽤I/O操作
解决子集求和问题(subset sum problem)
子集求和问题,即求一个集合的非空子集满足和为0
最简单的解决方案是把⼦集进行排列组合,这有2^n的复杂度,这就很计算复杂了
- subsetSum.js实现该算法
const EventEmitter = require('events').EventEmitter;
//继承EventEmitter,每次实现匹配就就触发⼀一下事件
class SubsetSum extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
this.totalSubsets = 0;
}
//实现可能的组合,注意它是同步的
_combine(set, subset) {
for(let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combine(set.slice(i + 1), newSubset);
this._processSubset(newSubset);
}
}
//⼀一旦实现组合之后,确定是否匹配,匹配就emit⼀一个match事件
_processSubset(subset) {
console.log('Subset', ++this.totalSubsets, subset);
const res = subset.reduce((prev, item) => (prev + item), 0);
if(res == this.sum) {
this.emit('match', subset);
}
}
start() {
this._combine(this.set, []); //触发同步的排列列组合
this.emit('end'); //所有排列列组合结果计算之后触发end
}
}
module.exports = SubsetSum;
- app.js 把刚才的SubsetSum放在HTTP server里执⾏
const http = require('http');
const SubsetSum = require('./subsetSum');
//const SubsetSum = require('./subsetSumDefer');
//const SubsetSum = require('./subsetSumFork');
http.createServer((req, res) => {
const url = require('url').parse(req.url, true);
if(url.pathname === '/subsetSum') {
const data = JSON.parse(url.query.data);
res.writeHead(200);
const subsetSum = new SubsetSum(url.query.sum, data);
subsetSum.on('match', match => {
res.write('Match: ' + JSON.stringify(match) + '\n');
});
subsetSum.on('end', () => res.end());
subsetSum.start();
} else {
res.writeHead(200);
res.end('I\m alive!\n');
}
}).listen(8000, () => console.log('Started'));
通过事件机制,我们知道,当算法执行完毕时,也就是要等⼀会儿,会⾃自动返回结果:
image.png
可以看到,在算法返回结果时请求⼀直被挂起,也就是暂时都不会显示I'm alive,Node.js的单线程就这样被一个⻓的同步计算操作阻塞。
使⽤用setImmediate实现交错
通常来说,计算密集型(CPU-bound)的算法都是由一系列步骤组成,可能是递归调⽤、循环或者是和其他的组合。所以⼀个简单的解决⽅案就是让每一个(或者一定数量)步骤执行完成之后把执行权交给事件循环。
核心思想是:通过setImmediate实现异步任务和密集计算的交错执⾏
采⽤交错执⾏的⼦集求和问题
- subsetSumDefer.js 在刚刚的基础上修改
class SubsetSumDefer extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
this.totalSubsets = 0;
}
//实现交错执行的核心函数
_combineInterleaved(set, subset) {
this.runningCombine++; //需要新的参数来计数
setImmediate(() => {
this._combine(set, subset);
if(--this.runningCombine === 0) {
this.emit('end');
}
});
}
_combine(set, subset) {
for(let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combineInterleaved(set.slice(i + 1), newSubset); //替换为step
this._processSubset(newSubset);
}
}
_processSubset(subset) {
console.log('Subset', ++this.totalSubsets, subset);
const res = subset.reduce((prev, item) => prev + item, 0);
if(res == this.sum) {
this.emit('match', subset);
}
}
start() {
this.runningCombine = 0; //计数参数确认计算完毕
this._combineInterleaved(this.set, []);
}
}
交错执行模式的思考
尽管刚刚的模式解决了阻塞的问题,但是它并不不是一个很好的模式,因为延迟同步操作会带来事件开销,这可能导致⼤大的影响。特别是当我们想尽快响应用户时,不不希望等待太久。我们可以设置step的数量,但这并不能从根本上解决问题。
不过也不能因为它有时间开销就完全放弃这种模式,实际上,如果同步操作顺利,偶尔执行行异步的操作,这种利用setImmediate的模式还是可以被认可的
注意process.nextTick() 不适用与这种交错的模式,正如第一章讲到,它在I/O之前执⾏操作,频繁的调用可能导致I/O饥饿。
使⽤多进程
另⼀个防止事件循环阻塞的方案是使用⼦进程,从而不把昂贵的计算密集型任务运行在主进程上,这样做有以下好处:
- 同步的任务能够全心的去运⾏,不用进⾏交错执⾏
- 使⽤多进程会比setImmediate更简单,而且不⽤考虑主程序的规模⼤小
- 如果需要更好的性能,我们甚⾄至可以用更底层的语言去实现(比如C)
Node.js有很多操作外部进程的工具库可以使用,比如可以使用child_process模块。甚至外部进程可以是Node.js程序,我们只需要做相关的连接。child_process.fork()方法创建的子进程会⾃动的创建进程间的交流通道,还可以⽤⼀一个类似EventEmitter的接⼝实现进程间的信息交互
把子集求和任务交付其他进程处理
核⼼思想是创建⼀个子进程去处理同步任务,让时间循环自由处理⽹络请求,下面提出可行的⽅案:
- 创建processPool.js 模块来建立进程池。因为开新进程要花时间,所以就让他们在进程池⾥里运行来等待请求,这样可以节约时间和CPU。另外限制一下进程的数量,可以防止Dos攻击
const fork = require('child_process').fork; //创建⼦子进程
class ProcessPool {
constructor(file, poolMax) {
this.file = file;
this.poolMax = poolMax;
this.pool = []; //pool是⼀系列将被使⽤用的运⾏的进程
this.active = []; //active包含现在正在使用的进程
this.waiting = []; //因为缺少进程资源 等待的回调队列
}
acquire(callback) {
let worker;
if(this.pool.length > 0) { // [1] 可⽤用的进程就把它变为active
worker = this.pool.pop();
this.active.push(worker);
return process.nextTick(callback.bind(null, null, worker));
}
if(this.active.length >= this.poolMax) {
// [2]缺少进程资源 等待
return this.waiting.push(callback);
}
worker = fork(this.file); // [3]如果进程数还没到最⼤大就再开⼀一个进程
this.active.push(worker);
process.nextTick(callback.bind(null, null, worker));
}
release(worker) {
if(this.waiting.length > 0) { // [1]如果请求队列列⾥里里有,则赋给worker
const waitingCallback = this.waiting.shift();
waitingCallback(null, worker);
}
this.active = this.active.filter(w => worker !== w);
// [2]完成worker之后,放回池中
this.pool.push(worker);
}
}
module.exports = ProcessPool;
进程⼀直在运行不停止,想要减少常驻内存的使用、增强代码的健壮性,可以:
- 在⼀段时间后清理闲置的进程
- 设计⼀种机制,将无响应和崩溃的进程重启
- 接下来的subsetSumFork.js把⼦集求和的任务分发到子进程。它的作用是和⼦进程交流然后传输结果
const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
//初始化进程池,并且设置最⼤可用的进程数为2
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);
class SubsetSumFork extends EventEmitter {
constructor(sum, set) { //接收sum和set的EventEmitter
super();
this.sum = sum;
this.set = set;
}
start() { //触发算法运⾏行行
workers.acquire((err, worker) => { // [1]从进程池获取⼀一个⼦子进程
worker.send({sum: this.sum, set: this.set}); //进程通信的通道
const onMessage = msg => {
// [3]监听任务完成 删掉onmessage然后放回进程池
if (msg.event === 'end') {
worker.removeListener('message', onMessage);
workers.release(worker);
}
this.emit(msg.event, msg.data); // [4]⽆无缝传递信息
};
worker.on('message', onMessage); // [2]监听信息
});
}
}
module.exports = SubsetSumFork;
send()⽅法在子进程也是可用的,这也被cluster模块用来实现多线程分发HTTP server
- 最后我们的subsetSumWorker.js 需要⼀一个worker(子进程)来执⾏子集求和的算法,并且把它的算法结果传给⽗进程。
const SubsetSum = require('./subsetSum');
process.on('message', msg => {
// [1]从⽗进程监测信息,⼀旦有信息我们就新建实例,然后说明匹配
const subsetSum = new SubsetSum(msg.sum, msg.set);
subsetSum.on('match', data => {
// [2]⽤一个对象封装匹配结果传给父进程
process.send({event: 'match', data: data});
});
subsetSum.on('end', data => {
process.send({event: 'end', data: data});
});
subsetSum.start();
});
可以看到,我们重⽤了原来的subsetSum(同步版本),但是这次由于我们单独开了进程,所以不⽤担心事件循环被阻塞。
综上,可以看到,应用程序的一部分可以交付外部进程去处理的。
不过当子进程不是Node.js程序时,进程间通信的通道可能就不可⽤了。我们可以⾃己通过流式输入输出协议设计接口,可以参考child_process的实现
多进程模式的思考
我们可以并发的开两个⾃己求和任务,如果开3个会挂起⼀个直到其中一个任务完成,这是因为我们之前设置了进程的最大数。
可⻅多进程模式比交错执⾏模式更高效有力,不过,因为单一设备对资源有硬性限制,它并不不⽅便扩展。所以我们可以通过多个设备去实现分发加载任务。这也是我们接下来会提到的分布式架构模式。
网友评论