美文网首页
Nodejs Stream

Nodejs Stream

作者: 黑曼巴yk | 来源:发表于2019-10-13 10:36 被阅读0次

前言

前端工程领域中使用Nodejs处处掣肘,原因无非是Nodejs中困难部分:文件和网络。而文件和网络都依赖一个重要的对象Stream.

Buffer

ES6引入TypedArray之前,js没有读取或者操作二进制数据流的机制。Buffer作为Nodejs Api一部分被引入,以便能够和TCP等网络流和文件流进行交互。
Buffer 用来处理二进制数据,它的实例类似整型数组,不过缓冲区大小在创建时候确定,不能调整。它不经过V8的内存分配机制,Buffer是一个JS和C++集合的模块,内存由C++申请,javascript分配。

为什么应该使用流

在Node中,IO都是异步的,在硬盘以及网络交互的过程中会涉及到传递回调函数的过程.

const http= require('http');
const fs= require('fs');
const server = http.createServer((req, res) => {
  fs.readFile('./a.mp4', (err, data) => {
    res.end(data);
  })
})
server.listen(8000);

上面这段代码在每次请求时候,都会把整个文件a.pm4存入内存,然后再把结果返回给客户端。a.pm4可能达到2G以上。这样的话程序会消耗大量的内存。
我们采用流的方式来实现

const http= require('http');
const fs= require('fs');
const server = http.createServer((req, res) => {
  const stream = fs.createReadStream('./a.pm4');
  stream.pipe(res);
})
server.listen(8000);

Readable Stream

基本使用
const peaks = [
    'Tallac',
    'Ralston',
    'Rubicon',
    'Twin Peaks',
    'Rose',
    'Freel Peak',
    'Dankun Kun'
];
class StreamFromArray extends Readable {
    constructor(array){
        super()
        this.array = array;
        this.index = 0;
    }
    _read() {
        if(this.array.length <= this.index) {
            return this.push(null);
        }
        this.index += 1;
        this.push(this.array[this.index]);
    }
}

const streamFromArray = new StreamFromArray(peaks);
streamFromArray.on('data', (chunk) => {
    console.log('data',chunk)
})

streamFromArray.on('end', (chunk) => {
    console.log('end',chunk)
})
设置encoding
constructor(array){
        super({encoding:'utf-8'})
        this.array = array;
        this.index = 0;
}
objectMode

对于流来说,它不仅仅可以处理Buffer,String这样的数据类型,还能处理复杂的对象类型,通过开启objectModel属性即可。
对于可读流来说,从下游可读流中读到的数据是对象类型,对于可写流来说,从上游写入的数据是对象类型。

const { Readable } = require('stream');

const peaks = [
    'Tallac',
    'Ralston',
    'Rubicon',
    'Twin Peaks',
    'Rose',
    'Freel Peak',
    'Dankun Kun'
];
class StreamFromArray extends Readable {
    constructor(array){
        super({objectMode: true,});
        this.array = array;
        this.index = 0;
    }
    _read() {
        if(this.array.length <= this.index) {
            return this.push(null);
        }
        this.index += 1;
        this.push({
            data: this.array[this.index],
            index: this.index
        });
    }
}

const streamFromArray = new StreamFromArray(peaks);
streamFromArray.on('data', (chunk) => {
    console.log('data',chunk)
})

streamFromArray.on('end', (chunk) => {
    console.log('end',chunk)
})
两种模式
  1. flowing(流动)模式
    数据会自动读取并且塞入到data事件的回调函数中进行消费。
const fs = require('fs');
const readStream = fs.createReadStream('./small.txt');

readStream.on('data', (chunk) => {
  console.log(readStream._readableState.flowing);  // true
    console.log(chunk.toString());  // 这里因为内容是英文,如果是中文内容需要设置编码模式
});

console.log(readStream._readableState.flowing); // true
  1. 暂停模式
const fs = require('fs');
const readStream = fs.createReadStream('./small.txt');

console.log(readStream._readableState.flowing);  // null

