nodejs中的流
流是什么,应该很好理解,计算机中数据流处处存在。linux的管道设计就是利用了流的概念,所有的数据处理就像流水一样通过管道一层层的处理。而nodejs中的stream模块就是流,它是 EventEmitter
的实例。
一个简单的http echo服务:
const http = require('http');
const server = http.createServer((req, res) => {
// req 是一个 http.IncomingMessage 实例,它是可读流。
// res 是一个 http.ServerResponse 实例,它是可写流。
let body = '';
req.setEncoding('utf8');
//所有的流都是EventEmitter的实例,数据接收的事件
req.on('data', (chunk) => {
body += chunk;
});
req.on('end', () => {
try {
res.write(body);
res.end();
} catch(e) {
// json 解析失败。
res.statusCode = 400;
return res.end(`错误: ${e.message}`);
}
})
});
server.listen(3000);
上面的例子,res和req就是标准的流,http.IncomingMessage实现了stream.Readable
接口,而http.ServerResponse则继承了stream。
nodejs流的类型
Node.js 中有四种基本的流类型:
-
Writable
- 可写入数据的流(例如fs.createWriteStream()
)。 -
Readable
- 可读取数据的流(例如fs.createReadStream()
)。 -
Duplex
- 可读又可写的流(例如net.Socket
)。 -
Transform
- 在读写过程中可以修改或转换数据的Duplex
流(例如zlib.createDeflate()
)。
可读和可写流上面已经见过了,比较简单,基本就是一个读一个写,如fs.createWriteStream创建一个可写流,向文件写入数据:
const fs = require('fs');
const writeFileStream = fs.createWriteStream('./data.txt');
for (let i = 0; i < 1000; i ++ ) {
writeFileStream.write(Math.random().toString());
}
可读流也很简单,read方法读取数据,还有一个pipe方法,接受一个<stream.Writable>
对象,可以绑定可写流到可读流,将可读流自动切换到流动模式,并将可读流的所有数据推送到绑定的可写流。 数据流会被自动管理,所以即使可读流更快,目标可写流也不会超负荷。
所有的流都是EventEmitter的实例,所以任何流会暴露关键事件可以监听。
运行完上面写入流的例子,在写一个读文件并放入管道进行压缩的demo:
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('./data.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
流背后的简单原理
Flowing Mode

其实流就是生产者和消费者问题,在处理大数据流动和大文件的时候非常有用,生产者并不直接给消费者,而是而是先 push 到缓存池,缓存池有一个水位标记 highWatermark,超过这个标记阈值,push 的时候会返回 false。这种情况被称为“背压”,一般是生产者过快,如下;
const Chance = require('chance');
const chance = new Chance();
require('http').createServer((req, res) => {
res.writeHead(200, {'Content-Type': 'text/plain'});
function generateMore() {
while(chance.bool( { likelihood: 95 } )) {
let shouldContince = res.write(
chance.string({ length: ( 16 * 1024) -1 })
);
if (!shouldContince) {
console.log('Backpressure');
return res.once('drain', generateMore);
}
}
res.end('\nThe End...\n', () => console.log('All Date was sent'));
}
generateMore();
}).listen(3000, () => {
console.log('server listening 3000');
})
No-Flowing Mode
但还有一种模式No-Flowing Mode,即生产者到了highWaterMark阈值后,消费者才开始消费数据,需要多少取多少,慢慢读。

Duplex Stream

上面说的第三种流模式:双工模式。即实现了可读流又实现了可写流,但它的输入和输出可以没有任何关系。
Transform Stream

还有一种Transform Stream,它同样具备 Readable 和 Writable 的能力,只不过它的输入和输出是存在相互关联的,中间做了一次转换处理。常见的处理有 Gzip 压缩、解压等。
网友评论