美文网首页
Node Native (2) stream

Node Native (2) stream

作者: 小懒豆 | 来源:发表于2018-06-01 11:08 被阅读21次

    流的分类

    var Stream = require('stream')
    var Readable = Stream.Readable
    var Writable = Stream.Writable
    var Duplex = Stream.Duplex
    var Transform = Stream.Transform
    
    • Readable- 可读的流 (例如 fs.createReadStream()).
    • Writable - 可写的流 (例如 fs.createWriteStream()).
    • Duplex - 可读写的流 (例如 net.Socket).
    • Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

    一、Stream.Readable 类

    可读流

    可读流事实上工作在下面两种模式之一:flowing 和 paused 。
    • flowing 模式下, 可读流自动从系统底层读取数据,并通过 `EventEmitter接口的事件尽快将数据提供给应用。

    • paused 模式下,必须显式调用 stream.read()方法来从流中读取数据片段。

    所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:
    • 监听 'data' 事件。
    • 调用 stream.resume()方法。
    • 调用 stream.pipe()方法将数据发送到 Writable。
    可读流可以通过下面途径切换到 paused 模式:
    • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause()方法实现。
    • 如果存在管道目标,可以通过取消 'data'事件监听,并调用 stream.unpipe()方法移除所有管道目标来实现。

    这里需要记住的重要概念就是,可读流需要先为其提供消费或忽略数据的机制,才能开始提供数据。如果消费机制被禁用或取消,可读流将 尝试 停止生成数据

    为了向后兼容,取消 'data'事件监听并 不会 自动将流暂停。同时,如果存在管道目标(pipe destination),且目标状态变为可以接收数据(drain and ask for more data),调用了 stream.pause()方法也并不保证流会一直 保持 暂停状态。

    如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。

    三种状态

    在任意时刻,任意可读流应确切处于下面三种状态之一:

    • readable._readableState.flowing = null:由于不存在数据消费者,可读流将不会产生数据。 在这个状态下,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法, readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。
    • readable._readableState.flowing = false:调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。 在这种情况下,为 'data' 事件设置监听函数不会导致 readable._readableState.flowing 变为 true。
    • readable._readableState.flowing = true
      对于大多数用户,建议使用 readable.pipe() 方法来消费流数据,因为它是最简单的一种实现。开发者如果要精细地控制数据传递和产生的过程,可以使用 EventEmitterreadable.pause()/readable.resume() 提供的 API

    事件

    • close:在流或其底层资源(比如一个文件)关闭后触发,不是所有 Readable都会触发 'close' 事件。
    • data:会在流将数据传递给消费者时触发。
    • end:事件只有在数据被完全消费后 才会触发
    • close:在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。
    • readable:事件将在流中有数据可供读取时触发。

    方法

    • isPaused():返回可读流的当前操作状态。

    • pause(): 方法将会使 flowing 模式的流停止触发 'data'`事件, 进而切出 flowing 模式。任何可用的数据都将保存在内部缓存中。

    • pipe(destination[, options]):绑定一个 Writable 到 readable 上, 将可写流自动切换到 flowing 模式并将所有数据传给绑定的 Writable。数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed)。

    • unpipe([destination]):将之前通过stream.pipe()方法绑定的流分离

    • read([size]):从内部缓冲区中抽出并返回一些数据。一般来说,建议开发人员避免使用'readable'事件和readable.read()方法,使用readable.pipe()或'data'事件代替。

    • resume():方法会重新触发 'data'事件, 将暂停模式切换到流动模式。

    • setEncoding(encoding):要使用的编码

    • unshift(chunk):把一块数据压回到Buffer内部。

    • destroy([error]):销毁流,并且触发error事件。然后,可读流将释放所有的内部资源。

    创建可读流。

    实例:流式消耗迭代器中的数据。

    'use strict'
    const Readable = require('stream').Readable
    
    class ToReadable extends Readable {
      constructor(iterator) {
        super()
        this.iterator = iterator
      }
    
      // 子类需要实现该方法
      // 这是生产数据的逻辑
      _read() {
        const res = this.iterator.next()
        if (res.done) {
          // 数据源已枯竭,调用`push(null)`通知流
          return this.push(null)
        }
        setTimeout(() => {
          // 通过`push`方法将数据添加到流中
          this.push(res.value + '\n')
        }, 0)
      }
    }
    
    module.exports = ToReadable
    
    

    实际使用时,new ToReadable(iterator)会返回一个可读流,下游可以流式的消耗迭代器中的数据。

    const iterator = function (limit) {
      return {
        next: function () {
          if (limit--) {
            return { done: false, value: limit + Math.random() }
          }
          return { done: true }
        }
      }
    }(1e10)
    
    const readable = new ToReadable(iterator)
    
    // 监听`data`事件,一次获取一个数据
    readable.on('data', data => process.stdout.write(data))
    
    // 所有数据均已读完
    readable.on('end', () => process.stdout.write('DONE'))
    
    

    执行上述代码,将会有100亿个随机数源源不断地写进标准输出流。

    创建可读流时,需要继承Readable,并实现_read方法。

    • _read方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑。
    • _read方法中,通过调用push(data)将数据放入可读流中供下游消耗。
    • _read方法中,可以同步调用push(data),也可以异步调用。
    • 当全部数据都生产出来后,必须调用push(null)来结束可读流。
    • 流一旦结束,便不能再调用push(data)添加数据。

    可以通过监听data事件的方式消耗可读流。

    • 在首次监听其data事件后,readable便会持续不断地调用_read(),通过触发data事件将数据输出。
    • 第一次data事件会在下一个tick中触发,所以,可以安全地将数据输出前的逻辑放在事件监听后(同一个tick中)。
    • 当数据全部被消耗时,会触发end事件。

    上面的例子中,process.stdout代表标准输出流,实际是一个可写流。

    二、stream.Writable 类

    事件

    • close:件将在流或其底层资源(比如一个文件)关闭后触发。

    • drain:如果调用 stream.write(chunk)方法返回 false'drain' 事件会在适合恢复写入数据到流的时候触发。

    • error:写入数据出错或者使用管道出错时触发,error 事件发生时,流并不会关闭。

    • finish:在调用了 stream.end()方法,且缓冲区数据都已经传给底层系统(underlying system)之后, 'finish' 事件将被触发

    • pipe:在可读流(readable stream)上调用 stream.pipe() 方法,并在目标流向 (destinations) 中添加当前可写流 ( writable ) 时

    • unpipe:在 Readable上调用 stream.unpipe()方法,从目标流向中移除当前 Writable时

    属性

    • writableLength:返回构造该可写流时传入的 highWaterMark 参数值。
    • writableHighWaterMark:包含了写入就绪队列的字节(或者对象)数,这个值提供了关于highWaterMark状 态的内省数据。

    方法

    • cork():强制所有写入数据都存放到内存中的缓冲区里。 直到调用 stream.uncork()stream.end() 方法时,缓冲区里的数据才会被输出。

    • uncork():

    • end([chunk][, encoding][, callback]):表明接下来没有数据要被写入 Writable,如果传入了可选的 callback 函数,它将作为 'finish'事件的回调函数。

    • setDefaultEncoding(encoding):

    • write(chunk[, encoding][, callback]):

      1. chunk <string> | <Buffer> | <Uint8Array> | <any> 要写入的数据。可选的。 对于非对象模式下的流, chunk 必须是字符串, Buffer 或者 Uint8Array。对于对象模式下的流,chunk 可以是除 null 外的任意 JavaScript 值。
      2. encoding <string> 如果 chunk 是字符串,这里指定字符编码
      3. callback <Function>缓冲数据输出时的回调函数
      4. 返回: <boolean>如果流需要等待 'drain' 事件触发才能继续写入数据,这里将返回 false ; 否则返回 true
    • destroy([error]):摧毁这个流,并发出传过来的错误。当这个函数被调用后,这个写入流就结束了

    创建可写流。

    前面通过继承的方式去创建一类可读流,这种方法也适用于创建一类可写流,只是需要实现的是_write(data, enc, next)方法,而不是_read()方法。

    有些简单的情况下不需要创建一类流,而只是一个流对象,可以用如下方式去做:

    const Writable = require('stream').Writable
    
    const writable = Writable()
    // 实现`_write`方法
    // 这是将数据写入底层的逻辑
    writable._write = function (data, enc, next) {
      // 将流中的数据写入底层
      process.stdout.write(data.toString().toUpperCase())
      // 写入完成时,调用`next()`方法通知流传入下一个数据
      process.nextTick(next)
    }
    
    // 所有数据均已写入底层
    writable.on('finish', () => process.stdout.write('DONE'))
    
    // 将一个数据写入流中
    writable.write('a' + '\n')
    writable.write('b' + '\n')
    writable.write('c' + '\n')
    
    // 再无数据写入流时,需要调用`end`方法
    writable.end()
    

    上游通过调用writable.write(data)将数据写入可写流中。write()方法会调用_write()data写入底层。
    _write中,当数据成功写入底层后,必须调用next(err)告诉流开始处理下一个数据。
    next的调用既可以是同步的,也可以是异步的。
    上游必须调用writable.end(data)来结束可写流,data是可选的。此后,不能再调用write新增数据。
    end方法调用后,当所有底层的写操作均完成时,会触发finish事件。

    三、stream.Duplex 类

    Duplex 流是同时实现了 Readable和 Writable接口的流。

    创建可读可写流。

    Duplex实际上就是继承了Readable和Writable的一类流。
    所以,一个Duplex对象既可当成可读流来使用(需要实现_read方法),也可当成可写流来使用(需要实现_write方法)。

    var Duplex = require('stream').Duplex
    
    var duplex = Duplex()
    
    // 可读端底层读取逻辑
    duplex._read = function () {
      this._readNum = this._readNum || 0
      if (this._readNum > 1) {
        this.push(null)
      } else {
        this.push('' + (this._readNum++))
      }
    }
    
    // 可写端底层写逻辑
    duplex._write = function (buf, enc, next) {
      // a, b
      process.stdout.write('_write ' + buf.toString() + '\n')
      next()
    }
    
    // 0, 1
    duplex.on('data', data => console.log('ondata', data.toString()))
    
    duplex.write('a')
    duplex.write('b')
    
    duplex.end()
    

    上面的代码中实现了_read方法,所以可以监听data事件来消耗Duplex产生的数据。
    同时,又实现了_write方法,可作为下游去消耗数据。

    因为它既可读又可写,所以称它有两端:可写端和可读端。
    可写端的接口与Writable一致,作为下游来使用;可读端的接口与Readable一致,作为上游来使用。

    四、stream.Transform 类

    变换流(Transform streams) 是一种 Duplex流。它的输出与输入是通过某种方式关联的。和所有 Duplex流一样,变换流同时实现了 Readable和 Writable接口。

    变换流的实例包括:

    • zlib streams
    • crypto streams

    在上面的例子中,可读流中的数据(0, 1)与可写流中的数据('a', 'b')是隔离开的,但在Transform中可写端写入的数据经变换后会自动添加到可读端。
    Tranform继承自Duplex,并已经实现了_read和_write方法,同时要求用户实现一个_transform方法。

    'use strict'
    
    const Transform = require('stream').Transform
    
    class Rotate extends Transform {
      constructor(n) {
        super()
        // 将字母旋转`n`个位置
        this.offset = (n || 13) % 26
      }
    
      // 将可写端写入的数据变换后添加到可读端
      _transform(buf, enc, next) {
        var res = buf.toString().split('').map(c => {
          var code = c.charCodeAt(0)
          if (c >= 'a' && c <= 'z') {
            code += this.offset
            if (code > 'z'.charCodeAt(0)) {
              code -= 26
            }
          } else if (c >= 'A' && c <= 'Z') {
            code += this.offset
            if (code > 'Z'.charCodeAt(0)) {
              code -= 26
            }
          }
          return String.fromCharCode(code)
        }).join('')
    
        // 调用push方法将变换后的数据添加到可读端
        this.push(res)
        // 调用next方法准备处理下一个
        next()
      }
    
    }
    
    var transform = new Rotate(3)
    transform.on('data', data => process.stdout.write(data))
    transform.write('hello, ')
    transform.write('world!')
    transform.end()
    
    // khoor, zruog!
    
    

    注意、objectMode

    前面几节的例子中,经常看到调用data.toString()。这个toString()的调用是必需的吗?
    本节介绍完如何控制流中的数据类型后,自然就有了答案。

    在shell中,用管道(|)连接上下游。上游输出的是文本流(标准输出流),下游输入的也是文本流(标准输入流)。在本文介绍的流中,默认也是如此。

    对于可读流来说,push(data)时,data只能是String或Buffer类型,而消耗时data事件输出的数据都是Buffer类型。对于可写流来说,write(data)时,data只能是String或Buffer类型,_write(data)调用时传进来的data都是Buffer类型。

    也就是说,流中的数据默认情况下都是Buffer类型。产生的数据一放入流中,便转成Buffer被消耗;写入的数据在传给底层写逻辑时,也被转成Buffer类型。

    但每个构造函数都接收一个配置对象,有一个objectMode的选项,一旦设置为true,就能出现“种瓜得瓜,种豆得豆”的效果。
    Readable未设置objectMode时:

    const Readable = require('stream').Readable
    
    const readable = Readable()
    
    readable.push('a')
    readable.push('b')
    readable.push(null)
    
    readable.on('data', data => console.log(data))
    输出:
    
    <Buffer 61>
    <Buffer 62>
    

    Readable设置objectMode后:

    const Readable = require('stream').Readable
    
    const readable = Readable({ objectMode: true })
    
    readable.push('a')
    readable.push('b')
    readable.push({})
    readable.push(null)
    
    readable.on('data', data => console.log(data))
    输出:
    
    a
    b
    {}
    

    可见,设置objectMode后,push(data)的数据被原样地输出了。此时,可以生产任意类型的数据。

    如何通过流取到数据

    Readable创建对象readable后,便得到了一个可读流。
    如果实现_read方法,就将流连接到一个底层数据源。
    流通过调用_read向底层请求数据,底层再调用流的push方法将需要的数据传递过来。

    readable连接了数据源后,下游便可以调用readable.read(n)向流请求数据,同时监听readabledata事件来接收取到的数据。

    这个流程可简述为:

    how-data-comes-out

    read

    read方法中的逻辑可用下图表示,后面几节将对该图中各环节加以说明。

    read

    push方法

    消耗方调用read(n)促使流输出数据,而流通过_read()使底层调用push方法将数据传给流。

    如果流在流动模式下(state.flowingtrue)输出数据,数据会自发地通过data事件输出,不需要消耗方反复调用read(n)

    如果调用push方法时缓存为空,则当前数据即为下一个需要的数据。
    这个数据可能先添加到缓存中,也可能直接输出。
    执行read方法时,在调用_read后,如果从缓存中取到了数据,就以data事件输出。

    所以,如果_read异步调用push时发现缓存为空,则意味着当前数据是下一个需要的数据,且不会被read方法输出,应当在push方法中立即以data事件输出。

    因此,上图中“立即输出”的条件是:

    state.flowing && state.length === 0 && !state.sync
    
    

    end事件

    由于流是分次向底层请求数据的,需要底层显示地告诉流数据是否取完。
    所以,当某次(执行_read())取数据时,调用了push(null),就意味着底层数据取完。
    此时,流会设置state.ended

    state.length表示缓存中当前的数据量。
    只有当state.length0,且state.endedtrue,才意味着所有的数据都被消耗了。
    一旦在执行read(n)时检测到这个条件,便会触发end事件。
    当然,这个事件只会触发一次。

    readable事件

    在调用完_read()后,read(n)会试着从缓存中取数据。
    如果_read()是异步调用push方法的,则此时缓存中的数据量不会增多,容易出现数据量不够的现象。

    如果read(n)的返回值为null,说明这次未能从缓存中取出所需量的数据。
    此时,消耗方需要等待新的数据到达后再次尝试调用read方法。

    在数据到达后,流是通过readable事件来通知消耗方的。
    在此种情况下,push方法如果立即输出数据,接收方直接监听data事件即可,否则数据被添加到缓存中,需要触发readable事件。
    消耗方必须监听这个事件,再调用read方法取得数据。

    doRead

    流中维护了一个缓存,当缓存中的数据足够多时,调用read()不会引起_read()的调用,即不需要向底层请求数据。
    doRead来表示read(n)是否需要向底层取数据,其逻辑为:

    var doRead = state.needReadable
    
    if (state.length === 0 || state.length - n < state.highWaterMark) {
      doRead = true
    }
    
    if (state.ended || state.reading) {
      doRead = false
    }
    
    if (doRead) {
      state.reading = true
      state.sync = true
      if (state.length === 0) {
        state.needReadable = true
      }
      this._read(state.highWaterMark)
      state.sync = false
    }
    
    

    state.reading标志上次从底层取数据的操作是否已完成。
    一旦push方法被调用,就会设置为false,表示此次_read()结束。

    state.highWaterMark是给缓存大小设置的一个上限阈值。
    如果取走n个数据后,缓存中保有的数据不足这个量,便会从底层取一次数据。

    howMuchToRead

    调用read(n)去取n个数据时,m = howMuchToRead(n)是将从缓存中实际获取的数据量。
    根据以下几种情况赋值,一旦确定则立即返回:

    • state.length为0,state.endedtrue
      数据源已枯竭,且缓存为空,无数据可取,m为0.
    • state.objectModetrue
      n为0,则m为0;
      否则m为1,将缓存的第一个元素输出。
    • n是数字。
      n <= 0,则m为0;
      n > state.length,表示缓存中数据量不够。
      此时如果还有数据可读(state.endedfalse),则m为0,同时设置state.needReadable,下次执行read()doRead会为true,将从底层再取数据。
      如果已无数据可读(state.endedtrue),则mstate.length,将剩下的数据全部输出。
      0 < n <= state.length,则缓存中数据够用,mn
    • 其它情况。
      state.flowingtrue(流动模式),则m为缓存中第一个元素(Buffer)的长度,实则还是将第一个元素输出;
      否则mstate.length,将缓存读空。

    上面的规则中:

    • n通常是undefined0,即不指定读取的字节数。
    • read(0)不会有数据输出,但从前面对doRead的分析可以看出,是有可能从底层读取数据的。
    • 执行read()时,由于流动模式下数据会不断输出,所以每次只输出缓存中第一个元素输出,而非流动模式则会将缓存读空。
    • objectModetrue时,m01。此时,一次push()对应一次data事件。

    综上所述:
    可读流是获取底层数据的工具,消耗方通过调用read方法向流请求数据,流再从缓存中将数据返回,或以data事件输出。
    如果缓存中数据不够,便会调用_read方法去底层取数据。
    该方法在拿到底层数据后,调用push方法将数据交由流处理(立即输出或存入缓存)。

    可以结合readable事件和read方法来将数据全部消耗,这是暂停模式的消耗方法。
    但更常见的是在流动模式下消耗数据,具体见后面的章节。

    数据的流式消耗

    所谓“流式数据”,是指按时间先后到达的数据序列。

    数据消耗模式

    可以在两种模式下消耗可读流中的数据:暂停模式(paused mode)和流动模式(flowing mode)。

    流动模式下,数据会源源不断地生产出来,形成“流动”现象。
    监听流的data事件便可进入该模式。

    暂停模式下,需要显示地调用read(),触发data事件。

    可读流对象readable中有一个维护状态的对象,readable._readableState,这里简称为state
    其中有一个标记,state.flowing, 可用来判别流的模式。
    它有三种可能值:

    • true。流动模式。
    • false。暂停模式。
    • null。初始状态。

    调用readable.resume()可使流进入流动模式,state.flowing被设为true
    调用readable.pause()可使流进入暂停模式,state.flowing被设为false

    暂停模式

    在初始状态下,监听data事件,会使流进入流动模式。
    但如果在暂停模式下,监听data事件并不会使它进入流动模式。
    为了消耗流,需要显示调用read()方法。

    const Readable = require('stream').Readable
    
    // 底层数据
    const dataSource = ['a', 'b', 'c']
    
    const readable = Readable()
    readable._read = function () {
      if (dataSource.length) {
        this.push(dataSource.shift())
      } else {
        this.push(null)
      }
    }
    
    // 进入暂停模式
    readable.pause()
    readable.on('data', data => process.stdout.write('\ndata: ' + data))
    
    var data = readable.read()
    while (data !== null) {
      process.stdout.write('\nread: ' + data)
      data = readable.read()
    }
    
    

    执行上面的脚本,输出如下:

    
    data: a
    read: a
    data: b
    read: b
    data: c
    read: c
    
    

    可见,在暂停模式下,调用一次read方法便读取一次数据。
    执行read()时,如果缓存中数据不够,会调用_read()去底层取。
    _read方法中可以同步或异步地调用push(data)来将底层数据交给流处理。

    在上面的例子中,由于是同步调用push方法,数据会添加到缓存中。
    read方法在执行完_read方法后,便从缓存中取数据,再返回,且以data事件输出。

    如果改成异步调用push方法,则由于_read()执行完后,数据来不及放入缓存,
    将出现read()返回null的现象。
    见下面的示例:

    const Readable = require('stream').Readable
    
    // 底层数据
    const dataSource = ['a', 'b', 'c']
    
    const readable = Readable()
    readable._read = function () {
      process.nextTick(() => {
        if (dataSource.length) {
          this.push(dataSource.shift())
        } else {
          this.push(null)
        }
      })
    }
    
    readable.pause()
    readable.on('data', data => process.stdout.write('\ndata: ' + data))
    
    while (null !== readable.read()) ;
    
    

    执行上述脚本,可以发现没有任何数据输出。

    此时,需要使用readable事件:

    const Readable = require('stream').Readable
    
    // 底层数据
    const dataSource = ['a', 'b', 'c']
    
    const readable = Readable()
    readable._read = function () {
      process.nextTick(() => {
        if (dataSource.length) {
          this.push(dataSource.shift())
        } else {
          this.push(null)
        }
      })
    }
    
    readable.pause()
    readable.on('data', data => process.stdout.write('\ndata: ' + data))
    
    readable.on('readable', function () {
      while (null !== readable.read()) ;;
    })
    
    

    输出:

    
    data: a
    data: b
    data: c
    
    

    read()返回null时,意味着当前缓存数据不够,而且底层数据还没加进来(异步调用push())。
    此种情况下state.needReadable会被设置为true
    push方法被调用时,由于是暂停模式,不会立即输出数据,而是将数据放入缓存,并触发一次readable事件。

    所以,一旦read被调用,上面的例子中就会形成一个循环:readable事件导致read方法调用,read方法又触发readable事件。

    首次监听readable事件时,还会触发一次read(0)的调用,从而引起_readpush方法的调用,从而启动循环。

    总之,在暂停模式下需要使用readable事件和read方法来消耗流。

    流动模式

    流动模式使用起来更简单一些。

    一般创建流后,监听data事件,或者通过pipe方法将数据导向另一个可写流,即可进入流动模式开始消耗数据。
    尤其是pipe方法中还提供了back pressure机制,所以使用pipe进入流动模式的情况非常普遍。

    本节解释data事件如何能触发流动模式。

    先看一下Readable是如何处理data事件的监听的:

    Readable.prototype.on = function (ev, fn) {
      var res = Stream.prototype.on.call(this, ev, fn)
      if (ev === 'data' && false !== this._readableState.flowing) {
        this.resume()
      }
    
      // 处理readable事件的监听
      // 省略
    
      return res
    }
    
    

    Stream继承自EventEmitter,且是Readable的父类。
    从上面的逻辑可以看出,在将fn加入事件队列后,如果发现处于非暂停模式,则会调用this.resume(),开始流动模式。

    resume()方法先将state.flowing设为true
    然后会在下一个tick中执行flow,试图将缓存读空:

    if (state.flowing) do {
      var chunk = stream.read()
    } while (null !== chunk && state.flowing)
    
    

    flow中每次read()都可能触发push()的调用,
    push()中又可能触发flow()read()的调用,
    这样就形成了数据生生不息的流动。
    其关系可简述为:

    flowing-mode

    下面再详细看一下push()的两个分支:

    if (state.flowing && state.length === 0 && !state.sync) {
      stream.emit('data', chunk)
      stream.read(0)
    } else {
      state.length += state.objectMode ? 1 : chunk.length
      state.buffer.push(chunk)
    
      if (state.needReadable)
        emitReadable(stream)
    }
    
    

    称第一个分支为立即输出。

    在立即输出的情况下,输出数据后,执行read(0),进一步引起_read()push()的调用,从而使数据源源不断地输出。

    在非立即输出的情况下,数据先被添加到缓存中。
    此时有两种情况:

    • state.length为0。
      这时,在调用_read()前,state.needReadable就会被设为true
      因此,一定会调用emitReadable()
      这个方法会在下一个tick中触发readable事件,同时再调用flow(),从而形成流动。
    • state.length不为0。
      由于流动模式下,每次都是从缓存中取第一个元素,所以这时read()返回值一定不为null
      flow()中的循环还在继续。

    此外,从push()的两个分支可以看出来,如果state.flowing设为false,第一个分支便不会再进去,也就不会再调用read(0)
    同时第二个分支中引发flow的调用后,也不会再调用read(),这就完全暂停了底层数据的读取。

    事实上,pause方法就是这样使流从流动模式转换到暂停模式的。

    背压反馈机制

    考虑下面的例子:

    const fs = require('fs')
    fs.createReadStream(file).on('data', doSomething)
    
    

    监听data事件后文件中的内容便立即开始源源不断地传给doSomething()
    如果doSomething处理数据较慢,就需要缓存来不及处理的数据data,占用大量内存。

    理想的情况是下游消耗一个数据,上游才生产一个新数据,这样整体的内存使用就能保持在一个水平。
    Readable提供pipe方法,用来实现这个功能。

    pipe

    pipe方法连接上下游:

    const fs = require('fs')
    fs.createReadStream(file).pipe(writable)
    
    

    writable是一个可写流Writable对象,上游调用其write方法将数据写入其中。
    writable内部维护了一个写队列,当这个队列长度达到某个阈值(state.highWaterMark)时,
    执行write()时返回false,否则返回true

    于是上游可以根据write()的返回值在流动模式和暂停模式间切换:

    readable.on('data', function (data) {
      if (false === writable.write(data)) {
        readable.pause()
      }
    })
    
    writable.on('drain', function () {
      readable.resume()
    })
    
    

    上面便是pipe方法的核心逻辑。

    write()返回false时,调用readable.pause()使上游进入暂停模式,不再触发data事件。
    但是当writable将缓存清空时,会触发一个drain事件,再调用readable.resume()使上游进入流动模式,继续触发data事件。

    看一个例子:

    const stream = require('stream')
    
    var c = 0
    const readable = stream.Readable({
      highWaterMark: 2,
      read: function () {
        process.nextTick(() => {
          var data = c < 6 ? String.fromCharCode(c + 65) : null
          console.log('push', ++c, data)
          this.push(data)
        })
      }
    })
    
    const writable = stream.Writable({
      highWaterMark: 2,
      write: function (chunk, enc, next) {
        console.log('write', chunk)
      }
    })
    
    readable.pipe(writable)
    
    

    输出:

    push 1 A
    write <Buffer 41>
    push 2 B
    push 3 C
    push 4 D
    
    

    虽然上游一共有6个数据(ABCDEF)可以生产,但实际只生产了4个(ABCD)。
    这是因为第一个数据(A)迟迟未能写完(未调用next()),所以后面通过write方法添加进来的数据便被缓存起来。
    下游的缓存队列到达2时,write返回false,上游切换至暂停模式。
    此时下游保存了AB
    由于Readable总是缓存state.highWaterMark这么多的数据,所以上游保存了CD
    从而一共生产出来ABCD四个数据。

    下面使用tick-nodeReadable的debug信息按tick分组:

    ⌘ NODE_DEBUG=stream tick-node pipe.js
    STREAM 18930: pipe count=1 opts=undefined
    STREAM 18930: resume
    ---------- TICK 1 ----------
    STREAM 18930: resume read 0
    STREAM 18930: read 0
    STREAM 18930: need readable false
    STREAM 18930: length less than watermark true
    STREAM 18930: do read
    STREAM 18930: flow true
    STREAM 18930: read undefined
    STREAM 18930: need readable true
    STREAM 18930: length less than watermark true
    STREAM 18930: reading or ended false
    ---------- TICK 2 ----------
    push 1 A
    STREAM 18930: ondata
    write <Buffer 41>
    STREAM 18930: read 0
    STREAM 18930: need readable true
    STREAM 18930: length less than watermark true
    STREAM 18930: do read
    ---------- TICK 3 ----------
    push 2 B
    STREAM 18930: ondata
    STREAM 18930: call pause flowing=true
    STREAM 18930: pause
    STREAM 18930: read 0
    STREAM 18930: need readable true
    STREAM 18930: length less than watermark true
    STREAM 18930: do read
    ---------- TICK 4 ----------
    push 3 C
    STREAM 18930: emitReadable false
    STREAM 18930: emit readable
    STREAM 18930: flow false
    ---------- TICK 5 ----------
    STREAM 18930: maybeReadMore read 0
    STREAM 18930: read 0
    STREAM 18930: need readable false
    STREAM 18930: length less than watermark true
    STREAM 18930: do read
    ---------- TICK 6 ----------
    push 4 D
    ---------- TICK 7 ----------
    
    
    • TICK 0: readable.resume()
    • TICK 1: readable在流动模式下开始从底层读取数据
    • TICK 2: A被输出,同时执行readable.read(0)
    • TICK 3: B被输出,同时执行readable.read(0)
      writable.write('B')返回false
      执行readable.pause()切换至暂停模式。
    • TICK 4: TICK 3中read(0)引起push('C')的调用,C被加到readable缓存中。
      此时,writable中有ABreadable中有C
      这时已在暂停模式,但在readable.push('C')结束前,发现缓存中只有1个数据,小于设定的highWaterMark(2),故准备在下一个tick再读一次数据。
    • TICK 5: 调用read(0)从底层取数据。
    • TICK 6: push('D')D被加到readable缓存中。
      此时,writable中有ABreadable中有CD
      readable缓存中有2个数据,等于设定的highWaterMark(2),不再从底层读取数据。

    可以认为,随着下游缓存队列的增加,上游写数据时受到的阻力变大。
    这种back pressure大到一定程度时上游便停止写,等到back pressure降低时再继续。

    消耗驱动的数据生产

    使用pipe()时,数据的生产和消耗形成了一个闭环。
    通过负反馈调节上游的数据生产节奏,事实上形成了一种所谓的拉式流(pull stream)。

    用喝饮料来说明拉式流和普通流的区别的话,普通流就像是将杯子里的饮料往嘴里倾倒,动力来源于上游,数据是被推往下游的;拉式流则是用吸管去喝饮料,动力实际来源于下游,数据是被拉去下游的。

    所以,使用拉式流时,是“按需生产”。
    如果下游停止消耗,上游便会停止生产。
    所有缓存的数据量便是两者的阈值和。

    当使用Transform作为下游时,尤其需要注意消耗。

    const stream = require('stream')
    
    var c = 0
    const readable = stream.Readable({
      highWaterMark: 2,
      read: function () {
        process.nextTick(() => {
          var data = c < 26 ? String.fromCharCode(c++ + 97) : null
          console.log('push', data)
          this.push(data)
        })
      }
    })
    
    const transform = stream.Transform({
      highWaterMark: 2,
      transform: function (buf, enc, next) {
        console.log('transform', buf)
        next(null, buf)
      }
    })
    
    readable.pipe(transform)
    
    

    以上代码执行结果为:

    push a
    transform <Buffer 61>
    push b
    transform <Buffer 62>
    push c
    push d
    push e
    push f
    
    

    可见,并没有将26个字母全生产出来。

    Transform中有两个缓存:可写端的缓存和可读端的缓存。

    调用transform.write()时,如果可读端缓存未满,数据会经过变换后加入到可读端的缓存中。
    当可读端缓存到达阈值后,再调用transform.write()则会将写操作缓存到可写端的缓存队列。
    当可写端的缓存队列也到达阈值时,transform.write()返回false,上游进入暂停模式,不再继续transform.write()
    所以,上面的transform中实际存储了4个数据,ab在可读端(经过了_transform的处理),cd在可写端(还未经过_transform处理)。

    此时,由前面一节的分析可知,readable将缓存ef,之后便不再生产数据。

    这三个缓存加起来的长度恰好为6,所以一共就生产了6个数据。

    要想将26个数据全生产出来,有两种做法。
    第一种是消耗transform中可读端的缓存,以拉动上游的生产:

    readable.pipe(transform).pipe(process.stdout)
    
    

    第二种是,不要将数据存入可读端中,这样可读端的缓存便会一直处于数据不足状态,上游便会源源不断地生产数据:

    const transform = stream.Transform({
      highWaterMark: 2,
      transform: function (buf, enc, next) {
        next()
      }
    })
    

    参考来源
    1.https://tech.meituan.com/stream-basics.html
    2.https://tech.meituan.com/stream-internals.html
    3.https://tech.meituan.com/stream-in-action.html

    相关文章

      网友评论

          本文标题:Node Native (2) stream

          本文链接:https://www.haomeiwen.com/subject/rpjbkftx.html