// 使用 once 来绑定监听者回调,一方面避免 readable 事件的多次触发?一方面避免内存泄漏
readStream.once('readable', () => {
    while(chunk = readStream.read()){
        readStream.pause();
        console.log(readStream._readableState.flowing); // false
        console.log(chunk.toString());
        readStream.resume();
        console.log(readStream._readableState.flowing); // true
    }
});

我们可以看到可读流的初始状态是null,如果没有设置pipe方法或者data事件的监听回调,那么就一直是null,但是如果我们使用流的pause方法强制让流暂停读取数据,那么内部状态值就变成false,调用resume又会让流变成流动模式。

三种状态

可读流内部存储了一个变量readStream._readableState.flowing来记录当前流是否处于流动模式,值分别为 null, false, true。

Writable Stream

  1. 基本使用
const { Readable } = require('stream');
const fs = require('fs');
const readStream = fs.createReadStream('./a.mp4');
const writeStream = fs.createWriteStream('./b.mp4');

readStream.on('data', chunk => {
    writeStream.write(chunk);
})

readStream.on('error', error => {
    console.log('error')
})

readStream.on('end', () => {
    console.log('end')
})

背压

const { Readable } = require('stream');
const fs = require('fs');
const readStream = fs.createReadStream('./a.mp4');
const writeStream = fs.createWriteStream('./copied.mp4');

readStream.on('data', chunk => {
    const flag = writeStream.write(chunk);
    if(!flag) {
        console.log('backpressure');
        readStream.pause();
    }
})

readStream.on('error', error => {
    console.log('error')
})

readStream.on('end', () => {
    writeStream.end();
    console.log('end')
})
writeStream.on('drain', () => {
    readStream.resume()
})

Transform转换流

const { Readable, Transform } = require('stream');

const peaks = [
    'Tallac',
    'Ralston',
    'Rubicon',
    'Twin Peaks',
    'Rose',
    'Freel Peak',
    'Dankun Kun'
];
class StreamFromArray extends Readable {
    constructor(array){
        super();
        this.array = array;
        this.index = 0;
    }
    _read() {
        if(this.array.length <= this.index) {
            return this.push(null);
        }
        this.index += 1;
        this.push(this.array[this.index]);
    }
}

const streamFromArray = new StreamFromArray(peaks);

class ReplaceText extends Transform {
    constructor(char) {
        super();
        this.replaceChar = char;
    }
    _transform(chunk, encoding, callabck) {
        const transformChunk = chunk.toString()
            .replace(/[a-zA-Z0-9]/g, this.replaceChar);
        this.push(transformChunk);
        callabck();
    }
    _flush(callabck) {
        this.push('more stuff is being passed...');
        callabck();
    }
}

const xStream = new ReplaceText('X');
streamFromArray
    .pipe(xStream)
    .pipe(process.stdout);

相关文章

  • [读] NodeJS stream 一:Buffer

    NodeJS stream 一:Buffer

  • nodejs stream

    参考 nodejs stream 手册英文原版stream-handbook Streams come to us...

  • Stream in Nodejs

    reference: https://nodesource.com/blog/understanding-stre...

  • Nodejs Stream

    前言 前端工程领域中使用Nodejs处处掣肘,原因无非是Nodejs中困难部分:文件和网络。而文件和网络都依赖一个...

  • nodejs stream

    为什么需要stream 不用stream的情况:太耗内存 优化:用pipe连接读文件流和http可写流:image...

  • improve scheme

    Nodejs Stream: https://nodesource.com/blog/understanding-...

  • 2.nodejs通过stream方式加载页面 / 通过fs模块加

    nodejs通过stream方式加载页面(stream方式加载优化性能) 配置index.js文件,创建服务,设置...

  • nodejs--stream

    Stream是一个抽象接口,非常重要,应用广泛,通过流接口可以实现一些磁盘文件读写,套字节,http请求交互等。 ...

  • Nodejs Stream 手册

    编译地址:https://github.com/substack/stream-handbook译者:jabez1...

  • Nodejs Stream 初识

    linux文件 为了区别不同文件类型,会有一个type来进行区别普通文件:包含任意数据目录: 相关一组文件的索引套...

网友评论

      本文标题:Nodejs Stream

      本文链接:https://www.haomeiwen.com/subject/ynltmctx.html