美文网首页
new stream.Writable([options])实现

new stream.Writable([options])实现

作者: 静昕妈妈芦培培 | 来源:发表于2021-02-25 18:30 被阅读0次

例1:创建的流的内部接收到的数据是buffer对象

const { Writable } = require('stream')

//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
    write(chunk, encoding, callback) {
        //下面输出的chunk为buffer对象
        console.log(chunk)
        console.log(encoding)
        console.log(callback)
        //callback必须调用,否则无法进行第二次写入:stream._write()
        // callback()
    }
});


//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,')
writer.write('今天学nodejs了么')

writer.end()

//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
    console.log('数据写入完成')
})

执行结果:只执行了第一次writer.write,后面的writer.write和writer.end都没执行


image.png

修改代码在新建可写流的内部调用callback,

const { Writable } = require('stream')

//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
    write(chunk, encoding, callback) {
        //下面输出的chunk为buffer对象
        console.log(chunk)
        console.log(encoding)
        console.log(callback)
        //callback必须调用,否则无法进行第二次写入:stream._write()
        callback()
    }
});


//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,')
writer.write('今天学nodejs了么', () => {
    console.log('本次写入完成')
})

writer.end()

//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
    console.log('数据写入完成')
})

//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
    console.log('触发了drain事件')
})

//监听error事件
writer.on('error', (err) => {
    console.error(err)
})


//监听文件关闭时间
writer.on('close', () => {
    console.log('可写流已被关闭,不能再写入数据')
})

//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
    console.log('有数据正通过管道流入写入器');
})


//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
    console.log('已移除可写流管道');
})

执行结果:writer.write两次和write.end都被调用了


image.png

例2:创建的流的内部接收到的数据是字符串

const { Writable } = require('stream')

//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
    decodeStrings: false, //设置为false,不会把接收到的字符串转换为buffer,
    // 此时创建的可写流内部的write接收到的encoding参数为writer.write传入的字符编码
    write(chunk, encoding, callback) {

        //下面输出的chunk为字符串
        console.log(chunk)
        //encoding 参数将指示字符串的字符编码方式。
        console.log(encoding)
        console.log(callback)
        //callback必须调用,否则无法进行第二次写入:stream._write()
        callback()
    }
});


//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,')
writer.write('今天学nodejs了么',  () => {
    console.log('本次写入完成')
})

writer.end()
//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
    console.log('数据写入完成')
})

//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
    console.log('触发了drain事件')
})

//监听error事件
writer.on('error', (err) => {
    console.error(err)
})


//监听文件关闭时间
writer.on('close', () => {
    console.log('可写流已被关闭,不能再写入数据')
})

//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
    console.log('有数据正通过管道流入写入器');
})


//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
    console.log('已移除可写流管道');
})

执行结果:


image.png

例3:创建可写流的时候设置objectMode: true,可以写入除了null以外的任意JavaScript 值

const { Writable } = require('stream')

//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
    objectMode: true,//设置为true,可以写入除了null以外的任意JavaScript 值
    decodeStrings: false, //设置为false,不会把接收到的字符串转换为buffer,
    // 此时创建的可写流内部的write接收到的encoding参数为writer.write传入的字符编码
    write(chunk, encoding, callback) {

        //下面输出的chunk为writer.write传入的任何除了null之外的javascript,没做任何转换
        console.log(chunk)
        console.log(encoding)
        console.log(callback)
        //callback必须调用,否则无法进行第二次写入:stream._write()
        callback()
    }
});


//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,')
writer.write({})
writer.write('今天学nodejs了么',  () => {
    console.log('本次写入完成')
})

writer.end()
//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
    console.log('数据写入完成')
})

//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
    console.log('触发了drain事件')
})

//监听error事件
writer.on('error', (err) => {
    console.error(err)
})


//监听文件关闭时间
writer.on('close', () => {
    console.log('可写流已被关闭,不能再写入数据')
})

//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
    console.log('有数据正通过管道流入写入器');
})


//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
    console.log('已移除可写流管道');
})

执行结果:


image.png

必须在 writable._write() 内部同步地调用、或异步地(即不同的时间点)调用 callback 函数,以表明写入成功完成或因错误而失败。
如果调用失败,则 callback 的第一个参数必须是 Error 对象。
如果写入成功,则 callback 的第一个参数为 null。
在 writable._write() 被调用之后且 callback 被调用之前,所有对 writable.write() 的调用都会把要写入的数据缓冲起来。 当调用 callback 时,流将会触发 'drain'事件。
案例暂无:

例四:使用writer.cork()一次性将几个数据快速连续地写入创建的可写流,会将它们放在数组中作为chunks参数全部传给 创建的可写流内部的writev()

const { Writable } = require('stream')



//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
    decodeStrings: false, //设置为false,不会把接收到的字符串转换为buffer,
    // 此时创建的可写流内部的write接收到的encoding参数为writer.write传入的字符编码
    write(chunk, encoding, callback) {
        //下面输出的chunk为字符串
        console.log(chunk)
        //encoding 参数将指示字符串的字符编码。
        console.log(encoding)
        console.log(callback)
        //必须在 writable._write() 内部同步地调用、或异步地(即不同的时间点)调用 callback 函数,
        // 以表明写入成功完成或因错误而失败。 如果调用失败,则 callback 的第一个参数必须是 Error 对象。
        // 如果写入成功,则 callback 的第一个参数为 null。
        //在 writable._write() 被调用之后且 callback 被调用之前,所有对 writable.write() 的调用都会把要写入的数据缓冲起来。 
        //当调用 callback 时,流将会触发 'drain'事件。

        //callback必须调用,否则无法进行第二次写入:stream._write()
        callback()
    },
    //在writer.cork()调用之后,writer.uncork()或writer.end()调用之前,通过writer.write写入的所有数据块,
    //会存放在数组里,作为chunks参数传给创建的可写流的内部的writerv方法里
    writev(chunks, callback) {
        //chunks:要写入的多个数据块。 每个数据块的格式为:{ chunk: ..., encoding: ... }。
        //callback当全部数据块被处理完成后的回调函数。在这个回调函数里会一次调用
        // 在writer.cork()调用之后,writer.uncork()或writer.end()调用之前,通过writer.write传入的所有回调
        console.log(chunks)
        console.log(callback)
        callback()
    }
});




