美文网首页
Chapter3:Coding with Streams

Chapter3:Coding with Streams

作者: 宫若石 | 来源:发表于2017-11-15 10:11 被阅读0次

        流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流的角色。Dominic Tarr,Node社区中顶尖的贡献者,将流定义为node中最佳并且最难理解的概念。这里使Node的流机制如此吸引人的原因有很多,再次强调,它并不仅仅与技术属性相关,比如说性能和效率,更进一步来说更加关注其优雅程度和它的方式是否完美适用于Node的哲学思想。

    \   在这篇文章中你将了解如下话题:

        •为什么流机制在Node中如此重要?

        •使用并创造流

        •流式作为一种编程范式,在各种不同的上下文而不仅仅是I/O情境下使用它的能力

        •在不同的配置下,管道模式与流连接相结合

    Discovering the importance of streams

        在一个基于事件机制的平台比如Node中,最高效处理I/O业务的方式是实时模式,对输入变量一旦其提供资源就尽快进行消费,一旦应用产生输出值就尽快把它发出。

        在本节,我们将初步介绍Node流机制和它们的优势。请记住,这只是一个概述,之后的章节中会有更细节的分析来介绍如何使用和构建流。

    Buffering vs Streaming

        几乎所有至今我们在这本书所看到的异步API所工作的时候都采用缓冲区模式(buffer mode)。对于一个输入操作,缓冲区模式(buffer mode)导致从一个资源收集到的所有数据被注入一个缓冲区中,一旦整个资源被读取则缓冲区中的数据将会被传递给回调函数(callback),接下来展示的虚拟模式代表了这种编程范式:

        在前面的图中,我们可以看到,在t1时刻时,一些数据从资源中被收集,并被保存在缓冲区。在时刻t2时,另一个数据块(最后一个)被接收,它结束了读取操作并导致整个缓冲区被发送给消费者。    

        另一方面而言,流机制允许你当数据从资源到达时对该数据进行处理,这个过程如下图所示:

        此刻,上图展示了数据中的每个分块由资源发出来被接收并且立刻被提供给消费者,现在消费者拥有机会来立即处理这些数据分块而不需要等待所有的数据在缓冲区被收集。

        但是针对这两种方式之间的主要区别,我们可以总结为2个主方面:

        • 空间效率

        • 时间效率

        然而,Node中的流式具有另外一方面很重要的优势:可组合性。让我们一起来看看,这些特性对我们设计和编写我们应用的方式会造成哪些影响。

    Spatial efficiency

        首先,流式允许我们去做一些,缓冲数据并一次性处理模式不可能做到的事情。例如,我们考虑一种业务情境:我们必须读取一个非常大的文件,比如说,几百MB甚至是GB的数量级规模。显然,使用API来在该文件被完全读取时返回 一个很大的缓冲区buffer,这并不是一个好主意。想象一下,如果并发的读取一批这样的大文件,我们的应用将会很容易造成内存耗尽。除此之外,缓冲区buffer在V8引擎中不能超过0x3FFFFFFFbytes(略小于1GB)。所以我们可能在耗尽物理内存之前,就遭遇屏障。

    Gzipping using a buffered API

        作为一个具体例子,让我们来考虑一个简单的命令行接口CLI应用程序,通过Gzip方式来实现文件压缩。使用一个缓冲模式buffered的API,这样一个应用的代码将以Node形式展示为以下代码(错误处理略为简洁):

    var fs = require('fs');

    var zlib = require('zlib');

    var file = process.argv[2];

    fs.readFile(file, function(err, buffer) {

        zlib.gzip(buffer, function(err, buffer) {

            fs.writeFile(file + '.gz', buffer, function(err) {

                console.log('File successfully compressed');

            });

        });

    });    

        现在,我们可以把之前代码放进一个名为gzip.js 的文件中,并且通过以下命令行运行:

    node gzip <path to file>

        如果我们选择一个足够大的文件,比如说稍微大于1 GB,我们将收到一个很好的错误信息,来告诉我们:尝试去读取的文件,比允许的最大缓冲区大小要大,结果如下:

    RangeError: File size is greater than possible Buffer: 0x3FFFFFFF bytes

        这正是我们所预期的结果,并且这个结果代表了我们使用了错误的方式。

    Gzipping using streams

        最简单的方式来修复我们的压缩应用程序并且使其能够针对大文件工作的方式是使用流式API。让我们一起来看看这种方式是如何实现的。我们把刚才所写模块中的内容替换为以下代码:

    var fs = require('fs');

    var zlib = require('zlib');

    var file = process.argv[2];

     fs.createReadStream(file)

        .pipe(zlib.createGzip())

        .pipe(fs.createWriteStream(file + '.gz'))

        .on('finish', function() {

            console.log('File successfully compressed');

         });

        是这种方式么?你可能会产生疑问。是的,正如我们所介绍的,流式是可以带来惊喜的,因为其拥有的接口化和可组合性,由此可以组织干净、优雅、简洁的代码。我们将在一段时间之后更细节的看到它的效果,但是现在最重要的事情是理解这种方式可以针对任何规模size的文件来完美运行程序,针对内存利用率不断优化。请自己尝试一下(但是考虑到压缩一个大文件需要花一些时间)。

    Time efficiency

        让我们现在来一起考虑一个应用程序来压缩文件然后将其上传到一个远程的HTTP服务器,在其上解压缩并且将其存储在文件系统上。如果我们的客户端是以缓冲模式API实现的话,上传过程只能在整个文件被读取和压缩之后开始。从另一个方面来说,解压过程将会在所有的数据被读取之后再服务器端开始。更好的解决方案来获得相同的结果,涉及使用流模式。在客户端机制中,流模式允许你一旦能够从文件系统读取到数据块就将它发送出去。与此同时,在服务器端,它能够允许你去在它从其它远程服务器接收到数据时,去压缩每一个数据块。为了证明这一观点,我们一起来构建以前提到过的一个应用程序,由服务器端开始:

        让我们一起创建一个名为gzipReceive.js的模块,并在其中编写以下代码:

    var http = require('http');

    var fs = require('fs');

    var zlib = require('zlib');

    var server = http.createServer(function (req, res) {

        var filename = req.headers.filename;

        console.log('File request received: ' + filename);

        req

        .pipe(zlib.createGunzip())

        .pipe(fs.createWriteStream(filename))

        .on('finish', function() {

            res.writeHead(201, {'Content-Type': 'text/plain'});

            res.end('That\'s it\n');

            console.log('File saved: ' + filename);

        });

    });

    server.listen(3000, function () {

        console.log('Listening');

    });

        通过Node的流式机制,服务器由网络接收数据块,并且一旦完成接受就将它们压缩并保存。

        我们的应用程序中的客户端代码,将被注入一个名为gzipSend.js的模块,并且其内容如下所示:

    var fs = require('fs');

    var zlib = require('zlib');

    var http = require('http');

    var path = require('path');

    var file = process.argv[2];

    var server = process.argv[3];

    var options = {

        hostname: server,

        port: 3000, path: '/',

        method: 'PUT',

        headers: {

            filename: path.basename(file),

            'Content-Type': 'application/octet-stream' ,

            'Content-Encoding': 'gzip'

        }

    };

    var req = http.request(options, function(res) {

        console.log('Server response: ' + res.statusCode);

    });

    fs.createReadStream(file)

        .pipe(zlib.createGzip())

        .pipe(req)

        .on('finish', function() {

            console.log('File successfully sent');

        });

        在上述代码中,我们再次使用流模式来从文件中读取数据,并且针对每个数据块当其从文件系统完成读取时,进行压缩和发送流程。    

        现在,来尝试运行我们的应用程序,使用以下命令来尝试运行我们的服务器:

    node gzipReceive

        之后,我们将以的方式指定所要发送的文件和服务器地址(比如:localhost)的方式,来部署客户端程序:

    node gzipSend <path to file> localhost

        如果我们选择了一个足够大的文件,我们将更容易观察到数据流如何从客户端向文件流传输,但是为什么在这种模式下,相对于缓冲(buffer)API模式下明显更加高效?以下示意图将给我们一个提示:

        当一个文件被处理时,它经过以下序列状态:

        1.[Client]由文件系统读取

        2.[Client]压缩数据

        3.[Client]将压缩之后的数据发送给server

        4.[Server]由客户端接收

        5.[Server]解压缩这份数据

        6.[Server]将这份数据写入硬盘

        为了完成这个处理过程,我们不得不经像一个流水线一样来经过每一个状态步骤,串行序列化的依次进行,直到结束。在前面的图例中,我们可以看到,使用一个缓冲buffer模式的API,流程整体时串行序列化的。为了压缩数据,我们首先要等待整个文件被读取,之后,去发送数据,我们需要等待直到整个文件不仅被读取而且被压缩,此后,我们将代替原有模式使用流模式,当我们接收到第一个数据块时,整个流水线将会发起进行,而不需要等待整个文件被读取。但是更加令人惊讶的是,当下一个数据块已经可以提供时,在这里不需要等待之前的一系列任务被完成,而是,另外一条流水线并行的被部署。这种机制工作的很好,因为我们所执行的每一个业务都是异步的,所以任务可以被Node并行的处理。唯一的限制是:数据块到达每个状态的顺序需要被保留(并且Node的流模式为我们专门处理了这个部分)。

        正如我们在之前数据所看到的那样,使用流模式的结果是:整个处理流程使用更少的时间,因为我们不再浪费时间来等待所有数据被读取然后全部数据一次性处理。

    Composability

        迄今为止,我们所看到的代码给我们展示了一个关于流模式如何组成的全景,感谢pipe()方法。通过pipe()方法,能够允许我们连接不同的处理单元,每个处理单元采用完美的Node风格针对单独的函数进行负责。这可能是因为流模式下具备一套统一的接口标准,可以依据API让不同的处理模块理解对方。唯一的先决条件是:在流水线上的下一个流的必须要支持之前流所创建的数据类型,这种数据类型可能涵盖二进制,文本,甚至对象,如同我们将在这节稍后内容所看到的一样:

        为了来观察到另一个关于这个属性能力的证明,我们可以添加一个加密层到我们之前建立的gzipReceive/gzipSend应用程序上去。为了达到这一目的,我们只需要更新客户端来将另外一个流加入流水线。更确切地说,是由crypto.createChipher()所返回的流。结果代码如下所示:

    var crypto = require('crypto');

    [...]

    fs.createReadStream(file)

        .pipe(zlib.createGzip())

        .pipe(crypto.createCipher('aes192', 'a_shared_secret'))

        .pipe(req)

        .on('finish', function() {

            console.log('File succesfully sent');

        });

        通过一个相似方式,我们更新服务器端,来使数据在被解压缩之前,首先进行解密:

    var crypto = require('crypto');

    [...]

    var server = http.createServer(function (req, res) {

        [...]

            req

                .pipe(crypto.createDecipher('aes192', 'a_shared_secret'))             .pipe(zlib.createGunzip())

                .pipe(fs.createWriteStream(filename))

                .on('finish', function() {

                    [...]

                });

    });

        只需要一点工作量(仅仅加入几行代码),我们在应用程序的基础上加入了一层加密层。我们仅仅通过在我们已有的流水线上复用一个已经可以提供的传输流。通过一种相似方式,我们可以加入或者结合其他流,如图我们玩乐高积木一样。

        很显然,这种方式的主要好处是可重用性,但是从我们所列举出的代码来看,流模式也同样使得代码更加干净并且更加模块化。基于以上原因,流不仅仅用于处理I/O,,也是一种方式来简化和模块化代码。

    Getting started with streams

        在之前小节,我们了解了为什么流模式如此强大,并且它的应用遍及Node的各处,由它的核心模块开始。举个例子,我们看到fs模块具有createReadStream()函数来读取一个文件,createWriteStream()函数来写入一个文件,http请求和应答的对象是关键的流,并且zlib模块允许我们通过使用一个流接口来压缩和解压缩数据。

        现在我们知道为什么流模式如此重要,让我们退回一步来开始更细节的探索这些特性。

    Anatomy of streams

        在Node中的每一个流是stream核心模块所提供的4个基础抽象类中之一的实现:

        • stream.Readable

        • stream.Writable

        • stream.Duplex

        • stream.Transform

        每一个stream类同时也是EventEmitter的一个实例。流,实际上,可以创造很多类型的事件,比如说end(当一个可读的流完成读取),或者error(当一些情况下出错)。

        请注意,为了简洁起见,在本节所呈现的例子中,我们往往会忽略适当的错误管理。然而,在生产应用程序中,常常建议去注册一个error错误时间监听器来针对于你所涉及的所有流。

        流模式之所如此灵活的一个原因是基于事实上它不仅仅可以处理二进制数据,但是事实上,几乎任何的JavaScript值,实际上都可以支持两种操作模式。

        • 二进制模式: 这种模式是指在其中数据被以块的形式流化,比如缓冲区或者字符串形式。

        • 对象模式: 这种模式中流数据被当做一个严谨对象(被允许使用几乎所有的JavaScript值)的序列。

        这两种模式下允许我们去使用流不仅仅针对于I/O业务情境,同时作为一种工具来一种函数式风格来优雅的组成处理单元,如我们稍后将在本节所看到的一样。

        在本节中,我们主要讨论了被认识为Version 2的流接口,这种流接口从Node 0.10版本本引入。跟进一步细节的来比对和老接口的区别:

        请参考官方Node的博客:http://blog.nodejs.org/2012/12/20/ streams2/.

    Readable streams

        一个可读流代表一个数据源:在Node中,它是通过使用stream模块中所提供的Readable抽象类来实现的。

    Reading from a stream

        这里有两种方式来接收Readable流的数据:non-flowing and flowing。让我们以更细节的方式来分析这些模式。

    The non-flowing mode

        从一个可读Readable流中读取的默认模式,包括针对于readable事件添加一个监听器,这个事件指示新数据是否可供读取。然后,在循环中,我们将读取所有的数据直到内部缓冲区被清空。这个过程可以通过使用read()方法来完成,这个方法可以同步的从内部缓冲区读取数据,并返回Buffer 或者 String类型对象,来代表数据块。read()方法呈现以下写法:

    readable.read([size]) 

        使用这种方式,数据是根据需要来确定从流中拉出来的。

        为了说明这个过程是如何工作的,让我们创建一个名为readStdin.js的新模块,在其中实现了一个简单的程序,它能够从标准输入(a Readable stream)中读取,并回到标准输出将其中的所有内容echoes。

    process.stdin

        .on('readable', function() {

            var chunk;

            console.log('New data available');

            while((chunk = process.stdin.read()) !== null) {

                console.log(

                    'Chunk read: (' + chunk.length + ') "' +

                    chunk.toString() + '"'

            );

        }

    })

    .on('end', function() {

        process.stdout.write('End of stream');

    });

        read()方法是一个同步操作,从一个可读Readable流的内部缓冲区中来出数据块。所返回的数据块(默认情况下),如果该流以二进制模式而工作时,是一个Buffer对象。

        在一个以二进制模式工作的可读Readable流中,我们可以读取字符串strings而不是缓冲区内容buffers,以在流中调用setEncoding(encoding)的方式来实现,并提供一个有效的编码格式(比如:utf8)。

        数据可以在可读监听器内部唯一的读取,当新数据可读时就可以调用它。read()方法会返回null当内部缓冲区内没有更多的数据处于可提供的状态,在这样的业务情境下,我们不得不等到另外的可读事件被触发-来告诉我们可以再次读取-或者等到最后的事件来标识流的结束。当一个流在二进制状态下工作时,我们同样可以指定我们有兴趣去读取一个特定数量的数据,来通过传递一个size值到read()方法。这种特别有用,当所实现的网络协议或者解析一个特殊的数据格式。

        现在,我们已经准备好去运行readStdin模块并且针对它做实验。让我们一起在console控制台中键入一些字符并且按下Enter键来看看在标准输出中回显echoed的数据。为了终止流,进而生成一种优雅的结束事件,我们需要插入一个EOF (End-Of-File)字符,windows上(使用Ctrl + Z),Linux上(使用Ctrl + D)。

        我们可以尝试把我们的程序与其他处理过程连接:这个步骤很可能使用管道操作符pipe operator (|),这个符号可以重定向一个程序的标准输出到另外一个程序的标准输入。例如,我们可以运行一个命令,如下所示:

    cat  <path to a file> | node readStdin

        这是个令人激动地演示关于流式编程范式是如何成为一种通用接口标准的,这种情况将使我们的程序之间能够通信,而不用考虑它们所编写基于的程序语言。

    The flowing mode

        另外一个方式从流中读取数据,是通过针对data事件,加入一个监听器来实现的。这种实现方法将会转变流进入使用流flowing的模式,在其中数据将不会再通过read()方法进行拉取pull,而是每当新的可供使用的数据到达,就将其推送push到data监听器端。例如,我们之前创建的readStdin应用程序,通过使用flow模式,将表现为如下所示:

    process.stdin

        .on('data', function(chunk) {

            console.log('New data available');

            console.log(

                'Chunk read: (' + chunk.length + ')" ' +

                chunk.toString() + '"'

            );

        }) .on('end', function() {

            process.stdout.write('End of stream');

        });

        flowing模式是一种针对旧版流接口(也被成为Streams1),并且提供更少的灵活性来控制数据的流。伴随着Streams2接口的引入,以下模式将不再是磨人的工作模式。为了实现这种模式,需要添加一个监听器来针对data事件或者明确针对tresume()方法进行调用。为了临时性的终止流去发射data事件,我们需要在之后调用pause()方法没来造成很多输入数据被缓存在内部缓冲区。

        调用pause()函数,并不能造成流转回non-flowing模式。

    Implementing Readable streams

        现在我们知道如何从一个流中读取数据,下一步是学习如何实现一个新的可读流。为了达到这个目的,需要通过继承stream.Readable原型来创建一个新的类。具体的流需要提供一个关于_read()方法的实现,它形如以下写法:

    readable._read(size)

        Readable类的内部将会调用_read()方法,这个方法将会反过来通过使用push()方法来装填内部缓冲区buffer。

    readable.push(chunk)

        请注意read()是一个被流消费者调用的方法,而_read()是一个由stream子类实现的方法,并且不能直接调用。下划线一般表示,方法是非公开的(不是public的),并且不能直接调用。

        为了实现如何使用新形式的Readable流,我们可以尝试实现一个流来产生随机字符串。我们来产生一个新的名为randomStream.js的模块,这个模块将包含我们之后编写的字符串生成器代码。在文件的最上部,我们将下载我们的依赖:

    var stream = require('stream');

    var util = require('util');

    var chance = require('chance').Chance();

        这里的处理没有什么特别的,除了我们加载一个npm模块。

        这个模块被称为chance(https://npmjs.org/package/chance),这个模块是一个库来产生各种类型的随机值,从数字numbers到字符串strings到整个语句。

        下一个步骤是去创建一个新的被称为RandomStream的类,并且指定stream.Readable作为它的父类:

    function RandomStream(options) {

        stream.Readable.call(this, options);

    }

    util.inherits(RandomStream, stream.Readable);

        在上述代码中,我们将调用父类的构造器constructor,来初始化它的内部声明,然后将options参数作为一个输入来接收,可能通过options对象来传递的参数包括:

    • 编码参数用来将Buffers 转换为 Strings(默认类型转到null)。

    • 一个标志位来授权对象模式(objectMode defaults to false)。

    • 在内部缓冲区中的数据存储上限如果超过,将不能再从资源中读取更多数据((highWaterMark defaults to 16 KB)。

        好的现在我们具备了RandomStream构造器constructor,我们可以实现_read()方法如下:

    RandomStream.prototype._read = function(size) {

        var chunk = chance.string(); //[1]

        console.log('Pushing chunk of size:' + chunk.length);

        this.push(chunk, 'utf8'); //[2]

        if(chance.bool({likelihood: 5})) {

            //[3] this.push(null);

        }

    }

    module.exports = RandomStream;

        上述代码将被解释如下:

        1. 这个方法通过chance.函数来产生一个随机字符串。

        2. 它把字符串推进内部读取缓冲区。注意到,既然我们推入一个String类型数据,我们也要指定编码格式为utf8(这个是非必需的,如果这个数据块时一个二进制的缓冲区Buffer)。

        3. 它能够随机终止流,以概率likelihood为5%的指标,通过将null推入内部缓冲区buffer来标识一个EOF情况,或者换句话,流的终结。

        我们可以看到针对于_read()函数输入值中的size参数被忽略,因为它被作为一种建议参数。我们可以简单的推入push所有可用的数据,但是在同一个调用中有很多推送pushes的过程,之后我们需要检查push()是否能够返回false,因为这个将会意味着内部的缓冲区已经达到了highWaterMark限制并且我们需要停止加入更多的数据到它里面。

        这是RandomStream的方式,我们并不准备用他。我们来创建一个新的模块命名为:generateRandom.js,在其中我们实例化一个RandomStream对象,并且从中拉取一些数据。

    var RandomStream = require('./randomStream');

    var randomStream = new RandomStream();

    randomStream.on('readable', function() {

        var chunk;

        while((chunk = randomStream.read()) !== null) {

            console.log("Chunk received: " + chunk.toString());

        }

    });

        现在所有的准备工作已经具备了,我们可以尝试定义新模式的流。只需要如同平时一样执行一下generateRandom模块,并且观察到一组随机序列的字符串流在屏幕上出现。

    Writable streams

        一个可写入writable的流代表了一个数据目的地。在Node中,它通过可写Writable的抽象类(由stream模块提供),来实现其构建。

    Writing to a stream

        将一些数据从可写writable流中推下,是一件很简单的业务。我们需要去使用的是write()方法,可以如下写法编写:

    writable.write(chunk, [encoding], [callback])

        编码参数是可选的并且可以被指定,如果数据块时String类型的(默认为utf8编码格式,忽略数据块是缓冲Buffer类型的情况),当数据块被冲入底层资源时,回调函数callback将被调用,并且也同样是可选的。

        为了表明没有更多数据被写入流,我们不得不使用end()方法:

    writable.end([chunk], [encoding], [callback])

        我们可以通过end()方法来提供一个最后的数据块。在这个实例中,callback回调函数等价于:注册一个针对于最终事件finish event的监听器,这个最终事件将会在所有被写入流中的数据冲入底层资源的时候得到触发。

        我们现在来看看这个机制是如何工作的,通过建立一个实例:一个小的HTTP服务器会输出一个随机序列的字符串:

    var chance = require('chance').Chance();

    require('http').createServer(function (req, res) {

        res.writeHead(200, {'Content-Type': 'text/plain'}); //[1]

        while(chance.bool({likelihood: 95})) {//[2]

            res.write(chance.string() + '\n'); //[3]

        }

        res.end('\nThe end...\n'); //[4]

        res.on('finish', function() { //[5]

            console.log('All data was sent');

        });

    }).listen(8080, function () {

        console.log('Listening');

    });

        我们所创建的HTTP服务器注入了res对象,它是一个http.ServerResponse的实例并且同时是一个可写Writable的流。所要发生的被解释如下所示:

    1.我们首先编写HTTP响应(response)的头部。注意到writeHead()并不是可写Writable接口的一部分。事实上,它是由http.ServerResponse类暴露的辅助方法。

    2..我们开始一个循环,并将其终止于似然率为5%(我们引入chance.bool()函数赖在95%的时间内返回true)。

    3.在循环内部,我们编写了随机字符串来注入流。

    4.一旦我们来到了循环外部,我们调用stream流中的end()函数,标志这里已经没有更多的数据被写入。与此同时,我们提供一个最终final字符串来在终止流之前,将其写入流中。

    5.最终,我们针对完成事件finish event注册一个监听器,这个监听器将被触发,当所有的数据被冲入底层socket套接字中时。

        我们称这个小模块为entropyServer.js,并且在之后运行它。为了测试服务器,我们可以打开一个浏览器键入地址:http://localhost:8080,或者使用终端的cur,如下:l

    curl localhost:8080

        此时,服务器应该会开始发送随机字符串到你所选择的HTTP客户端(请记住,一些浏览器可能会缓存数据,并且流的表现可能不会是明显的)。

        一个有趣的事实是:http.ServerResponse实际上是一个老的Stream类(http://nodejs.

    org/docs/v0.8.0/api/stream.html)的实例,它的声明很重要,尽管,这个机制并不影响我们的例子,因为可写端的接口和表现,将在新建立的流可写类stream.Writable class中保持几乎一致。

    Back-pressure

        类似于在一个实际管道系统中的液体流动。Node的流也会遭遇一个瓶颈问题,在这种情况下数据写入会加快,超过流消耗(消费:consumer)所写入数据的速度。处理这个问题的机制,包括缓冲区缓存所到来的数据。然而,如果这条流并不急于写入者任何的回馈feedback,我们将遭受一种情境,在其中:越来越多的数据被注入内部缓冲区,导致不能预期等级的内存占用率。

        为了阻止这种情况的发生,writable.write()函数将会当内部缓冲区超过highWaterMark限制时,返回false。可写Writable流拥有一个highWaterMark属性,将会限制内部缓冲区的大小,当超过这个标准时,write()方法将会开始返回false,表明应用程序现在需要停止写入。当缓冲区已经耗尽,耗尽事件(drain event)将被发射,来沟通说明它很安全去再次开启写入操作。

        这个机制被称为:背压(back-pressure)。

        本节所描述的机制也被相似的应用于Readable流。事实上,背压back-pressure问题同样也在可读Readable流中存在。当push()方法(在_read()方法内部)被返回false的时候,它将被触发。然而,这个问题具体到流的实现者,所以我们将以较低频率来处理它。

        我们将很快展示如何将可写Writable流中的背压back-pressure问题考虑进来,来通过将我们之前所创造的entropyServer模块进行改进。

    var chance = require('chance').Chance();

    require('http').createServer(function (req, res) {

        res.writeHead(200, {'Content-Type': 'text/plain'});

        function generateMore() { //[1]

            while(chance.bool({likelihood: 95})) {

                var shouldContinue = res.write(

                    chance.string({length: (16 * 1024) – 1}) //[2]

                );

                if(!shouldContinue) { //[3]

                    console.log('Backpressure');

                    return res.once('drain', generateMore);

                } }

                res.end('\nThe end...\n', function() {

                    console.log('All data was sent');

                });

        }

        generateMore();

    }).listen(8080, function () {

        console.log('Listening');

    });

        上述代码中最关键的步骤,可以总结如下:

    1.我们将主要逻辑包装在一个名为generateMore()的函数中。

    2.为了提高接收到背压back-pressure的机会,我们将数据块的大小size提高到16KB-1Byte,这个数值和默认的highWaterMark门限值非常接近。

    3.当写入一个数据块时,我们检查res.write()函数的返回值,如果我们接收到了false,意味着内部缓冲区是满的,而我们需要停止发送更多的数据。在这种情况下,我们从这个函数中退出,并且注册下一个循环的写入操作,当耗尽事件(drain event)被发射。

        我们现在尝试在此运行服务器,并且在之后以curl产生一个客户端请求,这里将会有很大可能将会出现一些背压back-pressure的情况,因为服务器以一个非常高的速率产生的数据,这个产生数据的速率要快于底层socket所能处理的限度。

    相关文章

      网友评论

          本文标题:Chapter3:Coding with Streams

          本文链接:https://www.haomeiwen.com/subject/zhaqvxtx.html