美文网首页
解析NodeJs中的流

解析NodeJs中的流

作者: allyong | 来源:发表于2018-07-02 12:02 被阅读0次

解析NodeJS中的流

前言

作为前端开发同学,相信大家对下面的一段代码应该不会陌生:

gulp.src('client/templates/*.jade')
    .pipe(jade())
    .pipe(minify())
    .pipe(gulp.dest('build/minified_templates'));

这可能是项目构建里的一条命令,将指定目录下面的后缀为jade的文件,首先通过jade去解析,然后压缩,最后输出到指定目录。这一系列操作能够顺序进行,其实就是流在背后驱动。

本文会从代码角度,去剖析nodejs中常见的几种流的内部原理,最后我们自已实现一个类似上面那种pipeline式的调用。

  1. Readable
  2. Writable
  3. Duplex
  4. Transform

Readable

什么是可读流呢?

以生活中家里吃水打个比方,我们一般不会直接一次性从水井里打上很多水,因为一次用不完,时间放久会变质。我们的做法是,家里建个蓄水池,蓄水池通过水管连接到水泵,从井中抽水出来,然后再接根水管到蓄水池,供我们日常按需取水使用。

为保证水质,蓄水池蓄水不能太多(水位不能太高),够用一两天就行。满足这个条件的水位姑且称之为适量水位。一旦水位低于适量水位时,水泵开始向蓄水池中抽水,反之,停止抽水。

在可读流中,资源也是不会直接流向消费者,而是先被push到内部的的缓存池中,缓存池有一个水位标记highWatermark(简称hwm),当缓存池中的资源量state.length小于hwm时,资源会被不断push到缓存池中,直到达到这个阀值为至。

注:为行文方便,文中标有伪代码的地方是不可运行的,只保留部分片段代码,有地方代码层级有拉平,变量的名称,位置等和node源码也不严格相同。

可读流有流动和暂停两种模式,可读流能够流动起来的驱动力在其resume方法,如:

//伪代码

function flow(stream){
    const state=stream._readableState;
    state.flowing=true;
    while(state.flowing && stream.read() !== null);
}
Readable.prototype.resume=function(){
    //...
    flow(this);
    //...
}

可以看到,该方法将先将流转变成流动模式,然后在该模式下,不断地去消耗资源,直到资源耗尽或流转变成暂停模式为止。

下面来看看,在read方法里,资源是如何被消耗掉,又是如何被补充到缓存池中的:

//伪代码

function fromList(n,state){
   var ret;
   ret= /*从缓存池state.buffe中取数据*/
   return ret;
}
Readable.prototype.read=function(n){
    var state=this._readableState;
    //...
    this._read(state.highWaterMark);
    //...

    var ret=fromList(n,state);

    if(ret===null){
        //...
    }else{
        //标记消耗量
        state.length-=n;
    }

    //触发事件,发送数据
    if(ret !==null) this.emit('data',ret);

    return data;

};

如果能从缓存池中取到资源,将触发data事件,并发送资源数据。

另外,在消耗资源前,先试着向缓存池中补充资源。 _read是一个虚方法,是每个继承了Readable的类都实现的方法。在该方法里,则要调用push方法向缓存池中注入资源:

//伪代码

Readable.prototype.push=function(chunk){
    var stream=this;

    var  state=stream._readableState;

    if(state.flowing&&state.length===0&&!state.sync){
        stream.emit('data',chunk);
        stream.read(0);
    }else{
        //...
        //省略state.objectMode等情况
        state.buffer.push(chunk);
    }
}

初始状态下,监听可读流的data事件即可进入流动模式:

//伪代码
Readable.prototype.on=function(ev,fn){
    //...
    var state=this._readableState;
    if(ev==='data'){
        if(state.flowing!==false) this.resume();
    }else if(ev==='readable'){
        state.needReadable=true;

        //...
        this.emit('readable');
        flow(this);
    }
    //...
}

下面,我们实现一个Readable:

const stream = require('stream');

//模拟底层资源
const resource = ['www', 'jd', 'com'];

readable = stream.Readable({
    read: function () {
        if (resource.length) this.push(resource.shift());
        else this.push(null);
    }
});

readable.on('data', function (data) {
    console.log(data.toString());
});

/*
www
jd
com
*/

监听data事件,进入流动模式,_read方法取得底层资源,加入到缓存池,从缓存池中取到数据,触发data事件,并发送数据。输出如下:

但是在暂停模式下,流的驱动要readable事件和read方法来配合使用:

const stream = require('stream');

//模拟底层资源
const resource = ['www', 'jd', 'com'];

readable = stream.Readable({
    objectMode: true,
    read: function () {
        if (resource.length) this.push(resource.shift());
        else this.push(null);
    }
});

