分享一波大数据&Java的学习视频和书籍:
### Java与大数据资源分享
namenode启动后加载完fsimage和editlog文件后,新的修改元数据的操作会被记录到editlog文件中,本文就来看看写editlog文件相关的逻辑。
首先来看logEdit方法,传入的参数是FSEditLogOp类型的,代表一种操作类型,例如:DeleteOp、AddBlockOp、MkdirOp等等,这个跟RPC操作大致是对应的。
/**
* Write an operation to the edit log.
* <p/>
* Additionally, this will sync the edit log if required by the underlying
* edit stream's automatic sync policy (e.g. when the buffer is full, or
* if a time interval has elapsed).
*首先看doc描述,此方法功能是向edit log中写一个operation信息。
*另外,如果底层的eidt stream的自动同步策略要求同步edit log的话,这个方法
*还会进行sync edit log
*/
void logEdit(final FSEditLogOp op) {
boolean needsSync = false;
synchronized (this) {
//判断editlog的状态是否openForWrite
assert isOpenForWrite() :
"bad state: " + state;
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
// check if it is time to schedule an automatic sync
needsSync = doEditTransaction(op);
if (needsSync) {
//唯一一处给isAutoSyncScheduled赋值为true的地方,
//赋值为true之后上面的waitIfAutoSyncScheduled()方法内就要调用wait方法了。
isAutoSyncScheduled = true;
}
}
// Sync the log if an automatic sync is required.
if (needsSync) {
//进行一次log sync
logSync();
}
}
再细节过一下logEdit方法逻辑:
初始化needsSync为false;如果isAutoSyncScheduled为true,说明edit log的autoSync已经被调度了,则需要调用waitIfAutoSyncScheduled方法等待。
waitIfAutoSyncScheduled方法代码如下:
![](https://img.haomeiwen.com/i5679451/abcad34c1ea7d21b.png)
接着更新needSync变量,更新值是doEditTransaction方法的返回值。看doEditTransaction方法:
![](https://img.haomeiwen.com/i5679451/3f08107b6bc49cb3.png)
可以看到doEditTransaction方法中开启事务和结束事务中间做了把op写到editLog文件流的操作。也即调用了editLogStream.write(op)。write方法就不继续看了,就是往outputStream里写一些数据。最后return shouldForceSync(),这个方法的返回值会赋值给needSync变量,进而影响isAutoSyncScheduled的值。shouldForceSync()就是判断当前流的buffer size是不是大于初始的默认buffer大小,如果是就是返回true,表示缓冲区中数据够多了,可以sync了。
回到主线如果needSync被赋值为true,那么就更新isAutoSyncScheduled=true,然后调用logSync。因为后面我们调用了logSync方法,所以需要更新isAutoSyncScheduled变量。
最后看一下logSync()方法,这个方法很重要。介绍这个方法之前我们了解一些前置知识:双缓冲区和logSync的同步策略。我们知道运行时,有多个线程会同时写editlog文件。而HDFS为了提高并发度,并没粗粒度的加锁同步,而是使用了双缓冲区和自己的同步策略。
首先来看双缓冲区,双缓冲区是由类EditsDoubleBuffer实现的。
![](https://img.haomeiwen.com/i5679451/8336ef59ff76ca97.png)
注释写的很清楚,bufCurrent是当前用来写的buffer,bufReady是用来flush到磁盘的buffer。使用双buffer的好处是,在进行同步到磁盘的操作时,不影响其他线程继续写到bufCurrent。双buffer的思想有点像JVM年轻代中的survivor 0区和survivor 1区的设计思想。不过需要注意的是,在更换缓冲区角色时,是要加锁的。
聊完双buffer,接着说一下logSync方法使用的同步策略吧,有三点
- 所有操作同步写入到内存中的EditLogOutputStream时,会被分配一个唯一的txid。
- 当一个线程要同步流中的内容到磁盘时,logSync会使用ThreadLocal类型的变量myTransactionId获取当前线程需要同步的txid。如果当前线程需要同步的txid大于已经同步完成的txid(editlog中的txid,利用synctxid变量记录),则说明当前线程的内容是新的,可以进行同步到磁盘操作。如果当前线程需要同步的txid小于已经同步完成的txid,说明当前线程需要同步的内容已经被同步过了,所以直接跳了(return)
- logSync中使用isSyncRunning表明当前是否有线程正在进行同步操作刷盘。isSyncRunning是个volatile boolean类型的变量。如果当前线程准备同步内容到磁盘时,发现这个变量为true时,就会wait。
介绍完前置内容,下面进入logSync源码的介绍部分:
/**
* Sync all modifications done by this thread.
*
* The internal concurrency design of this class is as follows:
* - Log items are written synchronized into an in-memory buffer,
* and each assigned a transaction ID.
* - When a thread (client) would like to sync all of its edits, logSync()
* uses a ThreadLocal transaction ID to determine what edit number must
* be synced to.
* - The isSyncRunning volatile boolean tracks whether a sync is currently
* under progress.
*
* The data is double-buffered within each edit log implementation so that
* in-memory writing can occur in parallel with the on-disk writing.
*
* Each sync occurs in three steps:
* 1. synchronized, it swaps the double buffer and sets the isSyncRunning
* flag.
* 2. unsynchronized, it flushes the data to storage
* 3. synchronized, it resets the flag and notifies anyone waiting on the
* sync.
*
* The lack of synchronization on step 2 allows other threads to continue
* to write into the memory buffer while the sync is in progress.
* Because this step is unsynchronized, actions that need to avoid
* concurrency with sync() should be synchronized and also call
* waitForSyncToFinish() before assuming they are running alone.
*/
//建议认真看下英文Doc,关于同步策略的描述
public void logSync() {
// Fetch the transactionId of this thread.
// 传入当前线程ThreadLocal变量myTransactionId对象里面的txid字段。
logSync(myTransactionId.get().txid);
}
追到带参数的logSync方法里:
protected void logSync(long mytxid) {
long syncStart = 0;
boolean sync = false;
//用于记录这次同步到磁盘多少条txid,会被metrics记录。
long editsBatchedInSync = 0;
try {
EditLogOutputStream logStream = null;
synchronized (this) {
try {
printStatistics(false);
// if somebody is already syncing, then wait
while (mytxid > synctxid && isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {
}
}
//
// If this transaction was already flushed, then nothing to do
//
if (mytxid <= synctxid) {
return;
}
// now, this thread will do the sync. track if other edits were
// included in the sync - ie. batched. if this is the only edit
// synced then the batched count is 0
editsBatchedInSync = txid - synctxid - 1;
//记录当前最高的txid
syncStart = txid;
//更新isSyncRunning为true,然后下面就开始互换双buffer
isSyncRunning = true;
sync = true;
// swap buffers
try {
if (journalSet.isEmpty()) {
throw new IOException("No journals available to flush");
}
//将两个buffer互换,此时还在synchronized里
editLogStream.setReadyToFlush();
} catch (IOException e) {
final String msg =
"Could not sync enough journals to persistent storage " +
"due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid);
LOG.error(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
}
terminate(1, msg);
}
} finally {
// Prevent RuntimeException from blocking other log edit write
doneWithAutoSyncScheduling();
}
//editLogStream may become null,
//so store a local variable for flush.
logStream = editLogStream;
}
// do the sync
//这部分代码没加锁,因为操作的是写磁盘buffer,不影响写内存的buffer
long start = monotonicNow();
try {
if (logStream != null) {
//把内存中的editlog刷盘
logStream.flush();
}
} catch (IOException ex) {
synchronized (this) {
final String msg =
"Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid);
LOG.error(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
}
terminate(1, msg);
}
}
long elapsed = monotonicNow() - start;
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
}
} finally {
// Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
if (sync) {
//如果同步成功,更新synctxid(stores the last synced transactionId)
synctxid = syncStart;
for (JournalManager jm : journalSet.getJournalManagers()) {
/**
* {@link FileJournalManager#lastReadableTxId} is only meaningful
* for file-based journals. Therefore the interface is not added to
* other types of {@link JournalManager}.
*/
if (jm instanceof FileJournalManager) {
((FileJournalManager)jm).setLastReadableTxId(syncStart);
}
}
isSyncRunning = false;
}
this.notifyAll();
}
}
}
<--END-->
毁灭吧,赶紧的,累了。
网友评论