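Preface
Using Node.js in front-end engineering can feel constrained at every turn, and the reason is usually the two hard parts of Node.js: files and networking. Both of them depend on one important abstraction, Stream.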
Buffer
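Before ES6 introduced TypedArray, JavaScript had no mechanism for reading or manipulating streams of binary data. Buffer was introduced as part of the Node.js API so that code could interact with network streams such as TCP and with file streams.
Buffer is used to handle binary data. Its instances resemble arrays of integers, but the size of a buffer is fixed when it is created and cannot be changed afterwards. Its memory does not go through V8's allocation mechanism: Buffer is a module that combines JS and C++, where the memory is requested on the C++ side and handed out on the JavaScript side.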
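The fixed-size, integer-array behaviour described above can be seen in a minimal sketch. The variable names are illustrative only; the calls used (Buffer.alloc, buf.write, buf.toString) are standard Node.js APIs.

```javascript
// Allocate a zero-filled buffer of 4 bytes; its length is fixed from here on.
const buf = Buffer.alloc(4);

// Each element is an integer in the range 0-255, like an integer array.
buf[0] = 0x4e; // the byte for 'N'
buf.write('ode', 1, 'utf8'); // write a string starting at offset 1

const text = buf.toString('utf8');
console.log(buf.length); // 4
console.log(text); // Node

// Writing more data than fits does not grow the buffer;
// write() reports how many bytes were actually stored.
const written = buf.write('too long for four bytes');
console.log(written); // 4

// Buffer instances are also Uint8Array instances.
console.log(buf instanceof Uint8Array); // true
```

When a different size is needed, a new Buffer has to be created (for example with Buffer.concat), since an existing one cannot be resized.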
Why use streams
In Node, I/O is asynchronous, so interacting with the disk or the network involves passing callback functions.
const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
  // readFile buffers the whole file in memory before the callback runs.
  fs.readFile('./a.mp4', (err, data) => {
    res.end(data);
  });
});

server.listen(8000);
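On every request, the code above loads the entire file a.mp4 into memory and only then returns the result to the client. a.mp4 may be 2 GB or larger, in which case the program consumes a large amount of memory.
Here is the same server implemented with a stream: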
const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
  // Read the file chunk by chunk and forward each chunk to the response.
  const stream = fs.createReadStream('./a.mp4');
  stream.pipe(res);
});

server.listen(8000);
Readable Stream
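Basic usage
const { Readable } = require('stream');

const peaks = [
  'Tallac',
  'Ralston',
  'Rubicon',
  'Twin Peaks',
  'Rose',
  'Freel Peak',
  'Dankun Kun'
];

class StreamFromArray extends Readable {
  constructor(array) {
    super();
    this.array = array;
    this.index = 0;
  }

  _read() {
    // Signal end-of-stream once every element has been pushed.
    if (this.index >= this.array.length) {
      return this.push(null);
    }
    // Push the current element first, then advance, so index 0 is not skipped.
    this.push(this.array[this.index]);
    this.index += 1;
  }
}

const streamFromArray = new StreamFromArray(peaks);

// Without an encoding, each chunk arrives as a Buffer.
streamFromArray.on('data', (chunk) => {
  console.log('data', chunk);
});

// The 'end' event carries no arguments.
streamFromArray.on('end', () => {
  console.log('end');
});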
Setting the encoding
  constructor(array) {
    // With an encoding set, 'data' delivers strings instead of Buffers.
    super({ encoding: 'utf-8' });
    this.array = array;
    this.index = 0;
  }
objectMode
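Streams are not limited to Buffer and String data. They can also carry complex object types once the objectMode option is enabled.
For a readable stream in object mode, downstream consumers read objects from it; for a writable stream in object mode, the data written from upstream are objects.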
const { Readable } = require('stream');

const peaks = [
  'Tallac',
  'Ralston',
  'Rubicon',
  'Twin Peaks',
  'Rose',
  'Freel Peak',
  'Dankun Kun'
];

class StreamFromArray extends Readable {
  constructor(array) {
    super({ objectMode: true });
    this.array = array;
    this.index = 0;
  }

  _read() {
    // Signal end-of-stream once every element has been pushed.
    if (this.index >= this.array.length) {
      return this.push(null);
    }
    // Push an object describing the current element, then advance.
    this.push({
      data: this.array[this.index],
      index: this.index
    });
    this.index += 1;
  }
}

const streamFromArray = new StreamFromArray(peaks);

// In object mode each chunk is the pushed object itself.
streamFromArray.on('data', (chunk) => {
  console.log('data', chunk);
});

// The 'end' event carries no arguments.
streamFromArray.on('end', () => {
  console.log('end');
});
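The writable side of object mode can be sketched in the same way. This is a minimal illustration, not part of the original example: the names sink and received are hypothetical, and the write implementation simply collects whatever it is given.

```javascript
const { Writable } = require('stream');

// Objects handed to the writable end up here (illustrative only).
const received = [];

const sink = new Writable({
  objectMode: true, // accept arbitrary objects instead of Buffer/String
  write(chunk, encoding, callback) {
    // In object mode, chunk is the object exactly as it was written.
    received.push(chunk);
    callback();
  }
});

sink.write({ data: 'Tallac', index: 0 });
sink.write({ data: 'Ralston', index: 1 });
sink.end();

console.log(received.length); // 2
console.log(received[1].data); // Ralston
```

A readable in object mode can be piped into a writable like this one, so that objects flow through the pipeline without being serialized.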
Two modes
- flowing mode
Data is read automatically and handed to the data event callback for consumption.
const fs = require('fs');
const readStream = fs.createReadStream('./small.txt');

readStream.on('data', (chunk) => {
  console.log(readStream._readableState.flowing); // true
  // The content here is English; for Chinese content, set an encoding on the stream.
  console.log(chunk.toString());
});

console.log(readStream._readableState.flowing); // true
- paused mode
const fs = require('fs');
const readStream = fs.createReadStream('./small.txt');

console.log(readStream._readableState.flowing); // null

// Bind the listener with once: this avoids handling the readable event
// repeatedly and avoids leaking listeners.
readStream.once('readable', () => {
  let chunk;
  // read() returns null when the internal buffer is empty.
  while ((chunk = readStream.read()) !== null) {
    readStream.pause();
    console.log(readStream._readableState.flowing); // false
    console.log(chunk.toString());
    readStream.resume();
    // true as originally observed; this may differ on newer Node.js versions
    console.log(readStream._readableState.flowing);
  }
});
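As shown, the initial state of a readable stream is null. If pipe is never called and no data listener is attached, the state stays null. Calling the stream's pause method forces it to stop reading data, and the internal state becomes false. Calling resume switches the stream back into flowing mode.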
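Three states
A readable stream keeps an internal variable, readStream._readableState.flowing, that records whether the stream is currently in flowing mode. Its value is one of null, false, or true.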
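The three values can be observed without touching the internal _readableState object, since Node.js also exposes the public readableFlowing property. The sketch below assumes a bare Readable with a no-op read implementation and no listeners attached; the names source and states are illustrative.

```javascript
const { Readable } = require('stream');

// A readable that never produces data; enough to inspect the state.
const source = new Readable({ read() {} });

const states = [];
states.push(source.readableFlowing); // null: no consumer attached yet

source.pause();
states.push(source.readableFlowing); // false: explicitly paused

source.resume();
states.push(source.readableFlowing); // true: flowing mode

console.log(states); // [ null, false, true ]
```

Reading the public property keeps the code independent of stream internals, which can change between Node.js releases.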
Writable Stream
- Basic usage
const fs = require('fs');

const readStream = fs.createReadStream('./a.mp4');
const writeStream = fs.createWriteStream('./b.mp4');

// Forward every chunk from the source file to the destination file.
readStream.on('data', (chunk) => {
  writeStream.write(chunk);
});

readStream.on('error', (error) => {
  console.log('error', error);
});

readStream.on('end', () => {
  // Close the destination once the source has been fully read.
  writeStream.end();
  console.log('end');
});
Backpressure
const fs = require('fs');

const readStream = fs.createReadStream('./a.mp4');
const writeStream = fs.createWriteStream('./copied.mp4');

readStream.on('data', (chunk) => {
  // write() returns false once the writable's internal buffer is full.
  const flag = writeStream.write(chunk);
  if (!flag) {
    console.log('backpressure');
    // Stop reading until the writable has caught up.
    readStream.pause();
  }
});

readStream.on('error', (error) => {
  console.log('error', error);
});

readStream.on('end', () => {
  writeStream.end();
  console.log('end');
});

// 'drain' fires when the buffered data has been flushed; resume reading.
writeStream.on('drain', () => {
  readStream.resume();
});
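The return value of write() that drives the logic above can be isolated in a small sketch. It assumes a hypothetical slow destination (slowSink) with a deliberately tiny highWaterMark of 4 bytes, so that the threshold is crossed after only two writes.

```javascript
const { Writable } = require('stream');

// A writable with a tiny buffer and an artificially slow write (assumption).
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // simulate a slow disk or network
  }
});

// 2 bytes pending: still below highWaterMark, so write() returns true.
const first = slowSink.write('ab');

// 6 bytes pending: at or above highWaterMark, so write() returns false.
const second = slowSink.write('cdef');

console.log(first, second); // true false

// 'drain' signals that it is safe to write again.
slowSink.once('drain', () => {
  console.log('drain');
  slowSink.end();
});
```

A false return value is advisory: the data is still accepted and buffered, but a producer that keeps writing anyway lets memory usage grow without bound. This is the situation that pausing the read stream avoids.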
Transform streams
const { Readable, Transform } = require('stream');

const peaks = [
  'Tallac',
  'Ralston',
  'Rubicon',
  'Twin Peaks',
  'Rose',
  'Freel Peak',
  'Dankun Kun'
];

class StreamFromArray extends Readable {
  constructor(array) {
    super();
    this.array = array;
    this.index = 0;
  }

  _read() {
    // Signal end-of-stream once every element has been pushed.
    if (this.index >= this.array.length) {
      return this.push(null);
    }
    // Push the current element first, then advance, so index 0 is not skipped.
    this.push(this.array[this.index]);
    this.index += 1;
  }
}

const streamFromArray = new StreamFromArray(peaks);

class ReplaceText extends Transform {
  constructor(char) {
    super();
    this.replaceChar = char;
  }

  // Called for every chunk: replace each letter and digit with replaceChar.
  _transform(chunk, encoding, callback) {
    const transformChunk = chunk.toString()
      .replace(/[a-zA-Z0-9]/g, this.replaceChar);
    this.push(transformChunk);
    callback();
  }

  // Called once after all input has been consumed, before the stream ends.
  _flush(callback) {
    this.push('more stuff is being passed...');
    callback();
  }
}

const xStream = new ReplaceText('X');

streamFromArray
  .pipe(xStream)
  .pipe(process.stdout);