nodejs学习笔记Stream(流)

作者: JOKER_HAN | 来源:发表于2018-02-02 16:40 被阅读0次

    什么流

    通俗的说就是一种,有起点和终点的字节数据传输手段,把数据从一个地方传到另一个地方。
    流(Stream)是一个抽象接口,可读、可写或兼具两者的。并且所有流都是 EventEmitter 的实例。
    基于流实现的工具 webpack glup,比如HTTP 服务器request和response ,(TCP sockets),标准输出(process.stdout)等等对象都是流。

    可读流 (Readable Stream)

    可读流存在两种工作模式 流动模式(flowing)暂停模式 (paused)
    下面介绍一下两个模式的特点 :

    暂停模式

    可读流在默认状态就是暂停模式 ,不监听readable也会默认先打开文件,打开文件后会调用一次read(0)方法
    我们读取数据的时候如果文件过大,我们选择一点一点读取,读取字节为highWaterMark的值,放到缓存区中。
    在暂停模式中,我们监听readable事件,可读流会马上去向底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;,同过read()方法来消费数据。
    看下面一个暂停模式的例子:

    let fs = require('fs');
    let rs = fs.createReadStream('1.txt',{
        highWaterMark:3,
        encoding:'utf8'
    });
    rs.on('readable',function () {
        let char = rs.read(1);
        console.log(char);
    });
    
    read() 方法

    readable事件中,read回向可读流请求读取n个字节的数据,根据n值的不同会有下面几种情况:

    • n = undefined ; 即不传参数,此时文件会不断读取hwm(hignWaterMark)字节,并且不断触发readable事件,读取缓存区(hwm的值),如果没设置hwm大小,读取默认大小64k。
    • n = 0 ; 可读流返回一个null 并且不会消费任何数据
    • 0 < n < hwm ; 此时n小于最高水位线执行底层的 _read 方法,从数据源中读取hwm大小的数据填充到缓存区内。并且下次读取字节为( hwm-n) + hwm 个字节
    • n > hwm ; read 方法会先返回null,然后从数据源处读取hwm大小的数据加入缓冲区,并判断缓冲区内数据大小是否大于或等于n,如果是则返回数据,否则会再次返回null并读取n大小的数据。
    readable 事件

    在 readable 事件表示流中有数据可以被读取 有两种情况会被触发:

    • 缓存区为空,或者或 缓存区大小 - 可读大小 < hwm 时,第一次缓存区大小为hwm第二次为缓存去大小为 剩余缓存大小 + hwm
    • 当文件读完时,会自动触发 readable 事件

    流动模式

    开启流动模式的常用方法为两种:监听data 或者使用pipe(管道)方法,下面两个例子减少一下这两中方法。

    let fs = require('fs');
    let rs = fs.createReadStream('./1.txt',{
        highWaterMark:3
    });
    rs.setEncoding('utf8');
    rs.on('data',function (data) {
        console.log(data);
        rs.pause();//暂停读取和发射data事件
        setTimeout(function(){
            rs.resume();//恢复读取并触发data事件
        },2000);
    });
    

    当监听data 事件后 ,可读流会不断从数据源去除hwm大小的数据,并向data事件发送这些数据, 我们当然和以使用stream.pause()手动将流切换到暂停模式,否则该过程将持续下去,直到读到数据源的结束位置。

    如果数据消费的速度小于数据生产的速度的话该怎么办呢,引出了跟高级的方法pipe它就像一个管道,比如我们一边读取文件,一边把读取的数据写入另一个文件,这样写的速度会跟不上读取的速度,就相当于,消费小于生产的情况,如果写的慢我们可读流就会停下来,始终保持读写在一个频率上。

    let fs = require('fs');
    let rs = fs.createReadStream('1.txt',{
        encoding:'utf8'
    });
    let ws = fs.createWriteStream('2.txt',{
        encoding:'utf8'
    });
    rs.pipe(ws);
    

    pipe 方法返回一个 readable 对象,这意味着我们可以使用链式操作将数个流连接在一起,管道一旦被接上,数据将持续不断的从可读流写入可写流。想要终止这个过程,只能使用 stream.unpipe() 来取消管道连接。

    上面个两个例子,我们不仅理解了流动模式,还可以发现其实流动模式 和 暂停模式是可以切换的,下面总结一下模式切换的方法:

    暂停模式切换到流动模式:
    • 监听 data 事件
    • 调用 stream.resume() 方法
    • 调用 stream.pipe() 方法将数据发送到可写流中
    流动模式切换到暂停模式:
    • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
    • 如果存在管道目标,可以通过取消 data 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

    可写流 (Writable stream)

    可读流的默认缓存空间是64k,可写流的默认缓存空间为16k,可写流当时是向目标文件些数据的,下面同样通过代码了解:

    let fs = require('fs');
    let ws = fs.createWriteStream('2.txt',{
        flag:'w',
        mode:0o666,
        highWaterMark:3,
        encoding:'utf8'
    });
    let count = 9;
    function write(){
        let flag = true;
        while(flag && count>0){
            flag = ws.write(count-- +'');
        }
    }
    write();
    ws.on('drain',function () {
        console.log('drain');
        write();
    });
    

    面这段代码展示了一个可写流实例的几个基本的事件和方法,下面我们来逐一介绍:

    write()

    这个方法的作用是向可写流写入数据,它的类型必须是字符串或者Buffer,同时方法返回值为布尔值,当前我们的缓存区大小为hwm3个字符,我们一个一个字符的写,当写到第三个时,缓存区满了,此时返回false。
    一旦我们确认方法返回了false后,应该立刻停止调用 write 方法,直到缓冲器中的数据被清空为止。当然,即使方法返回了false,你实际上也可以继续使用 write 方法写入数据。node会将你写入的数据全部缓存起来,直到超过了能使用的最大内存。

    end()

    end 方法的作用是关闭流,它可以传入三个可选的参数。chunk 和 encoding 是在关闭可写流前希望最后写入的数据及其对应的编码。如果传入 callback,这个 callback 会作为 finish 事件的回调函数触发

    drain事件

    在可写流的缓冲区超过hwm的条件下,会触发drain事件,提示使用者,先不要在写了,内存满了,注意这个事件触发的前提,即write 方法返回了false后清空缓冲区才会触发 drain 事件

    注意:建议在 write 方法返回false时停止写入数据,在 drain 事件的回调中再次开始写入,这样可以更好的控制缓冲区的大小,避免发生内存泄漏问题。

    close事件

    close 如文件系统被关闭时。当 close 事件被触发后,可写流将不会再触发其他事件。值得注意的是,不是所有可写流都会触发 close 事件。

    可写流可读流大概内容已经说完了,下面配上一张图再总结一下流的原理:
    image.png
    • 当建立一个可读流的时候,可读流默认会监听readable 事件,此时的read(n) n=0;不会消费任何数据
    • 当我们主动监听readable事件时,调用read方法消费数据,当缓存区为空时,会再次触发readable事件,直到你读完了源文件的所有数据。
    • 当我们监听 data的时候,通过data 事件回调进行消费,这个过程不会停止,直到全部读取完成。当然,在这个过程中你随时可以通过stream.pause()方法暂停它。
    • 接下来创建一个可写流,并调用write方法来消费我们的数据,但是因为写入的速度较慢,如果当前写入还在进行,而你又调用了write方法,node会将你要写入的数据缓存在一个缓存区中,等到文件写入完毕会从缓存区中取出数据,继续写入。
    • write 有一个布尔类型的返回值,如果写入过快,缓存区满了之后就会返回false。
    • 当缓存区内容完全写入清空口,这是会调用drain事件,我们可以在他的回调中,继续写入文件,执行write()

    如有对不的地方,不吝赐教

    相关文章

      网友评论

        本文标题:nodejs学习笔记Stream(流)

        本文链接:https://www.haomeiwen.com/subject/iwcraxtx.html