美文网首页
我理解的NODEJS Stream

我理解的NODEJS Stream

作者: sinbad_3815 | 来源:发表于2018-02-05 17:12 被阅读0次

    众所周知,NODEJS在高并发,I/O密集型应用操作的时候有很多优势,而这些都脱离不了“流”的支撑。请求流,响应流,文件流,Socket流,甚至console模块都使用了流。而流的实现,尤其是其内部的实现,在整个NODEJS的学习中就很有学习的必要。

    流的模型可以总结为“生产者,消费者”模型。流的一端生产数据(可以理解为从水龙头放水),另一端消费数据(水放出来后你干嘛用不管,消费就好了)。当然,其内部代码实现会考虑很多细节的地方,这些可以通过调试进入源码查看很多细节的地方。

    在源码中跟流相关的模块有:

    • lib/module.js
    • lib/stream_readable.js
    • lib/stream_writable.js
    • lib/stream_transform.js
    • lib/stream_duplex.js
      源码非常清晰,这就对应流的四种类型,Readable流,Writable流,Transform流,Duplex流。
      其中Readable和Writable是重点,这两个搞明白,Transform和Duplex就比较简单了。

    Readable Stream
    Readable Stream有两种模式,一种是Flowing Mode,流动模式;另外一种是Paused Mode,暂停模式。
    切换到流动模式的方式有:

    • 监听data时间
      rs.on("data", (chunk)=>{});
    • 调用stream.resume方法
    • 调用stream.pipe方法将数据发送给writable stream

    切换到暂停模式的方法有:

    • 调用stream.pause方法
    • 如果存在管道,调用stream.unpipe方法

    tip: 这两种流是可以随时切换的

    流动模式和暂停模式有什么区别,为什么要这么设计。
    流动模式就是像流水一样源源不断的读取数据(注意不是到缓存对象),不管你消费不消费。
    暂停模式可以暂时不读取数据,关闭水龙头。
    有一个用的比较多的词语叫背压,一般来说,读的速度会比写入的速度快,如果不暂停,还是源源不断的读取数据,会造成内存过大,消耗性能,这个时候最好的方式是消费多少,读取多少,由此引申出管道的概念。
    最好的管道就是生产和消费同步,如果读的过快,先暂停读取,有需要再通知读取即可。
    从上面的语义描述可以看出,流是基于事件的,事件在这里承担着消息的注册和发送。

    上面的都是概念相关,现在来一个简单版的可读流,可写流。代码虽然简化了很多,但是对于理解整个流的流程,会非常有帮助。

    先来看流动模式的可读流

    let EventEmitter = require('events');
    let fs = require('fs');
    class ReadStream extends EventEmitter {
        constructor(path, options) {
            super(path, options);
            this.path = path;
            this.flags = options.flags || 'r';
            this.mode = options.mode || 0o666;
            this.highWaterMark = options.highWaterMark || 64 * 1024;
            this.pos = this.start = options.start || 0;
            this.end = options.end;
            this.encoding = options.encoding;
            this.flowing = null;
            this.buffer = Buffer.alloc(this.highWaterMark);
            this.open();//准备打开文件读取
            //当给这个实例添加了任意的监听函数时会触发newListener
            this.on('newListener',(type,listener)=>{
                //如果监听了data事件,流会自动切换的流动模式
                if(type == 'data'){
                  this.flowing = true;
                  this.read();
                }
            });
        }
        read(){
            if(typeof this.fd != 'number'){
                return this.once('open',()=>this.read());
            }
            let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
            //this.buffer并不是缓存区
            console.log('howMuchToRead',howMuchToRead);
            fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是实际读到的字节数
                if(err){
                    if(this.autoClose)
                        this.destroy();
                    return this.emit('error',err);
                }
                if(bytes){
                    let data = this.buffer.slice(0,bytes);
                    this.pos += bytes;
                    data = this.encoding?data.toString(this.encoding):data;
                    this.emit('data',data);
                    if(this.end && this.pos > this.end){
                       return this.endFn();
                    }else{
                        if(this.flowing)
                          this.read();
                    }
                }else{
                    return this.endFn();
                }
    
            })
        }
        endFn(){
            this.emit('end');
            this.destroy();
        }
        open() {
            fs.open(this.path,this.flags,this.mode,(err,fd)=>{
               if(err){
                   if(this.autoClose){
                       this.destroy();
                       return this.emit('error',err);
                   }
               }
               this.fd = fd;
               this.emit('open');
            })
        }
        destroy(){
            fs.close(this.fd,()=>{
                this.emit('close');
            });
        }
        pipe(dest){
            this.on('data',data=>{
                let flag = dest.write(data);
                if(!flag){
                    this.pause();
                }
            });
            dest.on('drain',()=>{
                this.resume();
            });
        }
        //可读流会进入流动模式,当暂停的时候,
        pause(){
            this.flowing = false;
        }
        resume(){
           this.flowing = true;
           this.read();
        }
    }
    module.exports = ReadStream;
    

    暂停模式的可读流

    let fs = require('fs');
    let EventEmitter = require('events');
    
    class ReadStream extends EventEmitter {
        constructor(path, options) {
            super(path, options);
            this.path = path;
            this.highWaterMark = options.highWaterMark || 64 * 1024;
            this.buffer = Buffer.alloc(this.highWaterMark);
            this.flags = options.flags || 'r';
            this.encoding = options.encoding;
            this.mode = options.mode || 0o666;
            this.start = options.start || 0;
            this.end = options.end;
            this.pos = this.start;
            this.autoClose = options.autoClose || true;
            this.bytesRead = 0;
            this.closed = false;
            this.flowing;
            this.needReadable = false;
            this.length = 0;
            this.buffers = [];
            this.on('end', function () {
                if (this.autoClose) {
                    this.destroy();
                }
            });
            this.on('newListener', (type) => {
                if (type == 'data') {
                    this.flowing = true;
                    this.read();
                }
                if (type == 'readable') {
                    this.read(0);
                }
            });
            this.open();
        }
    
        open() {
            fs.open(this.path, this.flags, this.mode, (err, fd) => {
                if (err) {
                    if (this.autoClose) {
                        this.destroy();
                        return this.emit('error', err);
                    }
                }
                this.fd = fd;
                this.emit('open');
            });
        }
    
        read(n) {
            if (typeof this.fd != 'number') {
                return this.once('open', () => this.read());
            }
            n = parseInt(n, 10);
            if (n != n) {
                n = this.length;
            }
            if (this.length == 0)
                this.needReadable = true;
            let ret;
            if (0 < n < this.length) {
                ret = Buffer.alloc(n);
                let b;
                let index = 0;
                while (null != (b = this.buffers.shift())) {
                    for (let i = 0; i < b.length; i++) {
                        ret[index++] = b[i];
                        if (index == ret.length) {
                            this.length -= n;
                            b = b.slice(i + 1);
                            this.buffers.unshift(b);
                            break;
                        }
                    }
                }
                if (this.encoding) ret = ret.toString(this.encoding);
            }
    
            let _read = () => {
                let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
                fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                    if (err) {
                        return
                    }
                    let data;
                    if (bytesRead > 0) {
                        data = this.buffer.slice(0, bytesRead);
                        this.pos += bytesRead;
                        this.length += bytesRead;
                        if (this.end && this.pos > this.end) {
                            if (this.needReadable) {
                                this.emit('readable');
                            }
    
                            this.emit('end');
                        } else {
                            this.buffers.push(data);
                            if (this.needReadable) {
                                this.emit('readable');
                                this.needReadable = false;
                            }
    
                        }
                    } else {
                        if (this.needReadable) {
                            this.emit('readable');
                        }
                        return this.emit('end');
                    }
                })
            }
            if (this.length == 0 || (this.length < this.highWaterMark)) {
                _read(0);
            }
            return ret;
        }
    
        destroy() {
            fs.close(this.fd, (err) => {
                this.emit('close');
            });
        }
    
        pause() {
            this.flowing = false;
        }
    
        resume() {
            this.flowing = true;
            this.read();
        }
    
        pipe(dest) {
            this.on('data', (data) => {
                let flag = dest.write(data);
                if (!flag) this.pause();
            });
            dest.on('drain', () => {
                this.resume();
            });
            this.on('end', () => {
                dest.end();
            });
        }
    }
    module.exports = ReadStream;
    

    可写流

    let fs = require('fs');
    let EventEmitter = require('events');
    
    class WriteStream extends EventEmitter {
        constructor(path, options) {
            super(path, options);
            this.path = path;
            this.flags = options.flags || 'w';
            this.mode = options.mode || 0o666;
            this.start = options.start || 0;
            this.pos = this.start;//文件的写入索引
            this.encoding = options.encoding || 'utf8';
            this.autoClose = options.autoClose;
            this.highWaterMark = options.highWaterMark || 16 * 1024;
            this.buffers = [];//缓存区
            this.writing = false;//表示内部正在写入数据
            this.length = 0;//表示缓存区字节的长度
            this.open();
        }
    
        open() {
            fs.open(this.path, this.flags, this.mode, (err, fd) => {
                if (err) {
                    if (this.autoClose) {
                        this.destroy();
                    }
                    return this.emit('error', err);
                }
                this.fd = fd;
                this.emit('open');
            });
        }
    
        //如果底层已经在写入数据的话,则必须当前要写入数据放在缓冲区里
        write(chunk, encoding, cb) {
            chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);
            let len = chunk.length;
            //缓存区的长度加上当前写入的长度
            this.length += len;
            //判断当前最新的缓存区是否小于最高水位线
            let ret = this.length < this.highWaterMark;
            if (this.writing) {//表示正在向底层写数据,则当前数据必须放在缓存区里
                this.buffers.push({
                    chunk,
                    encoding,
                    cb
                });
            } else {//直接调用底层的写入方法进行写入
                //在底层写完当前数据后要清空缓存区
                this.writing = true;
                this._write(chunk, encoding, () => this.clearBuffer());
            }
            return ret;
        }
    
        clearBuffer() {
            //取出缓存区中的第一个buffer
            //8 7
            let data = this.buffers.shift();
            if(data){
                this._write(data.chunk,data.encoding,()=>this.clearBuffer())
            }else{
                this.writing = false;
                //缓存区清空了
                this.emit('drain');
            }
        }
    
        _write(chunk, encoding, cb) {
           if(typeof this.fd != 'number'){
               return this.once('open',()=>this._write(chunk, encoding, cb));
           }
            fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
                if(err){
                    if(this.autoClose){
                        this.destroy();
                        this.emit('error',err);
                    }
                }
                this.pos += bytesWritten;
                //写入多少字母,缓存区减少多少字节
                this.length -= bytesWritten;
                cb && cb();
           })
        }
    
        destroy() {
            fs.close(this.fd, () => {
                this.emit('close');
            })
        }
    }
    module.exports = WriteStream;
    

    相关文章

      网友评论

          本文标题:我理解的NODEJS Stream

          本文链接:https://www.haomeiwen.com/subject/puwbzxtx.html