Node.js流模式编程详解

作者: 朱耀锋 | 来源:发表于2016-04-22 15:35 被阅读3129次

    本文是Node.js设计模式的笔记, 代码都是来自 <Node.js Design Patterns> by Mario Casciaro.

    流的重要性

    一般我们处理数据有两种模式, 一种是buffer模式, 一种是stream模式, buffer模式就是取完数据一次性操作, stream模式就是边取数据边操作.
    举个例子, 如果打开一个2G的文件, 用buffer模式就是先分配2G的内存, 把文件全部读出来, 然后开始操作内存, 而用流模式的方法就是边读数据, 边开始处理.

    从这里看出stream模式无论是在空间和时间上都优于buffer模式:
    在空间上, 内存只会占用当前需要处理的一块数据区域的大小, 而不是整个文件.
    在时间上, 因为不需要全部的数据就可以开始处理, 时间就相当于节约了, 从串行变成了并行操作(这里的并行不是多线程的并行, 而是生产者和消费者并行).

    还有一个好处就是链式调用, 也就是可组合操作, 大大增加了代码的可重用性.
    比如下面这个代码(中间的pipe可以很方便的增删):

    fs.createReadStream(file)
         .pipe(zlib.createGzip())
         //.pipe(crypto.createCipher('aes192', 'secret'))
         .pipe(req)
         .on('finish', function() {
           console.log('File succesfully sent');
         });
    

    开始编码

    nodejs里面的stream一般分四种, 其中转换流是一种特殊的读写流.

    • 输入流(stream.Readable)
    • 输出流(stream.Writable)
    • 读写流(stream.Duplex)
    • 转换流(stream.Transform)

    另外, nodejs里面的流有两种模式, 二进制模式和对象模式.

    • 二进制模式, 每个分块都是buffer或者string对象.
    • 对象模式, 流内部处理的是一系列普通对象.

    输入流(stream.Readable)

    先看一下怎么使用输入流, 这里一般有两种方法, 一个是非流动模式, 一个是流动模式.
    非流动模式就是直接调用read()方法, 被动模式就是监听data事件.
    下面直接看代码:

    // 非流动模式
    process.stdin
        .on('readable', function() {
            // 有数据到了, 拼命读, 直到读完.
            var chunk;
            console.log('New data available');
            while((chunk = process.stdin.read()) !== null) {
                console.log(
                'Chunk read: (' + chunk.length + ') "' + chunk.toString() + '"'
                );
            }})
        .on('end', function() {
           process.stdout.write('End of stream');
        });
    

    接下来看流动模式怎么玩

    // 流动模式
    process.stdin
         .on('data', function(chunk) {
           console.log('New data available');
           console.log(
             'Chunk read: (' + chunk.length + ')" ' +
             chunk.toString() + '"'
           );
         })
         .on('end', function() {
           process.stdout.write('End of stream');
         });
    

    同样实现一个输入流也很简单, 主要是

    1. 继承Readable类
    2. 实现_read(size)接口(一般带下划线的表示内部函数, 调用者不要直接调用, 相当于C++里面的protect方法, 只是javascript里面没有对方法做区分, 只能是命名上面区分一下了).
      下面看示例代码:
    // randomStream.js
    var stream = require('stream');
    var util = require('util');
    var chance = require('chance').Chance();
    
    function RandomStream(options) {
        // option支持3个参数
        // encoding String 用于转换Buffer到String的编码类型(默认null)
        // objMode Boolean 用户指定是否是对象模式(默认false)
        // highWaterMark Number 最高水位(可读的最大数据量), 默认是16K
        stream.Readable.call(this, options);
    }
    util.inherits(RandomStream, stream.Readable);
    RandomStream.prototype._read = function(size) {
        // 这是一个随机产生数据的流, 5%的概率输出null, 也就是流停止.
        var chunk = chance.string();
        console.log('Pushing chunk of size:' + chunk.length);
        this.push(chunk, 'utf8');
        if(chance.bool({likelihood: 5})) {
           this.push(null);
        }
    }
    module.exports = RandomStream;
    

    好, 接下来是如何使用:

    // generateRandom.js
    var RandomStream = require('./randomStream');
    var randomStream = new RandomStream();
    randomStream.on('readable', function() {
        var chunk;
        while((chunk = randomStream.read()) !== null) {
            console.log("Chunk received: " + chunk.toString());
        }
    });
    

    输出流(stream.Writable)

    先是怎么用:

    // 写数据
    writable.write(chunk, [encoding], [callback]);
    // 结束流
    writable.end([chunk], [encoding], [callback])
    

    回压(back-pressure)

    这里涉及一个概念, 回压(back-pressure), 意思就是当生产者速度大于消费者的时候, 输出流的水位会不断上升, 当到达设定的最高水位时候, 就会写入失败, 这时候也就是产生了back-pressure, 那如何处理呢, 此时输入流在水位降低到零点的时候会有一个drain事件发送, 只要监听这个事件, 在事件发生的时候就可以继续向流写入数据了.
    直接看代码:

    var chance = require('chance').Chance();
    require('http').createServer(function (req, res) {
        res.writeHead(200, {'Content-Type': 'text/plain'});
        function generateMore() {             //[1]
            while(chance.bool({likelihood: 95})) {
                var shouldContinue = res.write(
                    chance.string({length: (16 * 1024) – 1})
                );
                if(!shouldContinue) {             //[3]
                    console.log('Backpressure');
                    return res.once('drain', generateMore);
                }
            }
            res.end('\nThe end...\n', function() {
                console.log('All data was sent');
            });
        }
        generateMore();
    }).listen(8080, function () {
      console.log('Listening');
    });
    

    同样, 实现一个输出流也很简单, 只要继承Writable类, 实现_write()接口即可.
    示例代码:

    // toFileStream.js
    var stream = require('stream');
    var fs = require('fs');
    var util = require('util');
    var path = require('path');
    var mkdirp = require('mkdirp');
    
    function ToFileStream() {
        // 这次我们用对象模式
        stream.Writable.call(this, {objectMode: true});
    };
    util.inherits(ToFileStream, stream.Writable);
    ToFileStream.prototype._write = function(chunk, encoding, callback) {
        var self = this;
        mkdirp(path.dirname(chunk.path), function(err) {
            if(err) {
                return callback(err);
            }
            fs.writeFile(chunk.path, chunk.content, callback);
        });
    }
    module.exports = ToFileStream;
    

    下面是调用的代码

    var ToFileStream = require('./toFileStream');
    var tfs = new ToFileStream();
    tfs.write({path: "file1.txt", content: "Hello"});
    tfs.write({path: "file2.txt", content: "Node.js"});
    tfs.write({path: "file3.txt", content: "Streams"});
    tfs.end(function() {
        console.log("All files created");
    });
    

    读写流(stream.Duplex)

    就是把输入流和输出流的接口都实现了.
    注意:
    此时option参数是同时传给了内部的Readable和Writeable, 如果要使用不同的选项, 就要分开配置,
    像这样:
    this._writableState.objectMode
    this._readableState.objectMode
    同时, Duplex又多了一个选项allowHalfOpen, 这个选项的意思是, 当其中一个流关闭的时候, 另外一条流是否也同时关闭, 默认是true, 也就是不同时关闭.

    转换流(stream.Transform)

    对于读写流来说, 要实现的是 _read() 和 _write() 接口, 而转换流要实现的是 _transform() 和 _flush()接口.
    区别是什么, 转换流一般在transform的过程中把读写都做了, 也就是在处理输入的时候, 直接就输出了. 最后在输入结束的时候_flush() 会被调用, 就可以把剩余的内部数据一并输出了.

    示例代码:

    // 这代码写的很漂亮, 解决的问题是在流中操作替换操作
    // 其中替换的部分可以仔细看一下, stream和buffer一个很大的区别就是stream会被切割
    // 导致要替换的数据也有可能被切割, 这个例子就提供了一种解决方法, 
    // 这个在后续实践中肯定也会遇到的.
    var stream = require('stream');
    var util = require('util');
    function ReplaceStream(searchString, replaceString) {
        stream.Transform.call(this, {decodeStrings: false});
        this.searchString = searchString;
        this.replaceString = replaceString;
        this.tailPiece = '';
    }
    util.inherits(ReplaceStream, stream.Transform);
    ReplaceStream.prototype._transform = function(chunk, encoding, callback) {
        var pieces = (this.tailPiece + chunk).split(this.searchString);
        var lastPiece = pieces[pieces.length - 1];
        var tailPieceLen = this.searchString.length - 1;
        this.tailPiece = lastPiece.slice(-tailPieceLen);
        pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
        this.push(pieces.join(this.replaceString));       //[3]
        callback();
    }
    ReplaceStream.prototype._flush = function(callback) {
        this.push(this.tailPiece);
        callback();
    }
    module.exports = ReplaceStream;
    

    几个流操作相关的有用包

    程序员就是懒, 有这几个包就可以少写一些代码了.

    • readable-stream, 统一了nodejs 实现的不同版本stream接口
    • [through2](https://npmjs.org/ package/through2), 用于快速创建转化流
    • from2, 用于快速创建输入流
    • writable2, 用于快速创建输出流

    相关文章

      网友评论

        本文标题:Node.js流模式编程详解

        本文链接:https://www.haomeiwen.com/subject/vbrxrttx.html