前言
node的核心编程思想是异步编程,ebookcoin是基于node开发的,ebookcoin的异步编程是基于流程控制库async实现的。本文将会围绕着如何利用async完成异步编程开发进行讲解,还会对ebookcoin中出现的其他异步编程方式进行肤浅的分析和介绍。
async流程控制库
asyncasync是一个流程控制库,官网网址:http://caolan.github.io/async/。截止到发稿前,async的版本是2.1.4。async提供了三类主要方法,分别是Collections(控制集合的流程控制方法,如何理解呢,简单的讲就是方法的第一个参数可能是一个集合col,或者是一个对象object,或者是一个数组array,通过后边的代码来主要操作这第一个参数中的对象)、Control Flow(流程控制方法,标准的流程控制方法)、Utils(流程控制的公共方法)。这三类方法相互协作,共同构建起了async的流程控制库。 async官网
ebookcoin中出现的async流程控制方法
目前,async的流程控制方法一共有72个,ebookcoin中用到了10个,如下:
Collections方法 | 说明 |
---|---|
eachSeries | 别名forEachSeries,格式-eachSeries(coll, iteratee, callbackopt) , 该方法和each类似,不过,同一时间只运行一个async的操作,我们将在下方看到例子和讲解。 |
eachLimit | 别名eachLimit,格式-eachLimit(coll, limit, iteratee, callbackopt) , 该方法和each类似,不过,同一时间只运行最大数量不超过limit个的async操作,我们将在下方看到例子和讲解。 |
forEachOfSeries | 别名eachOfSeries,格式-eachOfSeries(coll, iteratee, callbackopt) , 该方法和eachOf类似,不过,同一时间只运行一个async的操作,我们将在下方看到例子和讲解。 |
Control Flow方法 | 说明 |
---|---|
auto(自动) | 格式-auto(tasks, concurrencyopt, callbackopt) -由库根据task间的依赖,决定最好的顺序来执行tasks,tasks是一个object,里边包含了多个子任务和回调函数,我们将在下方看到例子和讲解 |
parallel(并行) | 格式-parallel(tasks, callbackopt)
|
series(串行) | 格式-series(tasks, callbackopt)
|
whilst(同时) | 格式-whilst(test, iteratee, callbackopt)
|
until(直到) | 格式-until(test, fn, callbackopt)
|
retry(重试) | 格式-retry(optsopt, task, callbackopt)
|
doWhilst | 格式-doWhilst(iteratee, test, callbackopt)
|
each
eachSeries、eachLimit、eachOfSeries(forEachOfSeries)这三个方法都源于each,因此,先讲解一下each。
each的别名是forEach,格式each(coll, iteratee, callbackopt)
,其中coll是集合、对象或者数组,iteratee是业务函数,重复操作的函数,用来定义一个重复执行的函数,callbackopt是回调函数。
each的功能是并行的将iteratee函数应用于coll的每个元素,并在结束时调用每个元素自己的回调函数(callback()),如果iteratee向自己的callback传递了一个error,则外层each方法的callback(可以认为是外层的那个callback)将马上接收到这个error。需要注意的是,由于程序是并行运行的,因此,不能保证函数的执行完全遵照顺序进行。
接下来,我们来看一下例子:
var async = require('async');
var openFiles = ["package.json","markdown.js","abc.js"];
async.each(openFiles, function(file, callback) {
console.log('Processing file ' + file);
if( file.length > 32 ) {
console.log('This file name is too long');
callback('File name too long');
} else {
console.log('File processed');
callback();
}
}, function(err) {
if( err ) {
console.log('A file failed to process');
} else {
console.log('All files have been processed successfully');
}
});
输出的结果是:
输出的结果是我们可以看到结果是顺序执行的,依次是"package.json","markdown.js","abc.js",但是,官方文档给的说明中明确表示,不能100%的保证执行顺序,恩,因为是并行的,此处要注意。
eachSeries
该方法的使用和each一样,不过,同一时间只运行一个async的操作,但是在实验的过程中我发现还是有很大区别的,我们来看两个例子:
//我对原来的函数进行了修改
var async = require('async');
var openFiles = ["package.json","markdown.js","abc.js","abc2.js"];
async.each(openFiles, function(file, callback) {
console.log('Processing file ' + file);
if( file.length > 32 ) {
console.log('This file name is too long');
callback('File name too long');
} else if(file==="markdown.js"){
console.log('File processing');
setTimeout(function(){console.log('0,File processed')},0);
}else{
console.log('File processed');
callback();
}
}, function(err) {
if( err ) {
console.log('A file failed to process');
} else {
console.log('All files have been processed successfully');
}
});
结果如下:
each//改为eachSeries
var async = require('async');
var openFiles = ["package.json","markdown.js","abc.js","abc2.js"];
async.eachSeries(openFiles, function(file, callback) {
console.log('Processing file ' + file);
if( file.length > 32 ) {
console.log('This file name is too long');
callback('File name too long');
} else if(file==="markdown.js"){
console.log('File processing');
setTimeout(function(){console.log('0,File processed')},0);
}else{
console.log('File processed');
callback();
}
}, function(err) {
if( err ) {
console.log('A file failed to process');
} else {
console.log('All files have been processed successfully');
}
});
eachSeries
我们看到区别了吧,使用eachSeries时,如果出现了中断的函数,则程序会执行中断函数中的内容,然后不再向后执行了。但是,这个是现象,原因我就不懂了。找时间,找人讨论一下.....
eachLimit
该方法和each的使用类似,不过,同一时间只运行最大数量不超过limit个的async操作。不过当limit=1的时候,eachLimit 和eachSeries是没有区别的,当limit!=1的时候,eachLimit会更像each。当然,limit=0的时候,这段程序会直接执行callback里的程序,console.log('All files have been processed successfully');
我们先看limit=0的时候的程序效果:
var async = require('async');
var openFiles = ["package.json","markdown.js","abc.js","abc2.js"];
async.eachLimit(openFiles, 0,function(file, callback) {
console.log('Processing file ' + file);
if( file.length > 32 ) {
console.log('This file name is too long');
callback('File name too long');
} else if(file==="markdown.js"){
console.log('File processing');
setTimeout(function(){console.log('0,File processed')},0);
}else{
console.log('File processed');
callback();
}
}, function(err) {
if( err ) {
console.log('A file failed to process');
} else {
console.log('All files have been processed successfully');
}
});
效果如下:
limit=0当limit=1的时候的效果。
var async = require('async');
var openFiles = ["package.json","markdown.js","abc.js","abc2.js"];
async.eachLimit(openFiles, 1,function(file, callback) {
console.log('Processing file ' + file);
if( file.length > 32 ) {
console.log('This file name is too long');
callback('File name too long');
} else if(file==="markdown.js"){
console.log('File processing');
setTimeout(function(){console.log('0,File processed')},0);
}else{
console.log('File processed');
callback();
}
}, function(err) {
if( err ) {
console.log('A file failed to process');
} else {
console.log('All files have been processed successfully');
}
});
limit=1的时候,与eachSeries相似,我们看结果
eachSerieslimit>1的时候的效果
var async = require('async');
var openFiles = ["package.json","markdown.js","abc.js","abc2.js"];
async.eachLimit(openFiles, 2,function(file, callback) {
console.log('Processing file ' + file);
if( file.length > 32 ) {
console.log('This file name is too long');
callback('File name too long');
} else if(file==="markdown.js"){
console.log('File processing');
setTimeout(function(){console.log('0,File processed')},0);
}else{
console.log('File processed');
callback();
}
}, function(err) {
if( err ) {
console.log('A file failed to process');
} else {
console.log('All files have been processed successfully');
}
});
limit>1
因此通过上边的例子我们可以看出,each、eachSeries、eachLimit的区别就是同一时间内有多少个async在操作,each是默认并行的,eachSeries只有一个,eachLimit可以控制async的数量,当然,limit只是理论上的最大值,不一定会有limit个。
eachOf、eachOfSeries
我的程序跑不了这个函数,之后补充,用法就是比each多了一个key作为iteratee的第二个参数。
var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
var configs = {};
async.forEachOf(obj, function (value, key, callback) {
fs.readFile(__dirname + value, "utf8", function (err, data) {
if (err) return callback(err);
try {
configs[key] = JSON.parse(data);
} catch (e) {
return callback(e);
}
callback();
});
}, function (err) {
if (err) console.error(err.message);
// configs is now a map of JSON data
doSomethingWith(configs);
});
这个key是遍历的游标,通过key可以记录循环到了list的哪个对象了。
auto的例子
该方法的格式为auto(tasks, concurrencyopt, callbackopt)
auto这个方法有几个特性
1.auto会决定tasks中的函数和依赖的一个最好的执行顺序,每个函数都可以选择是否需要完成前置任务。
2.如果一个函数产生错误,则整个流程都会停止,其他的任务将不会被执行,auto的callback将会马上收到这个错误,并结束整个auto流程。
3.当前置任务,完成之后,会将结果传递到依赖他的函数中,如果没有依赖,则只会调用该任务的callback
在ebookcoin中,app.js加载服务器的方式就是利用了auto这个方法。好了,我们来看一个例子
async.auto({
// this function will just be passed a callback
readData: async.apply(fs.readFile, 'data.txt', 'utf-8'),
showData: ['readData', function(results, cb) {
// results.readData is the file's contents
// ...
}]
}, callback);
async.auto({
get_data: function(callback) {
console.log('in get_data');
// async code to get some data
callback(null, 'data', 'converted to array');
},
make_folder: function(callback) {
console.log('in make_folder');
// async code to create a directory to store a file in
// this is run at the same time as getting the data
callback(null, 'folder');
},
write_file: ['get_data', 'make_folder', function(results, callback) {
console.log('in write_file', JSON.stringify(results));
// once there is some data and the directory exists,
// write the data to a file in the directory
callback(null, 'filename');
}],
email_link: ['write_file', function(results, callback) {
console.log('in email_link', JSON.stringify(results));
// once the file is written let's email a link to it...
// results.write_file contains the filename returned by write_file.
callback(null, {'file':results.write_file, 'email':'user@example.com'});
}]
}, function(err, results) {
console.log('err = ', err);
console.log('results = ', results);
});
auto的执行顺序
parallel的例子
该方法的格式为:parallel(tasks, callbackopt)
该方法的特性如下:
1.并行运行tasks中的函数,并将执行结构以数组的形式保存到callback中
2.任意一个函数报错,都会向parallel的callback传递一个error。(官网没说会不会停止整个方法)
3.parallel是一个kicking-off I/O,而不是并行执行的代码,也就是说,如果没有人为添加的并行操作,则该功能就是一个顺序执行的函数,如果存在异步执行的函数,才会并行的去执行。(很拗口吧,一会看例子)
async.parallel([
function(callback) {
setTimeout(function() {
callback(null, 'one');
}, 200);
},
function(callback) {
setTimeout(function() {
callback(null, 'two');
}, 100);
}
],
// optional callback
function(err, results) {
// the results array will equal ['one','two'] even though
// the second function had a shorter timeout.
console.log('results = ', results);
});
// an example using an object instead of an array
async.parallel({
one: function(callback) {
setTimeout(function() {
callback(null, 1);
}, 200);
},
two: function(callback) {
setTimeout(function() {
callback(null, 2);
}, 100);
}
}, function(err, results) {
console.log('results = ', results);
});
parallel
series
格式:series(tasks, callbackopt)
特性:
1.顺序执行tasks中的函数,tasks可以是对象(返回对象到callback)或者数组(返回数组到callback)
2.一个函数报错则方法停止,传递一个error到callback
3.因为ES5的特殊性,如果想在各个平台上都是按照自己定义的顺序去执行函数的话,最好还是用array in tasks
我们来看一个例子:
async.series([
function(callback) {
// do some stuff ...
callback(null, 'one');
},
function(callback) {
// do some more stuff ...
callback(null, 'two');
}
],
// optional callback
function(err, results) {
// results is now equal to ['one', 'two']
console.log('results = ', results);
});
async.series({
one: function(callback) {
setTimeout(function() {
callback(null, 1);
}, 200);
},
two: function(callback){
setTimeout(function() {
callback(null, 2);
}, 100);
}
}, function(err, results) {
// results is now equal to: {one: 1, two: 2}
console.log('results = ', results);
});
series
从例子中,我们看到使用series的好处了吧,可以最大限度的将异步的程序写成同步的样式。
whilst、doWhilst
格式:whilst(test, iteratee, callbackopt)
特性:只要test返回true,就重复调用iteratee,有错也停止,看例子:
var count = 0;
async.whilst(
function() { return count < 5; },
function(callback) {
count++;
setTimeout(function() {
callback(null, count);
}, 30);
},
function (err, n) {
// 5 seconds have passed, n = 5
console.log(n);
}
);
这个例子我这个程序一直返回undefined,不知道什么原因,因此,不截图了
doWhilst的格式:doWhilst(iteratee, test, callbackopt)
doWhilst和whilst的关系,跟do while 和 while的关系一样,都是先执行一次,另外一个不同的地方是,参数的位置不一样,呵呵呵
until
格式:until(test, fn, callbackopt)
特性:
1.不断的执行fn,直到test 返回true。
2.方法存在错误将会停止,并向fn的callback传递error。
用法与whilst相反。
retry
格式为:retry(optsopt, task, callbackopt)
特性:
1.尝试从task中得到正确的响应
2.有错则会尝试最后一次函数调用
我们来看格式:
// The `retry` function can be used as a stand-alone control flow by passing
// a callback, as shown below:
// try calling apiMethod 3 times
async.retry(3, apiMethod, function(err, result) {
// do something with the result
});
// try calling apiMethod 3 times, waiting 200 ms between each retry
async.retry({times: 3, interval: 200}, apiMethod, function(err, result) {
// do something with the result
});
// try calling apiMethod 10 times with exponential backoff
// (i.e. intervals of 100, 200, 400, 800, 1600, ... milliseconds)
async.retry({
times: 10,
interval: function(retryCount) {
return 50 * Math.pow(2, retryCount);
}
}, apiMethod, function(err, result) {
// do something with the result
});
// try calling apiMethod the default 5 times no delay between each retry
async.retry(apiMethod, function(err, result) {
// do something with the result
});
// try calling apiMethod only when error condition satisfies, all other
// errors will abort the retry control flow and return to final callback
async.retry({
errorFilter: function(err) {
return err.message === 'Temporary error'; // only retry on a specific error
}
}, apiMethod, function(err, result) {
// do something with the result
});
// It can also be embedded within other control flow functions to retry
// individual methods that are not as reliable, like this:
async.auto({
users: api.getUsers.bind(api),
payments: async.retry(3, api.getPayments.bind(api))
}, function(err, results) {
// do something with the results
});
我们再来看ebookcoin中的例子:
async.retry(20, function (cb) {
modules.peer.list(config, function (err, peers) {
if (!err && peers.length) {
var peer = peers[0];
self.getFromPeer(peer, options, cb);
} else {
return cb(err || "No peers in db");
}
});
}, function (err, results) {
cb(err, results);
});
ebookcoin程序的意思是,在不报错的前提下,最多将这个匿名函数执行20次。self.getFromPeer(peer, options, cb);这个功能是寻找网络中的peer节点。
总结
本文简单介绍了async库以及ebookcoin用了async的哪些方法,并对这些方法从流程控制的角度进行了浅析。node的异步编程博大精深,ebookcoin的源码也精彩绝伦(crypti的源码就写的很好了,通过对ebookcoin和crypti的源码,给了我在程序设计和实现上很多的启发,真好,能站在巨人们的肩膀上,真好),本文仅仅算是抛砖引玉的文章,希望通过该文激起大家学习node,使用node的兴起,同时,也需要大家多提意见,我们共同成长,欢迎越来越多的小伙伴们进入到node领域,进入到区块链研究的领域。
后记
下一篇文章计划解读一下ebookcoin的编解码处的代码,当然都是浅析,都是先从最基本的技术实现入手进行解读,我希望自己能够坚持下来,争取一周能写一篇文章。
网友评论