美文网首页
EventStream

EventStream

作者: 胖子罗 | 来源:发表于2020-04-06 12:25 被阅读0次

    背景:使用EventStream读取大文件,获取文件总行数

    // Reproduction of the problem: count the lines of a (large) file with
    // event-stream. As written, the final 'total' line is never printed.
    const fs = require('fs')
    const path = require('path')
    const es = require('event-stream')
    let total = 0
    fs.createReadStream(path.join(__dirname, './test.json'))
    .pipe(es.split()) //defaults to lines.
    .pipe(
      es.map(function (line, cb) {
        //do something with the line
       console.log(line)
       total++
       // BUG (intentional, analysed below): `cb` is never called, so the map
       // stream never counts any input as answered (outputs never reaches
       // inputs) and therefore never emits 'end'.
      })
      )
      .pipe(
        es.wait(function(){// meant to run once the previous stream emits 'end'
          console.log('total',total)
        })
      )
    

    问题:es.wait并没有监听到end事件,始终无法输出total。

    尝试排查

    思路一:是否和es.wait函数有关?

    /**
     * wait: collect every chunk written to the stream. When the writable side
     * ends, emit the combined body as one 'data' event, then 'end', and finally
     * pass the body to `callback` node-style as `callback(null, body)`.
     * The body is built with Buffer.concat when the first chunk is a Buffer;
     * otherwise the chunks are joined as strings.
     */
    es.wait = function (callback) {
      const chunks = []

      const collect = function (chunk) {
        chunks.push(chunk)
      }

      // Must be a regular function, not an arrow: through invokes its end
      // handler via `end.call(stream)`, so `this` is the through stream.
      const flush = function () {
        let body
        if (Buffer.isBuffer(chunks[0])) {
          body = Buffer.concat(chunks)
        } else {
          body = chunks.join('')
        }
        this.emit('data', body)
        this.emit('end')
        if (callback) callback(null, body)
      }

      return es.through(collect, flush)
    }
    

    es.wait返回的是through创建的对象(它也是一个stream),并向through传入了2个function参数。其中第二个function会执行this.emit('end')并调用我们传入的回调,所以接下来排查这个function是否被调用:

    function through (write, end, opts) {
      write = write || function (data) { this.queue(data) }
      end = end || function () { this.queue(null) }
    
      var ended = false, destroyed = false, buffer = [], _ended = false
      var stream = new Stream()
      stream.readable = stream.writable = true
      stream.paused = false
    
    //  stream.autoPause   = !(opts && opts.autoPause   === false)
      stream.autoDestroy = !(opts && opts.autoDestroy === false)
    
      stream.write = function (data) {
        write.call(this, data)
        return !stream.paused
      }
    

    through函数的end参数,就是前面es.wait传入的第二个参数。看看它在什么时候被调用,找到相关代码:

    // Readable side finished: mark the stream unreadable and, if the writable
    // side has already finished and autoDestroy is enabled, destroy the stream
    // on the next tick.
    stream.on('end', function () {
        //end() -- disabled: the `end` handler passed to through is not run from here
        stream.readable = false
        if(!stream.writable && stream.autoDestroy)
          process.nextTick(function () {
            stream.destroy()
          })
      })
    
      // Finish the writable side. Mark the stream unwritable first, then run
      // the `end` handler given to through (es.wait's second argument, or the
      // default that queues null), with `this` bound to the stream. Finally,
      // destroy the stream if the readable side is already done and
      // autoDestroy is enabled.
      function _end () {
        stream.writable = false
        end.call(stream)// only call site of the `end` handler in the code shown here
        if(!stream.readable && stream.autoDestroy)
          stream.destroy()
      }
    
      // Public end(). Repeat calls return early (with undefined, not the
      // stream). An optional final chunk is written first; then _end()
      // finishes the writable side. This is the only path into _end() shown
      // here, so the `end` handler runs only if someone calls stream.end().
      stream.end = function (data) {
        if(ended) return
        ended = true
        if(arguments.length) stream.write(data)
        _end() // will emit or queue
        return stream
      }
    

    在_end函数处打断点调试,发现end.call(stream)始终没有被执行到,es.wait的第二个回调自然也没有被调用。也就是说,问题不在es.wait本身:_end只会由stream.end()触发,而pipe要等上游stream发出end事件后才会调用下游的end()。联想到这里用pipe串联了多个stream,问题很可能出在上一个stream一直没有结束。

    思路二:是否和上一个pipe的 es.map有关?
    es.map实际对应的是map-stream模块导出的函数(它返回一个流)。找到这个流中与end相关的代码:

    // map-stream's internal end(): mark the stream ended and unwritable.
    // If a final chunk is given, it is handed to queueData with the current
    // `inputs` count. Otherwise 'end' is emitted (followed by destroy) only when
    // every input has produced an output. outputs is advanced in queueData
    // (not shown here), so if a mapper never calls its callback, outputs never
    // catches up and 'end' is never emitted.
    function end (data) {
        //if end was called with args, write it, 
        ended = true //write will emit 'end' if ended is true
        stream.writable = false
        if(data !== undefined) {
          return queueData(data, inputs)
        } else if (inputs == outputs) { //wait for processing: 'end' is emitted only once this holds
          stream.readable = false, stream.emit('end'), stream.destroy() 
        }
      }
    
      // Public end(): ignored once the stream has ended; otherwise delegates
      // to the internal end() above with the optional final chunk.
      stream.end = function (data) {
        if(ended) return
        end(data)
      }
    

    只有满足inputs == outputs时才会触发end事件。继续跟踪代码:outputs在queueData函数中递增。沿调用链往回追溯(queueData ← next ← stream.write),找到:

    // Wrap the mapper function by calling its callback with the order number of
      // the item in the stream.
      // Calls the user's mapper as mapper(input, cb), with `this` = null, and
      // returns whatever the mapper returns (stream.write treats `false` as
      // "paused"). When the mapper invokes cb(err, data), `callback` is called
      // with the sequence `number` appended. If the mapper never invokes cb,
      // `callback` is never called.
      function wrappedMapper (input, number, callback) {
        return mapper.call(null, input, function(err, data){
          callback(err, data, number)
        })
      }
    
      // Writable entry point. Throws once the stream has ended. Otherwise it
      // numbers the chunk and hands it to the mapper, with `next` as the
      // completion callback. inputs starts at 0 and is incremented first, so
      // numbering starts at 1. Returns !paused, where paused becomes true when
      // the mapper returned exactly false.
      stream.write = function (data) {
        if(ended) throw new Error('map stream is not writable')
        inNext = false
        inputs ++
    
        try {
          //catch sync errors and handle them like async errors
          var written = wrappedMapper(data, inputs, next)// `next` runs only when the mapper calls its callback
          paused = (written === false)
          return !paused
        } catch (err) {
          //if the callback has been called synchronously, and the error
          //has occurred in a listener, throw it again.
          if(inNext)
            throw err
          next(err)
          return !paused
        }
      }
    

    仔细看wrappedMapper这个函数:next实际就是它的callback参数。而callback只有在mapper.call传入的第三个实参(也就是mapper自身的第二个参数)被调用时才会执行。那么mapper又是什么呢?

    //map stream 导出代码
    module.exports = function (mapper, opts) {
    
      var stream = new Stream()
        , inputs = 0
        , outputs = 0
        , ended = false
        , paused = false
        , destroyed = false
        , lastWritten = 0
        , inNext = false
    
      opts = opts || {};
      var errorEventName = opts.failures ? 'failure' : 'error';
    
    //使用map stream的代码
      es.map(function (line, cb) {
        //do something with the line
       console.log(line)
       total++
      })
    

    显然mapper就是这个函数:

    function (line, cb) {
        //do something with the line
       console.log(line)
       total++
      })
    

    mapper的第二个参数就是cb。cb没有被调用,后面next → queueData这一连串函数自然都不会执行,outputs永远追不上inputs,end事件也就永远不会触发。那我们加上cb调用看看:

    // Count the lines of a (possibly large) file with event-stream.
    const fs = require('fs')
    const path = require('path')
    const es = require('event-stream')

    let total = 0

    fs.createReadStream(path.join(__dirname, './test.json'))
      .pipe(es.split()) // defaults to splitting on newlines
      .pipe(
        es.map(function (line, cb) {
          total++
          // The callback is essential: map-stream only emits 'end' once every
          // input has been answered through `cb`.
          // Calling it with no data drops the line instead of forwarding it.
          // That way es.wait below has nothing to buffer, and memory use stays
          // flat however large the file is (cb(null, line) would make es.wait
          // hold the entire file in memory).
          cb()
        })
      )
      .pipe(
        es.wait(function () {
          // Runs once the map stream above has ended.
          console.log('total', total)
        })
      )
    

    结果能输出total了:


    image.png

    总结

    stream和pipe搭配使用时,需要注意前后stream之间的相互影响。例如es.map的mapper必须调用回调cb,否则该流永远不会发出end事件,下游(如es.wait)也就永远等不到结束。

    相关文章

      网友评论

          本文标题:EventStream

          本文链接:https://www.haomeiwen.com/subject/nebvphtx.html