stream

作者: 西域战神 | 来源:发表于2021-05-16 21:01 被阅读0次

第一个stream的例子:

const fs = require('fs')
const stream = fs.createWriteStream('./big_file.txt')
for(let i=0;i<1000000;i++) {
  stream.write(`this is ${i} \n`)
}
stream.end() //关闭stream
console.log('done')
  1. 创建流,多次往里面填充内容,关闭流
  2. 最终得到一个100MB的文件

stream-流

stream 是水流,但默认没有水
stream.write可以让水流中有水(数据)
每次写的数据叫chunk(块)
产生数据的一段叫source(源头)
得到数据的一段叫sink(水池)

第二个stream的例子

1.直接读取文件:

const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request',(request,response)=>{
  fs.readFile('./big_file.txt',(error,data)=>{
    if(error) throw error
    response.end(data)
    console.log('done')
  })
})
server.listen(8888)
console.log('started-----8888')

此时我们创建一个http服务,当我们试图读取刚刚创建的文件时,发现node.js的服务内存瞬间增加了100MB,此时如果存在多个请求,将会对机器造成很大的压力.

2. 通过createReadStream读取文件;

const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request',(request,response)=>{
 const stream = fs.createReadStream('./big_file.txt')
  stream.pipe(response)
})
server.listen(8888)
console.log('started at 8888')

此时我们可以看到node.js服务的内存只增加了20多MB,虽然读取的速度不如比之前fs.readFile慢一些。

管道pipe

两个流可以用一个管道相连,stream1的末尾连上stream2开头

stream1.pipe(stream2)
//链式操作
a.pipe(b).pipe(c)

Stream对象的原型链

const s = fs.createReadStream(path)

s的对象层级为:
自身属性(由fs.ReadStream构造)
原型:stream.Readable.prototype
二级原型:stream.Stream.prototype
三级原型:events.EventEmitter.prototype
四级原型:Object.prototype


image.pngimage.png
image.pngimage.png

Stream 分类

  1. Readable 可读
  2. Writable 可写
  3. Duplex 可读可写(双向,默认读写分离,互相不干扰)
  4. Transform 可读可写(变化,相当于一个转换器,比如babel写入es6,然后读取的是es5)

Readable stream

  1. 默认处于paused态
  2. 添加data事件,变成flowing态
  3. 删掉data事件,变为paused态
  4. 调用pause()可以变为paused态
  5. 调用resume()可以变为flowing态
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request',(request,response)=>{
 const stream = fs.createReadStream('./big_file.txt')
  stream.pipe(response)
  stream.pause()
})
server.listen(8888)
console.log('started-----8888')

此时我们发送请求,无法获得任何的响应,因为调用了pause()

stream.pipe(response)
  stream.pause()
  setTimeout(()=>{
    console.log('3s 后恢复')
    stream.resume()
  },3000)

我们在3秒后调用resume(),此时响应恢复

创建一个readable stream

const {Readable} = require('stream')
const inStream = new Readable()
inStream.push('ABCDEFG')
inStream.push('EFGHI')
inStream.push(null)
inStream.pipe(process.stdout)

目前的方法不是按需供给的

const { Readable } = require("stream");
const inStream = new Readable({
  read(size) {
    const char = String.fromCharCode(this.currentCharCode++)
    this.push(char);
    console.log(`push ${char}`)
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
})
inStream.currentCharCode = 65
inStream.pipe(process.stdout)

用户调用read才会调用

Writable stream

1.drain事件

我们在调用stream.write(chunk)的时候可能会得到false,表示写的太快,数据积压,要监听到drain事件后才能继续write

2. finish事件

调用stream.end()后,且缓冲区数据都传递给底层系统后,触发finish事件

创建一个writable stream

const {Writable} = require('stream')
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log('get user input:')
    console.log(chunk.toString())
    callback()
  }
})
process.stdin.pipe(outStream)

这时我们输入一个值,终端会出现你输入的值

Duplex stream

const { Duplex } = require("stream");
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
},
read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
        this.push(null);
    }
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

Transform stream

const { Transform } = require("stream");
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
    callback();
    }
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);

创建一个gzip流

const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + ".gz"));

相关文章

网友评论

      本文标题:stream

      本文链接:https://www.haomeiwen.com/subject/fpwrjltx.html