//state.flowing=false
readable.pause();
readable.on('data', function (data) {
    console.log(data.toString());
});

readable.on('readable', function () {
    while (null !== readable.read());
});

通过实现_read方法,可以让可读流自由对接不同的度层数据源,如,fs.createReadStream 中 :

//伪代码

//FileReadStream继承了Readable
FileReadStream.prototye._read=function(n){
    //..
    fs.read(this.fd,/*文件读取相关参数*/,funtion(err,bytesRead){
        if(!err){
            var b=/*读取的文件内容*/;
            this.push(b);
        }
    });
}
简单总结下:
  1. _read方法对接底层数据源,然后push数据到缓存池中
  2. read方法从缓存池中读取数据,或以data事件形式将数据发送给消费者

Writable

可写流可以作为可读流的消耗方,当可读流的生产过快时,会导致可写流"溢出",writable.write()返回false,此时要告知可读流暂停生产:readable.pause(),一旦可写流里的资源被消耗了,又要通知(以drain事件)可读流恢复生产(readable.resume()),其实这就是pipe方法的内部原理。

下面看看write方法的基本实现:

//伪代码

function onwriteStateUpdate(state){
    //...
    state.length-=state.writelen;
    state.writelen=0;
}
Writable.prototype.write=function(chunk,encoding,cb){
    var stream=this;
    var state=this._writableState;
    var ret=false;
    var len=state.objectMode?1:chunk.length;
    state.writelen=len;

    state.length+=len;
    
    //false为溢出
    ret=state.length<state.highWateMark;

    this._write(chunk,encoding,function(){
        //...

        onwriteStateUpdate(state);
        cb();
        if(state.length===0&&state.needDrain){
            state.needDrain=false;
            stream.emit('drain');
        }

    });


    return ret;
}

可以看到,在write内部调用了_write方法,将资源的消耗转交给了这个虚方法,在实现这个虚方法时,切记要调用第三参数标识的回调函数,在这里进行了资源消耗后状态更新等处理。

//伪代码

//FileWriteStream继承了Writeable
FileWriteStream.prototye._write=function(data,encoing,cb){
    //..
    fs.write(this.fd,/*写文件相关参数*/,funtion(err,bytes){
        if(!err){
            cb();
        }
    });
}

上面是fs.createWriteStream对_write的实现。

终于可以讲到pipe方法了,pipe其实是Readable的方法,原理上面已经讲过了,直接上图看代码吧:
//伪代码

Readable.prototype.pipe=function(dest,pipeOpts){
    var src=this;
    //...

    src.on('data',function(chunk){
        if(false===dest.write(chunk)){
            //...
            src.pause();
        }
    });

    dest.on('drain',function(src){
        var state=src._readableState;
        //...
        // src.resume()
        state.flowing=true;
        flow(src);
    });

    dest.emit('pipe',src);

    //...
}

好了,结合Readable,Writable,pipe三者写个例子:

const stream = require('stream');

var c = 0;
const readable = stream.Readable({
    highWaterMark: 4,
    read: function () {
        var data = c < 6 ? String.fromCharCode(c + 65) : null;
        ++c;
        //console.log('read: ', ++c, data);
        this.push(data);
    }
});

const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, done) {
        process.nextTick(() => {
            console.log('write: ', chunk.toString());
            done();
        });
    }
});

readable.pipe(writable);

/*
write: A
write: B
write: C
write: D
write: E
write: F
*/

Duplex

双工流,继承了Readable和Writable的流。该流可同时进行读和写,但是读写是分离的,读写的数据也是不能互通的。._read,_write两虚方法,在继承类中要被实现。

const stream = require('stream');

const resource = ['www', '.jd', '.com', '\n'];
const duplex = stream.Duplex({
    write: function (chunk, encoding, done) {
        console.log(chunk.toString().toUpperCase());
        done();
    },
    read: function () {
        resource.forEach(res => {
            res = res.toString();
            this.push(res);
            if (res === '\n') this.push(null);
        })
    }
});

process.stdin.pipe(duplex).pipe(process.stdout);

/*
www.jd.com
a
A

... 

*/

这个例子中,用pipe把这个双工流分别连接到标准输出、输入流,并自动适配读写端。等同于下面的写法:

process.stdin.pipe(duplex); duplex.pipe(process.stdout);

Transform

转换流,也是一种直通流,就是上面的Duplex流的基础上,将可写读端接受到的数据,在内部经过转换,作为可读端流的数据源。好比自来水流经过滤装置,在装置内部进行过滤处理后,再流出使用一样。

Transfom是Duplex的一种具体实现:

//伪代码