//writable.cork() 方法强制把所有写入的数据都缓冲到内存中。
// 当调用 stream.uncork() 或 stream.end() 方法时,缓冲的数据才会被输出。
//writable.cork() 的主要目的是为了适应将几个数据快速连续地写入流的情况。
// writable.cork() 不会立即将它们转发到底层的目标,而是缓冲所有数据块,直到调用 writable.uncork(),
// 这会将它们全部传给 writable._writev()(如果存在)。
//writable.uncork() 方法将调用 stream.cork() 后缓冲的所有数据输出到目标。
writer.cork()
//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,', () => {
    console.log('第一次写入完成')
})
writer.write('今天学nodejs了么',  () => {
    console.log('第二次写入完成')
})
// process.nextTick(() => writer.uncork());
writer.end()

//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
    console.log('数据写入完成')
})

//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
    console.log('触发了drain事件')
})

//监听error事件
writer.on('error', (err) => {
    console.error(err)
})


//监听文件关闭时间
writer.on('close', () => {
    console.log('可写流已被关闭,不能再写入数据')
})

//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
    console.log('有数据正通过管道流入写入器');
})


//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
    console.log('已移除可写流管道');
})


执行结果:


image.png

例5:销毁创建的可写流,销毁流后会触发close事件,如果销毁的时候传了一个表示错误的参数,会传给创建的可写流的内部的destroy方法的err参数

const { Writable } = require('stream')



//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
    decodeStrings: false, //设置为false,不会把接收到的字符串转换为buffer,
    // 此时创建的可写流内部的write接收到的encoding参数为writer.write传入的字符编码
    write(chunk, encoding, callback) {
        //下面输出的chunk为字符串
        console.log(chunk)
        //encoding 参数将指示字符串的字符编码。
        console.log(encoding)
        console.log(callback)
        //必须在 writable._write() 内部同步地调用、或异步地(即不同的时间点)调用 callback 函数,
        // 以表明写入成功完成或因错误而失败。 如果调用失败,则 callback 的第一个参数必须是 Error 对象。
        // 如果写入成功,则 callback 的第一个参数为 null。
        //在 writable._write() 被调用之后且 callback 被调用之前,所有对 writable.write() 的调用都会把要写入的数据缓冲起来。 当调用 callback 时,流将会触发 'drain'事件。

        //callback必须调用,否则无法进行第二次写入:stream._write()
        callback()
    },

    //在writer.cork()调用之后,writer.uncork()或writer.end()调用之前,通过writer.write写入的所有数据块,
    //会存放在数组里,作为chunks参数传给创建的可写流的内部的writerv方法里
    writev(chunks, callback) {
        //chunks:要写入的多个数据块。 每个数据块的格式为:{ chunk: ..., encoding: ... }。
        //callback当全部数据块被处理完成后的回调函数。在这个回调函数里会一次调用
        // 在writer.cork()调用之后,writer.uncork()或writer.end()调用之前,通过writer.write传入的所有回调
        console.log(chunks)
        console.log(callback)
        callback()
    },

    //销毁流。 可选地触发 'error',并且触发 'close' 事件(除非将 emitClose 设置为 false)。
    // 调用该方法后,可写流就结束了,之后再调用 write() 或 end() 都会导致 ERR_STREAM_DESTROYED 错误。
    // 这是销毁流的最直接的方式。
    destroy(err, callback) {
        //err是writer.destroy(new Error('流已被销毁'))作为参数传入的错误
        console.log(err)
        callback()
    },
    final(callback) {
        //callback为调用writer.end()时传入的回调
        callback()
    }
});




//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,', () => {
    console.log('第一次写入完成')
})
writer.write('今天学nodejs了么',  () => {
    console.log('第二次写入完成')
})
writer.end(() => {
    console.log('可写流已被关闭')
})

//销毁创建的可写流,销毁流后会触发close事件,如果销毁的时候传了一个表示错误的参数,
// 会传给创建的可写流的内部的destroy方法的err参数
writer.destroy(new Error('流已被销毁'))

//writable.destroyed在调用了 writable.destroy() 之后为 true。
console.log('writable.destroyed为' + writer.destroyed)

//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
    console.log('数据写入完成')
})



//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
    console.log('触发了drain事件')
})

//监听error事件
writer.on('error', (err) => {
    console.error('销毁流的时候并没有触发error事件')
})


//监听文件关闭时间
writer.on('close', () => {
    console.log('可写流已被关闭,不能再写入数据')
})

//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
    console.log('有数据正通过管道流入写入器');
})


//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
    console.log('已移除可写流管道');
})



执行结果:


image.png

相关文章

网友评论

      本文标题:new stream.Writable([options])实现

      本文链接:https://www.haomeiwen.com/subject/betsfltx.html