Readable - 可读的流 (例如 fs.createReadStream()).
Writable - 可写的流 (例如 fs.createWriteStream()).
Duplex - 可读写的流 (例如 net.Socket).
Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).
可读的流
-
fs.createReadStream('text.txt')
; - http 作为服务端的 req 对象
- http 作为客户端的 res 对象
readable.setEncoding('utf8'); // chunk 为字符串,否则 chunk 为 Buffer 对象
readable.on('data', function(chunk){
console.log(chunk); // Buffer 对象或字符串
});
readable.on('end', function(){
console.log('读取结束');
});
readable.on('error', function(err){
console.log('读取失败', err);
});
可写的流
- fs.createWriteStream('text.txt');
- http 作为服务端的 res 对象
- http 作为客户端的 req 对象
writable.on('finish', function(){
console.log('写入结束');
});
writable.on('error', function(err){
console.log('写入失败', err); // error 事件发生时,可写流不会关闭
});
writable.write(data); // Buffer 对象或字符串
writable.end();
pipe() 方法
readable.pipe(writable);
writable 这里自动转为 flowing 模式,不是 paused 模式;当 readable 触发 end 事件之后,writable 自动调用 end() 方法
Transform 流
var zlib = require('zlib');
var gzip = zlib.createGzip();
var rs = fs.createReadStream('1.jpg');
var ws = fs.createWriteStream('1.tar.gz');
rs.pipe(gzip).pipe(ws); // pipe 方法返回目标流,如果目标流实现了可读接口,可以链式调用
工作模式
paused 模式
// 每当数据源中的一部分数据流入到缓存中时,就会触发 readable 事件,当数据流到达尾部时,也会触发 readable 事件
rs.on('readable', () => {
let chunk = rs.read();
if (chunk !== null) {
缓存中有数据,调用 rs.read() 消费缓存中的数据,返回数据,并且会触发 data 事件
} else {
缓存中没有数据,调用 rs.read() 返回 null,并且会触发 end 事件
}
});
实例:
// 每当数据流入缓存时,会触发 readable 事件
rs.on('readable', function handler() {
let chunk = rs.read(); // 手动调用 rs.read() 从缓存中读取数据
if (chunk !== null) {
if (!ws.write(chunk)) {
rs.removeListener('readable', handler); // 相当于手动 rs.pause();
ws.once('drain', function () {
rs.on('readable', handler); // 相当于手动 rs.resume();
});
}
} else {
ws.end();
}
});
flowing 模式
启动 flowing 模式:每当数据流入缓存时,会自动调用 rs.read() 从缓存中读取数据
rs.on('data', () => {});
rs.resume();
rs.pipe();
// 添加 data 事件的监听器,自动切换到 flowing 模式
rs.on('data', (chunk) => {
每次触发 readable 事件的时候,都会自动调用 rs.read() 从缓存中读取数据来消费,chunk 就是读取的数据块
});
rs.on('end', () => {
当 rs.read() 返回 null 时,会触发 end 事件
});
rs.pause(); // 当触发 readable 事件的时候,停止自动调用 rs.read() 从缓存中读取数据了,因此也就不会触发 data 事件了
rs.resume(); // 当触发 readable 事件的时候,恢复自动调用 rs.read() 从缓存中读取数据
实例:
rs.on('data', function (chunk) {
if (!ws.write(chunk)) {
rs.pause();
ws.once('drain', function () {
rs.resume();
});
}
});
rs.on('end', function () {
ws.end();
});
pipe 模式
rs.pipe(ws);
// 当 rs 触发 data 事件时,自动 ws.write()
// 当 rs 触发 end 事件时,自动 ws.end(),除非指定 options.end 为 false,rs.pipe(ws, { end: false });
// 每当 ws.write() 返回 false 时,自动 rs.pause(),当 ws 触发 drain 事件时,自动 rs.resume()
网友评论