美文网首页
Nodejs Stream 初识

Nodejs Stream 初识

作者: 黑曼巴yk | 来源:发表于2019-10-03 20:10 被阅读0次

linux文件

  1. 为了区别不同文件类型,会有一个type来进行区别
    • 普通文件:包含任意数据
    • 目录: 相关一组文件的索引
    • 套接字Socket:和另一台机器上的进程通信的类型
  2. 重定向
    每个进程都有自己的描述符表(fd)。
  • fd0 stdin 输入到终端
  • fd1 stdout 输出到终端
  • fd2 stderr 输出错误到终端
  • fd4 磁盘文件

标准IO会使用流(stream)行松打开文件,所谓的流(stream)实际上是文件描述度(file descriptor)和缓冲区(buffer)在内存中的抽象

Nodejs中的Stream

Stream模块在nodejs中只是处理流数据的抽象接口,Steam模块提供了基础的API,使用者可以根据这些API构建流接口的对象

Stream流类型
  • Readable -可读流(fs.createReadStream)
  • Writeable -可写流(fs.createWriteStream)
  • Duplex - 可读写流(net.Soket)
  • Transform - 在读写过程中可以修改和变换数据的Duplex流.(zlib.createDeflate)
    Stream 模块还包含pipeline和finished公共功能
Stream 事件
image.png
  • Readable发起重要事件
    • data产生数据
    • end产生数据约束
  • Writeable发起重要事件
    • drain缓存区处理完毕
    • finish处理结束

可读流 Readable Stream

创建一个可读流
const fs = require('fs');
const rs = fs.createReadStream('./index.js', {
  highWaterMark: 1024 // 缓存大小,每次on Data的大小,默认是16kb
});
let onDataCount = 0;
rs.on('data', chunk => {
  console.log(chunk.toString('utf-8'));
  console.log(onDataCount++);
})
rs.on('end', () => {
  console.log('end');
})
可读流中的flowing/paused模式

数据传输的时候需要考虑到数据的消费者的消费速度才能保证高利用率,而发送数据端就是生产者,接收数据端处理数据是消费者
消费者示例: Http请求处理,文件写入处理,数据库处理等

const fs = require('fs');
const rs = fs.createReadStream('./index.js' ,{
  hightWaterMark: 1024
});
let onCount = 0;
rs.on('data', data => {
  console.log(data.toString('utf-8'));
  console.log(onCount ++);
  rs.pause();
  console.log('on data => ', rs.isPaused())
  setTime(() => rs.resume(), 3000)
})
rs.on('end', () => {
  console.log('end');
})
Stream 背压问题

一个Stream数据生产的速度远大于Stream消费的速度,就会造成数据的堆积。比如不停增长的日志文件(100条/1s)作为生产者,有个服务处理日志文件(100条/3s)作为消费者,如果没有pause/flowing模式就会撑爆内存造成浪费,

readable事件精确控制可读流

readable区别于data事件,readable回调使用rs.read(size)方法好像从桶中再用瓢一点点取水一样,可以更加精确的控制流数据的读取。但此事件回调无法使用paused/flowing模式

rs.on('readable', () => {
  let dataChunk = rs.read(10);
  if(dataChunk) {
    console.log(dataChunk.toString())
  }
})

可写流 Writeable Stream

创建一个可写流
const fs = require('fs')
const ws = fs.createWriteStream('./out.txt', {
    highWaterMark: 10
})

for(let i = 0; i < 1000; i++) {
    const drained = ws.write('hahaha\n')
    console.log(drained);
}

drained为true,缓存池highWaterMark还没有满,可以继续写入。

实现和使用Stream的各种接口

可写流 Writeable
  1. writeble使用
const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});
process.stdin.pipe(outStream);

write方法有三个参数

  • chunk 默认为写入数据的buffer
  • encoding 写入数据的编码,通常忽略
  • callback在处理完chunk之后调用,它通知写入操作成功执行
  1. 自定义writeble实现
const { Writable } = require('stream');
const fs = require('fs');
class MyWriteable extends Writable {
    constructor(options) {
        super(options)
    }
    _write(chunk, encoding, callback) {
        // console.log(chunk.toString())
        fs.writeFileSync('./out.txt', chunk.toString() + '\n', {flag: 'a+'});
        setTimeout(() => callback(null), 100);
    }
}
const ws = new MyWriteable();
for(let i =0; i< 100; i++) {
    ws.write('hahaha');
}
可读流Readable
  1. readable使用
const { Readable } = require('stream');

const inStream = new Readable({
    read(size) {
        this.push(String.fromCharCode(this.currentCharCode++));
        if(this.currentCharCode > 90) {
            this.push(null);
        }
    }
})
inStream.currentCharCode = 0;
inStream.pipe(process.stdout);
  1. 自定义Readable实现
'use strict';

const { Readable } = require('stream');

class MyReadable extends Readable {
    constructor(options) {
      super(options)
      this.count = 65
    }
  
    _read() {
        this.count++
        this.push(String.fromCharCode(this.count++));
        if (this.count > 80) {
          return this.push(null) // push null 就结束了
        }
    }
}
  
let rs = new MyReadable(); 
rs.on('data', function(data) {  // 当 rs 注册 onData 时就开始调用 _reade()
    console.log(data.toString());
});
pipe原理和实现

pipe可以有不同的目的地,pipe就是解决了背压的问题,实现方式就是在可读流上注册一个onData事件,达到阈值后进行pause

  1. 使用pipe
const fs = require('fs');

const rs = fs.createReadStream('./package.json');
const ws = fs.createWriteStream('./out.txt');
rs.pipe(ws);
  1. 实现pipe
    Node源码中就是在可写流上注册onDataonDrain可以根据可写流的阈值和释放进行pause/flowing切换
pipe(ws) {
  this.on('data', chunk => {
    let drained = ws.write(chunk);
    if(!drained) {
      this.pause();
    }
  });
  ws.on('drain', () => {
    this.resume();
  })
}

相关文章

  • Nodejs Stream 初识

    linux文件 为了区别不同文件类型,会有一个type来进行区别普通文件:包含任意数据目录: 相关一组文件的索引套...

  • [读] NodeJS stream 一:Buffer

    NodeJS stream 一:Buffer

  • nodejs stream

    参考 nodejs stream 手册英文原版stream-handbook Streams come to us...

  • Stream in Nodejs

    reference: https://nodesource.com/blog/understanding-stre...

  • Nodejs Stream

    前言 前端工程领域中使用Nodejs处处掣肘,原因无非是Nodejs中困难部分:文件和网络。而文件和网络都依赖一个...

  • nodejs stream

    为什么需要stream 不用stream的情况:太耗内存 优化:用pipe连接读文件流和http可写流:image...

  • improve scheme

    Nodejs Stream: https://nodesource.com/blog/understanding-...

  • 初识Nodejs

    初识Nodejs NodeJs官网 什么是Nodejs Node.js® is a JavaScript runt...

  • 2.nodejs通过stream方式加载页面 / 通过fs模块加

    nodejs通过stream方式加载页面(stream方式加载优化性能) 配置index.js文件,创建服务,设置...

  • stream系列——初识stream

    初识stream 1、对于迭代来说,包含内部迭代和外部迭代。 外部迭代:(程序逻辑自己控制迭代过程) 内部迭代:(...

网友评论

      本文标题:Nodejs Stream 初识

      本文链接:https://www.haomeiwen.com/subject/gomlpctx.html