美文网首页
node之stream - 流

node之stream - 流

作者: imakan | 来源:发表于2018-11-06 19:57 被阅读0次

    流(stream)是Nodejs中出处理流式数据的抽象接口。stream模块提供了一些API,用于构建实现了流接口的对象。
    Node.js提供了多种流对象。例如,HTTP服务器的请求和process.stdout都是流的实例。
    流可以是可读的,可写的,或者是可读可写的。所有的流都是EventEmitter的实例
    stream模块可以通过以下方式使用

    const stream = require('stream')
    

    尽管理解流的工作方式很重要,但是stream模块本身主要用于开发者创建新类型的流实例,对于以消费对象为主的开发者,极少需要直接使用stream模块

    流的类型

    • Writable 可写流
    • Readable 可读流
    • Duplex 双公流,可读又可写流
    • Transform 在读写过程中可以修改或者转换数据的Duplex

    对象模式

    node创建的流都是运作在字符串和Buffer(或者uint8Array)上,当然,流的实现也可以使用其他的类型的JavaScript值(除了null)。这些流会以“对象模式”进行操作。当创建流时,可以使用objectMode 选项把流实例切换到对象模式。将已存在的流切换到对象模式是不安全的

    用于消费流的API

    几乎所有的Nodejs应用都在某种程度上使用了流。
    比如:

    const http = require('http');
    
    const server = http.createServer((req, res) => {
      // req 是一个 http.IncomingMessage 实例,它是可读流。
      // res 是一个 http.ServerResponse 实例,它是可写流。
    
      let body = '';
      // 接收数据为 utf8 字符串,
      // 如果没有设置字符编码,则会接收到 Buffer 对象。
      req.setEncoding('utf8');
    
      // 如果添加了监听器,则可读流会触发 'data' 事件。
      req.on('data', (chunk) => {
        body += chunk;
      });
    
      // 'end' 事件表明整个请求体已被接收。 
      req.on('end', () => {
        try {
          const data = JSON.parse(body);
          // 响应信息给用户。
          res.write(typeof data);
          res.end();
        } catch (er) {
          // json 解析失败。
          res.statusCode = 400;
          return res.end(`错误: ${er.message}`);
        }
      });
    });
    
    server.listen(1337);
    
    // $ curl localhost:1337 -d "{}"
    // object
    // $ curl localhost:1337 -d "\"foo\""
    // string
    // $ curl localhost:1337 -d "not json"
    // 错误: Unexpected token o in JSON at position 1
    

    可写流(比如例子中的 res)会暴露了一些方法,比如 write()end() 用于写入数据到流。
    当数据可以从流读取时,可读流会使用 EventEmitter API 来通知应用程序。 从流读取数据的方式有很多种。
    可写流可读流都通过多种方式使用 EventEmitter API 来通讯流的当前状态。
    Duplex 流和 Transform 流都是可写又可读的。
    对于只需写入数据到流或从流消费数据的应用程序,并不需要直接实现流的接口,通常也不需要调用 require('stream')。

    可写流

    可写流是对数据要被写入的目的地的一种抽象。

    可写流的例子包括:

    上面的一些例子事实上是实现了可写流接口的 Duplex 流。

    所有可写流都实现了 stream.Writable 类定义的接口。

    可读流

    可读流的例子包括:

    所有可读流都实现了 stream.Readable 类定义的接口。

    可写流的例子

    下面的例子是一个简单的可写流的实现

    const {Writable} = require('stream')
    class MyWritable extends Writable {
        constructor (options) {
          super(options);
          // todo
        }
      _write(chunk,encoding,callback){
        if(chunk.toString().index('a') >= 0){
             callback(new Error('无效的数据块'))
        }else {
          callback();
        }
      }
    }
    

    在可写流中解码buffer

    const { Writable } = require('stream');
    const { StringDecoder } = require('string_decoder');
    
    class StringWritable extends Writable {
      constructor(options) {
        super(options);
        this._decoder = new StringDecoder(options && options.defaultEncoding);
        this.data = '';
      }
      _write(chunk, encoding, callback) {
        if (encoding === 'buffer') {
          chunk = this._decoder.write(chunk);
        }
        this.data += chunk;
        callback();
      }
      _final(callback) {
        this.data += this._decoder.end();
        callback();
      }
    }
    
    const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
    const w = new StringWritable();
    
    w.write('货币: ');
    w.write(euro[0]);
    w.end(euro[1]);
    
    console.log(w.data); // 货币: €
    

    可读流的例子

    class Counter extends stream.Readable {
      constructor(opt){
        super(opt)
        this._max = 1000000
        this._index = 1
      }
      _read(){
        const i = this._index++
        if(i > this._max){
          this.push(null)
        }else{
          const str = String(i)
          const buf = Buffer.from(str,'ascii')
          this.push(buf)
        }
      }
    }
    

    双工流的例子

    下面是一个双工流的例子,封装了一个可读可写的底层资源对象。

    const { Duplex } = require('stream');
    const kSource = Symbol('source');
    
    class MyDuplex extends Duplex {
      constructor(source, options) {
        super(options);
        this[kSource] = source;
      }
    
      _write(chunk, encoding, callback) {
        // 底层资源只处理字符串。
        if (Buffer.isBuffer(chunk))
          chunk = chunk.toString();
        this[kSource].writeSomeData(chunk);
        callback();
      }
    
      _read(size) {
        this[kSource].fetchSomeData(size, (data, encoding) => {
          this.push(Buffer.from(data, encoding));
        });
      }
    }
    

    在下面的例子中,创建了一个变换流(双工流的一种),对象模式的可写端接收 JavaScript 数值,并在可读端转换为十六进制字符串。

    const { Transform } = require('stream');
    
    // 转换流也是双工流。
    const myTransform = new Transform({
      writableObjectMode: true,
    
      transform(chunk, encoding, callback) {
        // 强制把 chunk 转换成数值。
        chunk |= 0;
    
        // 将 chunk 转换成十六进制。
        const data = chunk.toString(16);
    
        // 推送数据到可读队列。
        callback(null, '0'.repeat(data.length % 2) + data);
      }
    });
    
    myTransform.setEncoding('ascii');
    myTransform.on('data', (chunk) => console.log(chunk));
    
    myTransform.write(1);
    // 打印: 01
    myTransform.write(10);
    // 打印: 0a
    myTransform.write(100);
    // 打印: 64
    
    

    相关文章

      网友评论

          本文标题:node之stream - 流

          本文链接:https://www.haomeiwen.com/subject/lhbuxqtx.html