const Duplex = require('_stream_duplex');
const util = require('util');
util.inherits(Transform, Duplex);


function afterTransform(err,data){
    var ts=this._transformState;
    ts.transforming=false;
    //...

    ts.writechunk = null;
    ts.writecb = null;

    if(data != null) this.push(data);
    //...
    var rs=this._readableState;
    
    if(rs.needReadable || rs.length < rs.highWaterMark) {
        this._read(rs.highWaterMark);
    }
}
function Transform (options) {
    if(!(this instanceof Transform))
       return new Transform(options);
    
    this._transformState = {
        afterTransform: afterTransform.bind(this),
        //...
    };
    //...
}
Transform.prototype._write=function(chunk,encoding,cb){
    var ts=this._transformState;

    ts.writecb=cb;
    ts.writechunk=chunk;
    ts.writeencoding=encoding;
    
    if(!ts.transforming){
        var rs=this._readableState;
        if(ts.needTransform || ts.needReadable || rs.length<rs.highWaterMark){  
           this._read(rs.highWaterMark);
        }
    }

}

Transform.prototype._read=function(n){
    var ts=this._transformState;

    if(ts.writechunk!==null && ts.writecb && !ts.transforming){
        ts.transforming=true;
        this._transform(ts.writechunk,ts.writeencoding,ts.afterTransform);
    }

}

Transform.prototype._transform=function(chunk,encoding,cb){
    //对外曝露的虚方法
    //数据在这里被转换,然后在回调cb (afterTransform)里,通过push到可读端的缓存池中
}

Transform.prototype._flush = function() {
    //所有可写读的数据被转换到写读端时执行
}

在可写端的_write方法中,先将数据chunk临时保存到_transformState的writechunk上,然后调用可读端的_read方法,在_read方法里,取到_transformState.writechunk,然后将其传递给虚方法_transform,在这个方法里,完成数据的转换,并将转换好的数据传给回调函数cb,这个函数其实是内部预定好的afterTransform,它会完成各种状态处理,并将数据push到可读端的缓存中。

最后,综合前面所讲,实现一个pipeline式调用:

const stream = require('stream');

var resource = ['www', 'jd', 'com'];
var src = function (res) {
    return stream.Readable({
        read: function () {
            if (res.length) this.push(res.shift());
            else this.push(null);
        }
    });

};

var upper = function () {
    return stream.Transform({
        transform: function (chunk, encoding, done) {
            done(null, chunk.toString().toUpperCase());
        }
    });
};


var join = function () {
    var data = [];
    var _join = stream.Transform({
        transform: function (chunk, encoding, done) {
            data.push(chunk.toString());
            done();
        },
        flush: function () {
            this.push(data.join('.'));
        }
    });


    return _join;
};

var dest = function () {
    //return process.stdout;
    return stream.Writable({
        write: function (chunk, encoding, done) {
            console.log(chunk.toString());
            done();
        }
    });
};

src(resource).pipe(upper())
             .pipe(join())
             .pipe(dest());


/*
WWW.JD.COM
*/

相关文章

  • 解析NodeJs中的流

    解析NodeJS中的流 前言 作为前端开发同学,相信大家对下面的一段代码应该不会陌生: 这可能是项目构建里的一条命...

  • nodejs 中有哪些常用的内置模块

    path模块nodejs中的path模块用于处理文件和目录的路径url模块在nodejs中url模块是用来解析ur...

  • 浅析nodejs中的stream(流)

    这篇文章我们来聊一下nodejs中的stream,也就是nodejs中的流。 什么是流呢?从字面上来看我们应该可以...

  • Node url模块

    url Nodejs的url模块只要提供一些实用函数,用于URL的处理和解析。在Nodejs中可以直接引用url模...

  • nodejs流编程入门

    nodejs中的流 流是什么,应该很好理解,计算机中数据流处处存在。linux的管道设计就是利用了流的概念,所有的...

  • nodejs 实践:express 最佳实践 express-s

    nodejs 实践:express 最佳实践 express-session 解析 nodejs 发展很快,从 n...

  • minimist轻量级的命令行参数解析库

    nodejs的命令行参数解析工具

  • XML解析

    Sax解析:流的方式进行解析流解析:以流的方式进行解析(一行一行解析)Dom解析:加载整个文档,以树的方式解析

  • nodejs文件流初识

    NodeJs中fs模块提供了readFile和readFileSync方法,为什么还要使用文件流呢?其实文件流的优...

  • 前端干货链接-持续更新

    前端文件上传 拖拽上传、图片预览、文件上传nodejs-process-excel :nodejs解析或导出exc...

网友评论

      本文标题:解析NodeJs中的流

      本文链接:https://www.haomeiwen.com/subject/kznkuftx.html