美文网首页
简明writeStream实现

简明writeStream实现

作者: Cirs_冷峥子 | 来源:发表于2018-01-24 20:34 被阅读369次

    Pro

    一个createWriteStream的简单实现,以求能增加对可写流的理解与应用。

    参数配置

    /**
     * createWriteStream
     * @param1 path
     * @param2 options
     */
    let fs = require('fs');
    let ws = fs.createWriteStream('./1.txt',{
      flags:'w'//文件的打开模式
      ,mode:0o666//文件的权限设置
      ,encoding:'utf8'//写入文件的字符的编码
      ,highWaterMark:3//最高水位线
      ,start:0 //写入文件的起始索引位置        
      ,autoClose:true//是否自动关闭文档
    })
    

    createWriteStream类的实例化

    • 实例化一个createWriteStream
      • pathoptions挂载在createWriteStream的实例上,除此之外再在实例上挂载以下属性
        • self.fd=null:文件打开后返回的文件描述符
        • self.pos=self.start:用于表示文件真正写入时的指针位置
        • self.Buffer=[]:用来表示文件的缓冲区
        • self.len=null:用来表示缓冲区此时的大小
        • self.isWriting=false:用来表示是否正在真正写入文件
      • 调用open方法,打开文件(发射open事件)

    实例write方法的执行流程

    • wirte方法接收三个参数,chunk要写入的内容,encoding要进行的,cb回调函数。
    • write执行流程:
      • 判断传入的chunk是否为buffer,如果不是,则转换成buffer,用于转化编码依据传入的encoding参数。
      • 更新Buffer缓冲区的len长度,让len加上该次chunk的长度
      • 判断len是否已经超过highWaterMark,将值存入flag
      • 判断是否处于isWriting状态:
        • 是,则先加chunk写入实例对象下的Buffer缓冲区
        • 否,更新isWriting,接将参数传递给实例下的_write方法写入文件
      • 返回flag

    实例_write方法的执行流程

    此方法用于真正写入文件

    • 查看实例的fd属性是否存在(文件是否打开成功)
      • 成功,调用fs模块的write方法正式写入数据
        • 更新实例对象下的len以及pos属性
        • 调用clearBuffer方法将缓冲区的内容写入
        • 调用write方法传入的回调函数cb
      • 失败,订阅一个open事件(open事件将会在open方法中被发射),在订阅中的回调方法中再次以相同的参数调用_write方法

    实例clearBuffer方法

    • 从缓冲区中取出一个数据
      • 如果数据存在,调用_write方法
      • 如果数据不存在,将isWriting更改为false,发射drain事件

    实现源码以及测试文件

    let fs = require('fs');
    let EventEmitter = require('events');
    
    class WriteStream extends EventEmitter {
      constructor(path, options) {
        super();
        let self = this;
        Object.assign(self, options); //还需设置默认值
        self.path = path;
        self.isWriting = false;
        self.Buffer = []; //源码中为链表实现的缓冲区
        self.len = null;
        self.pos = self.start; //初始化写入位置
        self.fd = null;
        self.open();
      }
    
      open() {
        let self = this;
        fs.open(self.path, self.flags, self.mode, (err, fd) => {
          self.fd = fd;
          if (err) return self.destroy(err);
          self.emit('open');
        });
      }
    
      destroy(err) {
        fs.close(this.fd, () => {
          this.emit('error', err);
        });
      }
    
      write(chunk, encoding, cb) {
        let self = this
          , ret = null;
        encoding = encoding?encoding:self.encoding; //优先使用write传入的编码方式
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
        self.len += chunk.length;
        ret = self.highWaterMark > self.len; //判断当前最新的缓冲区是否已达到最高水位线
    
        if (self.isWriting) { //说明正在调用底层方法真正写入文件,先写入Buffer
          self.Buffer.push({
            chunk
            , cb
          });
        } else {
          self.isWriting = true;
          self._write(chunk, cb, () => self.clearBuffer());
        }
    
        return ret;
      }
    
      _write(chunk, cb, clear) {
        let self = this;
        if (!self.fd) return self.once('open', () => {
          self._write(chunk, cb, clear)
        });
        fs.write(self.fd, chunk, 0, chunk.length, self.pos, (err, bytesWritten) => {
          if (err) {
            if (self.autoClose) {
              self.destroy();
              self.emit('error', err);
            }
          }
          self.len -= bytesWritten;
          self.pos += bytesWritten;
          cb && cb();
          clear && clear();
        });
      }
    
      clearBuffer() {
        let self = this
          , data = null;
        data = self.Buffer.shift();
        if (data) {
          self._write(data.chunk, data.cb, () => self.clearBuffer());
        } else { //此时说明缓冲区已无数据
          self.isWriting = false;
          self.emit('drain');
        }
      }
    }
    
    module.exports = WriteStream;
    

    测试文件:

    let WriteStream = require('./practice');
    let ws = new WriteStream('./1.txt',{
      flags:'w'
      ,mode:0o666
      ,start:0
      ,encoding:'utf8'
      ,autoClose:true //当流写完之后自动关闭文件
      ,highWaterMark:3
    });
    let n = 9;
    ws.on('error',(err)=>{
      console.log(err)
    })
    function write(){
      let flag = true;
      while(flag&&n>0){
        flag = ws.write(n+"",'utf8',()=>{
          console.log('ok');
        });
        n--;
        console.log('flag=',flag)
      }
      ws.once('drain',()=>{
        console.log('drain');
        write();
      })
    }
    // ws.on('drain',()=>{
    //   console.log('drain');
    //   write();
    // })
    write();
    
    

    参考资料:
    https://nodejs.org/dist/latest-v9.x/docs/api/stream.html#stream_writable_streams

    相关文章

      网友评论

          本文标题:简明writeStream实现

          本文链接:https://www.haomeiwen.com/subject/yskpaxtx.html