How the NameNode Writes the EditLog: logEdit and logSync

Author: 小北觅 | Published: 2020-08-31 09:19

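    After the NameNode starts and finishes loading the fsimage and editlog files, every new operation that modifies metadata is recorded in the editlog file. This article walks through the logic that writes the editlog.

    Start with the logEdit method. Its parameter is an FSEditLogOp, which represents one kind of operation, for example DeleteOp, AddBlockOp, or MkdirOp. These op types correspond roughly to the RPC operations.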

      /**
       * Write an operation to the edit log.
       * <p/>
       * Additionally, this will sync the edit log if required by the underlying
       * edit stream's automatic sync policy (e.g. when the buffer is full, or
       * if a time interval has elapsed).
       * Author's note: in short, this method writes one operation to the edit
       * log, and it also syncs the edit log when the underlying edit stream's
       * automatic sync policy requires it.
       */
      void logEdit(final FSEditLogOp op) {
        boolean needsSync = false;
        synchronized (this) {
          // Check that the edit log state is open for write
          assert isOpenForWrite() :
            "bad state: " + state;
          
          // wait if an automatic sync is scheduled
          waitIfAutoSyncScheduled();
    
          // check if it is time to schedule an automatic sync
          needsSync = doEditTransaction(op);
          if (needsSync) {
            // This is the only place where isAutoSyncScheduled is set to true.
            // Once it is true, waitIfAutoSyncScheduled() above calls wait().
            isAutoSyncScheduled = true;
          }
        }
    
        // Sync the log if an automatic sync is required.
        if (needsSync) {
          // Run one log sync
          logSync();
        }
      }
    

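    Now walk through logEdit in more detail.

    needsSync is initialized to false. waitIfAutoSyncScheduled is then called: if isAutoSyncScheduled is true, an automatic sync of the edit log has already been scheduled, so the calling thread has to wait.

    waitIfAutoSyncScheduled calls wait in a loop for as long as isAutoSyncScheduled is true.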
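    The wait-and-wake handshake around isAutoSyncScheduled can be sketched as below. This is a minimal self-contained model, not the Hadoop source: the class AutoSyncGate and the helpers scheduleAutoSync and isScheduled are hypothetical names introduced for illustration, and returning early on interruption is an assumption of this sketch.

```java
// Hypothetical stand-in for the isAutoSyncScheduled handshake; not Hadoop code.
class AutoSyncGate {
    private boolean autoSyncScheduled = false;

    // Block the calling writer while an automatic sync is scheduled.
    synchronized void waitIfAutoSyncScheduled() {
        while (autoSyncScheduled) {
            try {
                wait(1000); // wake up at least once a second to re-check the flag
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // assumption of this sketch: stop waiting when interrupted
            }
        }
    }

    // The writer that filled the buffer raises the flag before it syncs.
    synchronized void scheduleAutoSync() {
        autoSyncScheduled = true;
    }

    // The syncing thread lowers the flag and wakes every parked writer.
    synchronized void doneWithAutoSyncScheduling() {
        if (autoSyncScheduled) {
            autoSyncScheduled = false;
            notifyAll();
        }
    }

    synchronized boolean isScheduled() {
        return autoSyncScheduled;
    }
}
```

    The loop around wait matters: a woken thread re-checks the flag rather than assuming the sync has finished, which also covers the one-second timeout.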

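    Next, needsSync is updated with the return value of doEditTransaction.

    Between beginning and ending the transaction, doEditTransaction writes the op to the edit log stream by calling editLogStream.write(op). The write method is not worth following further; it just writes some data into an output stream. Finally, doEditTransaction returns shouldForceSync(). That return value is assigned to needsSync and in turn determines isAutoSyncScheduled. shouldForceSync() checks whether the current stream's buffer has reached the initial default buffer size; if so it returns true, meaning the buffer holds enough data to be worth syncing.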
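    The write path just described can be modeled in a few lines. This is a simplified sketch under stated assumptions, not the Hadoop implementation: the class EditWriterSketch, the String-typed op, the text record format, and the helper lastTxId are all hypothetical, and the size check uses >= as an assumption about where the threshold sits.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Simplified model of the write path; method names follow the article, bodies are assumptions.
class EditWriterSketch {
    private final int initBufferSize;
    private final ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
    private long txid = 0;

    EditWriterSketch(int initBufferSize) {
        this.initBufferSize = initBufferSize;
    }

    // Assign the next txid, append the op to the in-memory buffer,
    // and report whether the buffer is full enough to force a sync.
    synchronized boolean doEditTransaction(String op) {
        txid++; // stands in for beginTransaction
        byte[] record = (txid + ":" + op + "\n").getBytes(StandardCharsets.UTF_8);
        bufCurrent.write(record, 0, record.length); // stands in for editLogStream.write(op)
        return shouldForceSync(); // evaluated once the transaction has ended
    }

    // True once the buffered bytes reach the initial buffer size.
    boolean shouldForceSync() {
        return bufCurrent.size() >= initBufferSize;
    }

    synchronized long lastTxId() {
        return txid;
    }
}
```

    With an initial size of 16 bytes, the first "mkdir" record (8 bytes) does not trigger a sync, and the second one does.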

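    Back on the main path: if needsSync is true, isAutoSyncScheduled is set to true and logSync is called. The flag is set because a logSync is about to run; it makes other writers wait in waitIfAutoSyncScheduled until logSync calls doneWithAutoSyncScheduling.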

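    Finally, look at logSync(), which is the important one. Two pieces of background help first: the double buffer and the synchronization strategy that logSync uses. At runtime, many threads write to the editlog concurrently. To keep concurrency high, HDFS does not hold a coarse-grained lock across the whole write; it uses a double buffer together with its own synchronization strategy.

    First the double buffer, which is implemented by the class EditsDoubleBuffer.

    The comments on its fields are clear: bufCurrent is the buffer currently being written to, and bufReady is the buffer being flushed to disk. The benefit of two buffers is that while one is being synced to disk, other threads can keep writing to bufCurrent. The idea is somewhat like the survivor 0 and survivor 1 spaces in the JVM young generation. Note, however, that swapping the roles of the two buffers must be done under the lock.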
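    The two roles and the swap can be sketched as follows. This is a minimal self-contained model, not the EditsDoubleBuffer source: the class DoubleBufferSketch, the flush() method that returns the drained bytes, and the two size helpers are assumptions made for illustration; only bufCurrent, bufReady, and setReadyToFlush are names taken from the article.

```java
import java.io.ByteArrayOutputStream;

// Illustrative double buffer; locking is left to the caller, as described in the text.
class DoubleBufferSketch {
    private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream(); // writers append here
    private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();   // the flusher drains this

    // Append an edit. The caller is expected to hold the edit log lock.
    void write(byte[] data) {
        bufCurrent.write(data, 0, data.length);
    }

    // Swap the roles of the two buffers. The caller must hold the lock,
    // and the previous contents of bufReady must already be flushed.
    void setReadyToFlush() {
        if (bufReady.size() != 0) {
            throw new IllegalStateException("previous data not flushed yet");
        }
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
    }

    // Drain bufReady. It touches only bufReady, so writers can keep appending
    // to bufCurrent, provided no swap happens while the drain is in progress.
    byte[] flush() {
        byte[] out = bufReady.toByteArray();
        bufReady.reset();
        return out;
    }

    int currentSize() {
        return bufCurrent.size();
    }

    int readySize() {
        return bufReady.size();
    }
}
```

    Because the swap only exchanges two references, the time spent under the lock stays short; the slow part, writing bytes to storage, happens on bufReady alone.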

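    With the double buffer covered, here is the synchronization strategy that logSync uses. There are three points:

    1. Every operation is written to the in-memory EditLogOutputStream under synchronization and is assigned a unique txid.
    2. When a thread wants to sync the stream to disk, logSync reads the ThreadLocal variable myTransactionId to get the txid that the current thread needs synced. If that txid is greater than the txid already synced (tracked in the synctxid variable), the thread's edits are new and need to be synced to disk. If it is less than or equal to synctxid, the thread's edits have already been synced, so logSync returns immediately.
    3. logSync uses isSyncRunning, a volatile boolean, to indicate whether some thread is currently flushing to disk. If a thread that is about to sync finds this variable true, it waits.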
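    The three points above can be put together in a toy model. This is a sketch under stated assumptions, not Hadoop code: the class GroupCommitLog is hypothetical, edits are reduced to bare txids, the "disk" is an in-memory list, and the helpers syncedTxId, flushCount, and diskSize exist only so the behavior can be observed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the three rules; the "disk" is an in-memory list.
class GroupCommitLog {
    private List<Long> bufCurrent = new ArrayList<>();
    private List<Long> bufReady = new ArrayList<>();
    private final List<Long> disk = new ArrayList<>();
    private final ThreadLocal<Long> myTransactionId = ThreadLocal.withInitial(() -> 0L);
    private long txid = 0;     // last txid handed out
    private long synctxid = 0; // last txid known to be on "disk"
    private volatile boolean isSyncRunning = false;
    private int flushCount = 0;

    // Rule 1: writes are synchronized and each edit gets a unique txid.
    synchronized long logEdit() {
        txid++;
        bufCurrent.add(txid);
        myTransactionId.set(txid);
        return txid;
    }

    void logSync() {
        long mytxid = myTransactionId.get(); // Rule 2: this thread's sync target
        long syncStart;
        synchronized (this) {
            while (mytxid > synctxid && isSyncRunning) { // Rule 3: wait for the running sync
                try {
                    wait(1000);
                } catch (InterruptedException ignored) {
                    // this sketch keeps waiting when interrupted
                }
            }
            if (mytxid <= synctxid) {
                return; // another thread's sync already covered this edit
            }
            syncStart = txid;
            isSyncRunning = true;
            List<Long> tmp = bufReady; // swap the buffers under the lock
            bufReady = bufCurrent;
            bufCurrent = tmp;
        }
        // Flush outside the lock; writers keep appending to bufCurrent.
        disk.addAll(bufReady);
        bufReady.clear();
        synchronized (this) {
            flushCount++;
            synctxid = syncStart;
            isSyncRunning = false;
            notifyAll();
        }
    }

    synchronized long syncedTxId() { return synctxid; }
    synchronized int flushCount() { return flushCount; }
    synchronized int diskSize() { return disk.size(); }
}
```

    A single flush covers every edit buffered up to that point, so a thread whose txid is already at or below synctxid returns without touching storage. That is what keeps the number of flushes well below the number of edits under load.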
    With the background covered, here is the logSync source:
     /**
       * Sync all modifications done by this thread.
       *
       * The internal concurrency design of this class is as follows:
       *   - Log items are written synchronized into an in-memory buffer,
       *     and each assigned a transaction ID.
       *   - When a thread (client) would like to sync all of its edits, logSync()
       *     uses a ThreadLocal transaction ID to determine what edit number must
       *     be synced to.
       *   - The isSyncRunning volatile boolean tracks whether a sync is currently
       *     under progress.
       *
       * The data is double-buffered within each edit log implementation so that
       * in-memory writing can occur in parallel with the on-disk writing.
       *
       * Each sync occurs in three steps:
       *   1. synchronized, it swaps the double buffer and sets the isSyncRunning
       *      flag.
       *   2. unsynchronized, it flushes the data to storage
       *   3. synchronized, it resets the flag and notifies anyone waiting on the
       *      sync.
       *
       * The lack of synchronization on step 2 allows other threads to continue
       * to write into the memory buffer while the sync is in progress.
       * Because this step is unsynchronized, actions that need to avoid
       * concurrency with sync() should be synchronized and also call
       * waitForSyncToFinish() before assuming they are running alone.
       */
        // The Javadoc above is worth reading carefully for its description of the sync strategy
      public void logSync() {
        // Fetch the transactionId of this thread.
        // Pass in the txid field of the current thread's ThreadLocal myTransactionId object.
        logSync(myTransactionId.get().txid);
      }
    

    Follow the call into the logSync overload that takes a parameter:

      protected void logSync(long mytxid) {
        long syncStart = 0;
        boolean sync = false;
        // Counts how many other transactions this sync flushes to disk; reported to metrics.
        long editsBatchedInSync = 0;
        try {
          EditLogOutputStream logStream = null;
          synchronized (this) {
            try {
              printStatistics(false);
    
              // if somebody is already syncing, then wait
              while (mytxid > synctxid && isSyncRunning) {
                try {
                  wait(1000);
                } catch (InterruptedException ie) {
                }
              }
      
              //
              // If this transaction was already flushed, then nothing to do
              //
              if (mytxid <= synctxid) {
                return;
              }
    
              // now, this thread will do the sync.  track if other edits were
              // included in the sync - ie. batched.  if this is the only edit
              // synced then the batched count is 0
              editsBatchedInSync = txid - synctxid - 1;
              // Remember the highest txid written so far
              syncStart = txid;
              // Set isSyncRunning to true, then swap the two buffers below
              isSyncRunning = true;
              sync = true;
    
              // swap buffers
              try {
                if (journalSet.isEmpty()) {
                  throw new IOException("No journals available to flush");
                }
                // Swap the two buffers; this still runs inside the synchronized block
                editLogStream.setReadyToFlush();
              } catch (IOException e) {
                final String msg =
                    "Could not sync enough journals to persistent storage " +
                    "due to " + e.getMessage() + ". " +
                    "Unsynced transactions: " + (txid - synctxid);
                LOG.error(msg, new Exception());
                synchronized(journalSetLock) {
                  IOUtils.cleanupWithLogger(LOG, journalSet);
                }
                terminate(1, msg);
              }
            } finally {
              // Prevent RuntimeException from blocking other log edit write 
              doneWithAutoSyncScheduling();
            }
            //editLogStream may become null,
            //so store a local variable for flush.
            logStream = editLogStream;
          }
          
          // do the sync
          // This part runs without the lock: it works on the buffer being flushed
          // to disk and does not affect the buffer that writers append to
          long start = monotonicNow();
          try {
            if (logStream != null) {
              // Flush the buffered edits to disk
              logStream.flush();
            }
          } catch (IOException ex) {
            synchronized (this) {
              final String msg =
                  "Could not sync enough journals to persistent storage. "
                  + "Unsynced transactions: " + (txid - synctxid);
              LOG.error(msg, new Exception());
              synchronized(journalSetLock) {
                IOUtils.cleanupWithLogger(LOG, journalSet);
              }
              terminate(1, msg);
            }
          }
          long elapsed = monotonicNow() - start;
      
          if (metrics != null) { // Metrics non-null only when used inside name node
            metrics.addSync(elapsed);
            metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
            numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
          }
          
        } finally {
          // Prevent RuntimeException from blocking other log edit sync 
          synchronized (this) {
            if (sync) {
              // If this thread performed the sync, update synctxid (stores the last synced transactionId)
              synctxid = syncStart;
              for (JournalManager jm : journalSet.getJournalManagers()) {
                /**
                 * {@link FileJournalManager#lastReadableTxId} is only meaningful
                 * for file-based journals. Therefore the interface is not added to
                 * other types of {@link JournalManager}.
                 */
                if (jm instanceof FileJournalManager) {
                  ((FileJournalManager)jm).setLastReadableTxId(syncStart);
                }
              }
              isSyncRunning = false;
            }
            this.notifyAll();
         }
        }
      }
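    The batching arithmetic in the listing, editsBatchedInSync = txid - synctxid - 1, is easiest to read with concrete numbers. The small example below mirrors only that expression; the class BatchCountExample and the sample txid values are made up for illustration.

```java
// Worked example of the batched-edit count; the values are illustrative only.
class BatchCountExample {
    // Same expression as in the listing: edits flushed in addition to the caller's own.
    static long editsBatchedInSync(long txid, long synctxid) {
        return txid - synctxid - 1;
    }

    public static void main(String[] args) {
        // synctxid = 6, txid = 7: only txid 7 is flushed, so nothing was batched.
        System.out.println(editsBatchedInSync(7, 6));  // prints 0
        // synctxid = 6, txid = 10: txids 7 to 10 are flushed together, 3 of them batched.
        System.out.println(editsBatchedInSync(10, 6)); // prints 3
    }
}
```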
    

    Article link: https://www.haomeiwen.com/subject/dbmdsktx.html