背景:使用EventStream读取大文件,获取文件总行数
//使用event stream
const fs = require('fs')
const path = require('path')
const es = require('event-stream')
let total = 0
fs.createReadStream(path.join(__dirname, './test.json'))
.pipe(es.split()) //defaults to lines.
.pipe(
es.map(function (line, cb) {
//do something with the line
console.log(line)
total++
})
)
.pipe(
es.wait(function(){//相当于监听前一个 stream 的 end 事件
console.log('total',total)
})
)
问题:es.wait并没有监听到end事件,始终无法输出total。
尝试排查
思路一:是否和es.wait函数有关?
//
// wait. callback when 'end' is emitted, with all chunks appended as string.
//
es.wait = function (callback) {
var arr = []
return es.through(function (data) { arr.push(data) },
function () {
var body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr)
: arr.join('')
this.emit('data', body)
this.emit('end')
if(callback) callback(null, body)
})
}
es.wait这个函数返回的是through对象(也是一个stream),传了2个function参数进去,第二个function实际会回调触发 this.emit('end'),所以继续排查这个function是否被调用
function through (write, end, opts) {
write = write || function (data) { this.queue(data) }
end = end || function () { this.queue(null) }
var ended = false, destroyed = false, buffer = [], _ended = false
var stream = new Stream()
stream.readable = stream.writable = true
stream.paused = false
// stream.autoPause = !(opts && opts.autoPause === false)
stream.autoDestroy = !(opts && opts.autoDestroy === false)
stream.write = function (data) {
write.call(this, data)
return !stream.paused
}
through构造函数的上面这个end参数就是之前es.wait的第二个参数,看看它什么时候触发调用,找到相关代码:
stream.on('end', function () {
//end()
stream.readable = false
if(!stream.writable && stream.autoDestroy)
process.nextTick(function () {
stream.destroy()
})
})
function _end () {
stream.writable = false
end.call(stream)//只有这个地方涉及end的触发
if(!stream.readable && stream.autoDestroy)
stream.destroy()
}
stream.end = function (data) {
if(ended) return
ended = true
if(arguments.length) stream.write(data)
_end() // will emit or queue
return stream
}
在_end函数断点debug发现执行end.call(stream)并没有跳回es.wait的第二个回调函数,显然这个地方出现异常,联想我们这次用的pipe和stream,那可能是上层的stream调用异常了。
思路二:是否和上一个pipe的 es.map有关?
es.map对应的实际是mapStream类,找到这个流中和end相关代码:
function end (data) {
//if end was called with args, write it,
ended = true //write will emit 'end' if ended is true
stream.writable = false
if(data !== undefined) {
return queueData(data, inputs)
} else if (inputs == outputs) { //wait for processing //满足这个条件才可以
stream.readable = false, stream.emit('end'), stream.destroy()
}
}
stream.end = function (data) {
if(ended) return
end(data)
}
只有满足inputs === ouputs才可以触发end事件,仔细跟踪代码,outputs在queueData函数被递增修改,queueData -> next -> stream.write,跟踪到:
// Wrap the mapper function by calling its callback with the order number of
// the item in the stream.
function wrappedMapper (input, number, callback) {
return mapper.call(null, input, function(err, data){
callback(err, data, number)
})
}
stream.write = function (data) {
if(ended) throw new Error('map stream is not writable')
inNext = false
inputs ++
try {
//catch sync errors and handle them like async errors
var written = wrappedMapper(data, inputs, next)//next在这里被调用
paused = (written === false)
return !paused
} catch (err) {
//if the callback has been called syncronously, and the error
//has occured in an listener, throw it again.
if(inNext)
throw err
next(err)
return !paused
}
}
仔细看wrappedMapper这个函数,next实际就是它的callback参数,这个callback的调用依赖于mapper.call的第三个参数,mapper又是什么呢?
//map stream 导出代码
module.exports = function (mapper, opts) {
var stream = new Stream()
, inputs = 0
, outputs = 0
, ended = false
, paused = false
, destroyed = false
, lastWritten = 0
, inNext = false
opts = opts || {};
var errorEventName = opts.failures ? 'failure' : 'error';
//使用map stream的代码
es.map(function (line, cb) {
//do something with the line
console.log(line)
total++
})
显然mapper就是这个函数:
function (line, cb) {
//do something with the line
console.log(line)
total++
})
mapper的第三个参数就是cb了,cb没有调用,那当然导致后面一连串函数没有调用,最终影响end事件的触发,那我们加上cb调用看看:
//使用event stream
const fs = require('fs')
const path = require('path')
const es = require('event-stream')
let total = 0
fs.createReadStream(path.join(__dirname, './test.json'))
.pipe(es.split()) //defaults to lines.
.pipe(
es.map(function (line, cb) {
//do something with the line
//console.log(line)
total++
cb(null,line)//这个回调很重要
})
)
.pipe(
es.wait(function(){//相当于监听前一个 stream 的 end 事件
console.log('total',total)
})
)
结果能输出total了:
image.png
总结
使用stream和pipe搭配使用的时候,需要注意前后stream的关联影响。
网友评论