例1:创建的流的内部接收到的数据是buffer对象
const { Writable } = require('stream')
//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
write(chunk, encoding, callback) {
//下面输出的chunk为buffer对象
console.log(chunk)
console.log(encoding)
console.log(callback)
//callback必须调用,否则无法进行第二次写入:stream._write()
// callback()
}
});
//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,')
writer.write('今天学nodejs了么')
writer.end()
//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
console.log('数据写入完成')
})
执行结果:只执行了第一次writer.write,后面的writer.write和writer.end都没执行

修改代码
在新建可写流的内部调用callback,
const { Writable } = require('stream')
//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
write(chunk, encoding, callback) {
//下面输出的chunk为buffer对象
console.log(chunk)
console.log(encoding)
console.log(callback)
//callback必须调用,否则无法进行第二次写入:stream._write()
callback()
}
});
//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,')
writer.write('今天学nodejs了么', () => {
console.log('本次写入完成')
})
writer.end()
//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
console.log('数据写入完成')
})
//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
console.log('触发了drain事件')
})
//监听error事件
writer.on('error', (err) => {
console.error(err)
})
//监听文件关闭时间
writer.on('close', () => {
console.log('可写流已被关闭,不能再写入数据')
})
//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
console.log('有数据正通过管道流入写入器');
})
//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
console.log('已移除可写流管道');
})
执行结果:writer.write两次和write.end都被调用了

例2:创建的流的内部接收到的数据是字符串
const { Writable } = require('stream')
//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
decodeStrings: false, //设置为false,不会把接收到的字符串转换为buffer,
// 此时创建的可写流内部的write接收到的encoding参数为writer.write传入的字符编码
write(chunk, encoding, callback) {
//下面输出的chunk为字符串
console.log(chunk)
//encoding 参数将指示字符串的字符编码方式。
console.log(encoding)
console.log(callback)
//callback必须调用,否则无法进行第二次写入:stream._write()
callback()
}
});
//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,')
writer.write('今天学nodejs了么', () => {
console.log('本次写入完成')
})
writer.end()
//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
console.log('数据写入完成')
})
//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
console.log('触发了drain事件')
})
//监听error事件
writer.on('error', (err) => {
console.error(err)
})
//监听文件关闭时间
writer.on('close', () => {
console.log('可写流已被关闭,不能再写入数据')
})
//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
console.log('有数据正通过管道流入写入器');
})
//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
console.log('已移除可写流管道');
})
执行结果:

例3:创建可写流的时候设置objectMode: true,可以写入除了null以外的任意JavaScript 值
const { Writable } = require('stream')
//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
objectMode: true,//设置为true,可以写入除了null以外的任意JavaScript 值
decodeStrings: false, //设置为false,不会把接收到的字符串转换为buffer,
// 此时创建的可写流内部的write接收到的encoding参数为writer.write传入的字符编码
write(chunk, encoding, callback) {
//下面输出的chunk为writer.write传入的任何除了null之外的javascript,没做任何转换
console.log(chunk)
console.log(encoding)
console.log(callback)
//callback必须调用,否则无法进行第二次写入:stream._write()
callback()
}
});
//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,')
writer.write({})
writer.write('今天学nodejs了么', () => {
console.log('本次写入完成')
})
writer.end()
//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
console.log('数据写入完成')
})
//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
console.log('触发了drain事件')
})
//监听error事件
writer.on('error', (err) => {
console.error(err)
})
//监听文件关闭时间
writer.on('close', () => {
console.log('可写流已被关闭,不能再写入数据')
})
//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
console.log('有数据正通过管道流入写入器');
})
//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
console.log('已移除可写流管道');
})
执行结果:

必须在 writable._write() 内部同步地调用、或异步地(即不同的时间点)调用 callback 函数,以表明写入成功完成或因错误而失败。
如果调用失败,则 callback 的第一个参数必须是 Error 对象。
如果写入成功,则 callback 的第一个参数为 null。
在 writable._write() 被调用之后且 callback 被调用之前,所有对 writable.write() 的调用都会把要写入的数据缓冲起来。 当调用 callback 时,流将会触发 'drain'事件。
案例暂无:
例四:使用writer.cork()一次性将几个数据快速连续地写入创建的可写流,会将它们放在数组中作为chunks参数全部传给 创建的可写流内部的writev()
const { Writable } = require('stream')
//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
decodeStrings: false, //设置为false,不会把接收到的字符串转换为buffer,
// 此时创建的可写流内部的write接收到的encoding参数为writer.write传入的字符编码
write(chunk, encoding, callback) {
//下面输出的chunk为字符串
console.log(chunk)
//encoding 参数将指示字符串的字符编码。
console.log(encoding)
console.log(callback)
//必须在 writable._write() 内部同步地调用、或异步地(即不同的时间点)调用 callback 函数,
// 以表明写入成功完成或因错误而失败。 如果调用失败,则 callback 的第一个参数必须是 Error 对象。
// 如果写入成功,则 callback 的第一个参数为 null。
//在 writable._write() 被调用之后且 callback 被调用之前,所有对 writable.write() 的调用都会把要写入的数据缓冲起来。
//当调用 callback 时,流将会触发 'drain'事件。
//callback必须调用,否则无法进行第二次写入:stream._write()
callback()
},
//在writer.cork()调用之后,writer.uncork()或writer.end()调用之前,通过writer.write写入的所有数据块,
//会存放在数组里,作为chunks参数传给创建的可写流的内部的writerv方法里
writev(chunks, callback) {
//chunks:要写入的多个数据块。 每个数据块的格式为:{ chunk: ..., encoding: ... }。
//callback当全部数据块被处理完成后的回调函数。在这个回调函数里会一次调用
// 在writer.cork()调用之后,writer.uncork()或writer.end()调用之前,通过writer.write传入的所有回调
console.log(chunks)
console.log(callback)
callback()
}
});
//writable.cork() 方法强制把所有写入的数据都缓冲到内存中。
// 当调用 stream.uncork() 或 stream.end() 方法时,缓冲的数据才会被输出。
//writable.cork() 的主要目的是为了适应将几个数据快速连续地写入流的情况。
// writable.cork() 不会立即将它们转发到底层的目标,而是缓冲所有数据块,直到调用 writable.uncork(),
// 这会将它们全部传给 writable._writev()(如果存在)。
//writable.uncork() 方法将调用 stream.cork() 后缓冲的所有数据输出到目标。
writer.cork()
//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,', () => {
console.log('第一次写入完成')
})
writer.write('今天学nodejs了么', () => {
console.log('第二次写入完成')
})
// process.nextTick(() => writer.uncork());
writer.end()
//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
console.log('数据写入完成')
})
//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
console.log('触发了drain事件')
})
//监听error事件
writer.on('error', (err) => {
console.error(err)
})
//监听文件关闭时间
writer.on('close', () => {
console.log('可写流已被关闭,不能再写入数据')
})
//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
console.log('有数据正通过管道流入写入器');
})
//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
console.log('已移除可写流管道');
})
执行结果:

例5:销毁创建的可写流,销毁流后会触发close事件,如果销毁的时候传了一个表示错误的参数,会传给创建的可写流的内部的destroy方法的err参数
const { Writable } = require('stream')
//创建可写流
// 默认options.decodeStrings为true,
// 会把传入 stream._write() 的 string 编码为 Buffer,使用的字符编码为调用 stream.write() 时指定的
const writer = new Writable({
decodeStrings: false, //设置为false,不会把接收到的字符串转换为buffer,
// 此时创建的可写流内部的write接收到的encoding参数为writer.write传入的字符编码
write(chunk, encoding, callback) {
//下面输出的chunk为字符串
console.log(chunk)
//encoding 参数将指示字符串的字符编码。
console.log(encoding)
console.log(callback)
//必须在 writable._write() 内部同步地调用、或异步地(即不同的时间点)调用 callback 函数,
// 以表明写入成功完成或因错误而失败。 如果调用失败,则 callback 的第一个参数必须是 Error 对象。
// 如果写入成功,则 callback 的第一个参数为 null。
//在 writable._write() 被调用之后且 callback 被调用之前,所有对 writable.write() 的调用都会把要写入的数据缓冲起来。 当调用 callback 时,流将会触发 'drain'事件。
//callback必须调用,否则无法进行第二次写入:stream._write()
callback()
},
//在writer.cork()调用之后,writer.uncork()或writer.end()调用之前,通过writer.write写入的所有数据块,
//会存放在数组里,作为chunks参数传给创建的可写流的内部的writerv方法里
writev(chunks, callback) {
//chunks:要写入的多个数据块。 每个数据块的格式为:{ chunk: ..., encoding: ... }。
//callback当全部数据块被处理完成后的回调函数。在这个回调函数里会一次调用
// 在writer.cork()调用之后,writer.uncork()或writer.end()调用之前,通过writer.write传入的所有回调
console.log(chunks)
console.log(callback)
callback()
},
//销毁流。 可选地触发 'error',并且触发 'close' 事件(除非将 emitClose 设置为 false)。
// 调用该方法后,可写流就结束了,之后再调用 write() 或 end() 都会导致 ERR_STREAM_DESTROYED 错误。
// 这是销毁流的最直接的方式。
destroy(err, callback) {
//err是writer.destroy(new Error('流已被销毁'))作为参数传入的错误
console.log(err)
callback()
},
final(callback) {
//callback为调用writer.end()时传入的回调
callback()
}
});
//writable.write(chunk[, encoding][, callback])
//callback <Function> 当数据块被输出到目标后的回调函数。
writer.write('你好,', () => {
console.log('第一次写入完成')
})
writer.write('今天学nodejs了么', () => {
console.log('第二次写入完成')
})
writer.end(() => {
console.log('可写流已被关闭')
})
//销毁创建的可写流,销毁流后会触发close事件,如果销毁的时候传了一个表示错误的参数,
// 会传给创建的可写流的内部的destroy方法的err参数
writer.destroy(new Error('流已被销毁'))
//writable.destroyed在调用了 writable.destroy() 之后为 true。
console.log('writable.destroyed为' + writer.destroyed)
//调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
//也就是说,输入数据完成并关闭了可写流触发
writer.on('finish', () => {
console.log('数据写入完成')
})
//如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。
writer.once('drain', () => {
console.log('触发了drain事件')
})
//监听error事件
writer.on('error', (err) => {
console.error('销毁流的时候并没有触发error事件')
})
//监听文件关闭时间
writer.on('close', () => {
console.log('可写流已被关闭,不能再写入数据')
})
//当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。
writer.on('pipe', (src) => {
console.log('有数据正通过管道流入写入器');
})
//在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。
// 当可读流通过管道流向可写流发生错误时,也会触发此事件。
writer.on('unpipe', () => {
console.log('已移除可写流管道');
})
执行结果:

网友评论