linux文件
- 为了区别不同文件类型,会有一个type来进行区别
- 普通文件:包含任意数据
- 目录: 相关一组文件的索引
- 套接字Socket:和另一台机器上的进程通信的类型
- 重定向
每个进程都有自己的描述符表(fd)。
- fd0 stdin 输入到终端
- fd1 stdout 输出到终端
- fd2 stderr 输出错误到终端
- fd4 磁盘文件
标准IO会使用流(stream)行松打开文件,所谓的流(stream)实际上是文件描述度(file descriptor)和缓冲区(buffer)在内存中的抽象
Nodejs中的Stream
Stream模块在nodejs中只是处理流数据的抽象接口,Steam模块提供了基础的API,使用者可以根据这些API构建流接口的对象
Stream流类型
- Readable -可读流(fs.createReadStream)
- Writeable -可写流(fs.createWriteStream)
- Duplex - 可读写流(net.Soket)
- Transform - 在读写过程中可以修改和变换数据的Duplex流.(zlib.createDeflate)
Stream 模块还包含pipeline和finished公共功能
Stream 事件
image.png- Readable发起重要事件
-
data
产生数据 -
end
产生数据约束
-
- Writeable发起重要事件
-
drain
缓存区处理完毕 -
finish
处理结束
-
可读流 Readable Stream
创建一个可读流
const fs = require('fs');
const rs = fs.createReadStream('./index.js', {
highWaterMark: 1024 // 缓存大小,每次on Data的大小,默认是16kb
});
let onDataCount = 0;
rs.on('data', chunk => {
console.log(chunk.toString('utf-8'));
console.log(onDataCount++);
})
rs.on('end', () => {
console.log('end');
})
可读流中的flowing/paused模式
数据传输的时候需要考虑到数据的消费者的消费速度才能保证高利用率,而发送数据端就是
生产者
,接收数据端处理数据是消费者
消费者示例: Http请求处理,文件写入处理,数据库处理等
const fs = require('fs');
const rs = fs.createReadStream('./index.js' ,{
hightWaterMark: 1024
});
let onCount = 0;
rs.on('data', data => {
console.log(data.toString('utf-8'));
console.log(onCount ++);
rs.pause();
console.log('on data => ', rs.isPaused())
setTime(() => rs.resume(), 3000)
})
rs.on('end', () => {
console.log('end');
})
Stream 背压问题
一个Stream数据生产的速度远大于Stream消费的速度,就会造成数据的堆积。比如不停增长的日志文件(100条/1s)作为生产者,有个服务处理日志文件(100条/3s)作为消费者,如果没有pause/flowing模式就会撑爆内存造成浪费,
readable事件精确控制可读流
readable区别于data事件,readable回调使用rs.read(size)方法好像从桶中再用瓢一点点取水一样,可以更加精确的控制流数据的读取。但此事件回调无法使用paused/flowing模式
rs.on('readable', () => {
let dataChunk = rs.read(10);
if(dataChunk) {
console.log(dataChunk.toString())
}
})
可写流 Writeable Stream
创建一个可写流
const fs = require('fs')
const ws = fs.createWriteStream('./out.txt', {
highWaterMark: 10
})
for(let i = 0; i < 1000; i++) {
const drained = ws.write('hahaha\n')
console.log(drained);
}
drained为true,缓存池highWaterMark还没有满,可以继续写入。
实现和使用Stream的各种接口
可写流 Writeable
- writeble使用
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
write方法有三个参数
- chunk 默认为写入数据的buffer
- encoding 写入数据的编码,通常忽略
- callback在处理完chunk之后调用,它通知写入操作成功执行
- 自定义writeble实现
const { Writable } = require('stream');
const fs = require('fs');
class MyWriteable extends Writable {
constructor(options) {
super(options)
}
_write(chunk, encoding, callback) {
// console.log(chunk.toString())
fs.writeFileSync('./out.txt', chunk.toString() + '\n', {flag: 'a+'});
setTimeout(() => callback(null), 100);
}
}
const ws = new MyWriteable();
for(let i =0; i< 100; i++) {
ws.write('hahaha');
}
可读流Readable
- readable使用
const { Readable } = require('stream');
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if(this.currentCharCode > 90) {
this.push(null);
}
}
})
inStream.currentCharCode = 0;
inStream.pipe(process.stdout);
- 自定义Readable实现
'use strict';
const { Readable } = require('stream');
class MyReadable extends Readable {
constructor(options) {
super(options)
this.count = 65
}
_read() {
this.count++
this.push(String.fromCharCode(this.count++));
if (this.count > 80) {
return this.push(null) // push null 就结束了
}
}
}
let rs = new MyReadable();
rs.on('data', function(data) { // 当 rs 注册 onData 时就开始调用 _reade()
console.log(data.toString());
});
pipe原理和实现
pipe可以有不同的目的地,pipe就是解决了背压的问题,实现方式就是在可读流上注册一个onData事件,达到阈值后进行pause
- 使用pipe
const fs = require('fs');
const rs = fs.createReadStream('./package.json');
const ws = fs.createWriteStream('./out.txt');
rs.pipe(ws);
- 实现pipe
Node源码中就是在可写流上注册onData
和onDrain
可以根据可写流的阈值和释放进行pause/flowing切换
pipe(ws) {
this.on('data', chunk => {
let drained = ws.write(chunk);
if(!drained) {
this.pause();
}
});
ws.on('drain', () => {
this.resume();
})
}
网友评论