### Java and Big Data Resource Sharing
* Write an operation to the edit log.
* <p/>
* Additionally, this will sync the edit log if required by the underlying
* edit stream's automatic sync policy (e.g. when the buffer is full, or
* if a time interval has elapsed).
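* Start with the doc comment: this method writes an operation record to the edit log.
* In addition, if the automatic sync policy of the underlying edit stream calls for
* a sync of the edit log, this method also syncs the edit log.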
void logEdit(final FSEditLogOp op) {
  boolean needsSync = false;
  synchronized (this) {
    assert isOpenForWrite() :
      "bad state: " + state;
    // wait if an automatic sync is scheduled
    waitIfAutoSyncScheduled();
    // check if it is time to schedule an automatic sync
    needsSync = doEditTransaction(op);
    if (needsSync) {
      isAutoSyncScheduled = true;
    }
  }
  // Sync the log if an automatic sync is required.
  if (needsSync) {
    // perform one log sync
    logSync();
  }
}
needsSync is initialized to false. If isAutoSyncScheduled is true, an automatic sync of the edit log has already been scheduled, so the method calls waitIfAutoSyncScheduled and waits for it.
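Between beginning and ending the transaction, doEditTransaction writes op to the edit log stream, that is, it calls editLogStream.write(op). We will not follow write any further; it simply writes some data to the output stream. Finally the method returns shouldForceSync(), and that return value is assigned to needsSync, which in turn determines the value of isAutoSyncScheduled. shouldForceSync() checks whether the buffer of the current stream has grown to the initial default buffer size; if it has, it returns true, meaning the buffer holds enough data and it is time to sync.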
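The decision described above can be sketched in isolation. This is a minimal sketch, not the Hadoop source: the class `AutoSyncSketch`, the `bufferedBytes` counter, the `syncCount` field, the 1000 ms wait timeout, and the stand-in body of `logSync` are all assumptions made for illustration. Only the method and flag names (`logEdit`, `doEditTransaction`, `shouldForceSync`, `isAutoSyncScheduled`) come from the text.

```java
/**
 * Hypothetical sketch of the auto-sync decision. Names follow the text above,
 * but every body here is a simplified stand-in, not the Hadoop source.
 */
class AutoSyncSketch {
  private final int initBufferSize;   // assumed threshold, in bytes
  private int bufferedBytes = 0;      // stand-in for the size of the current buffer
  private boolean isAutoSyncScheduled = false;
  int syncCount = 0;                  // exposed only so the demo can observe syncs

  AutoSyncSketch(int initBufferSize) {
    this.initBufferSize = initBufferSize;
  }

  /** True once the buffered data has reached the initial buffer size. */
  private boolean shouldForceSync() {
    return bufferedBytes >= initBufferSize;
  }

  /** "Writes" the op into the buffer, then reports whether a sync is due. */
  private boolean doEditTransaction(byte[] op) {
    bufferedBytes += op.length;
    return shouldForceSync();
  }

  void logEdit(byte[] op) {
    boolean needsSync;
    synchronized (this) {
      // Block while a previously scheduled automatic sync is still pending.
      while (isAutoSyncScheduled) {
        try {
          wait(1000); // timeout value is an assumption of this sketch
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting for auto sync", ie);
        }
      }
      needsSync = doEditTransaction(op);
      if (needsSync) {
        isAutoSyncScheduled = true;
      }
    }
    // The sync itself runs outside the lock taken above.
    if (needsSync) {
      logSync();
    }
  }

  /** Stand-in for the real sync: drain the buffer and clear the scheduling flag. */
  private synchronized void logSync() {
    bufferedBytes = 0;
    syncCount++;
    isAutoSyncScheduled = false;
    notifyAll();
  }

  public static void main(String[] args) {
    AutoSyncSketch log = new AutoSyncSketch(8);
    log.logEdit(new byte[3]);
    log.logEdit(new byte[3]);
    System.out.println("syncs after 6 bytes: " + log.syncCount);
    log.logEdit(new byte[3]);
    System.out.println("syncs after 9 bytes: " + log.syncCount);
  }
}
```

With a threshold of 8 bytes, the first two 3-byte edits only fill the buffer, and the third pushes it past the threshold and triggers the sync.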
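The comments make this clear: bufCurrent is the buffer currently being written to, and bufReady is the buffer being flushed to disk. The benefit of double buffering is that a sync to disk does not stop other threads from continuing to write into bufCurrent. The idea is somewhat similar to the survivor 0 and survivor 1 spaces in the JVM young generation. Note, however, that swapping the roles of the two buffers must be done while holding a lock.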
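The double-buffer idea can be illustrated with a small, self-contained sketch. It is not the Hadoop implementation: the class `DoubleBufferSketch` and the methods `write`, `setReadyToFlush`, and `drainReady` are hypothetical names chosen for this example, and returning a byte array stands in for the actual write to disk. The sketch also assumes the caller lets only one thread at a time perform the swap-then-drain sequence.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical, simplified double buffer; bodies are illustrative stand-ins. */
class DoubleBufferSketch {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream(); // receives new writes
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();   // waiting to be flushed

  /** Append data to the buffer that is currently open for writing. */
  synchronized void write(byte[] data) {
    bufCurrent.write(data, 0, data.length);
  }

  /** Swap the roles of the two buffers. Done under the lock, as the text requires. */
  synchronized void setReadyToFlush() {
    if (bufReady.size() != 0) {
      throw new IllegalStateException("previous data has not been flushed yet");
    }
    ByteArrayOutputStream tmp = bufReady;
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

  /**
   * Hand back the contents of bufReady and empty it. The copy happens outside
   * the lock, so writers can keep appending to bufCurrent in the meantime.
   */
  byte[] drainReady() {
    ByteArrayOutputStream ready;
    synchronized (this) {
      ready = bufReady;
    }
    byte[] out = ready.toByteArray(); // stand-in for the slow write to disk
    ready.reset();
    return out;
  }

  public static void main(String[] args) {
    DoubleBufferSketch buf = new DoubleBufferSketch();
    buf.write("op1;".getBytes());
    buf.setReadyToFlush();
    buf.write("op2;".getBytes()); // lands in the new bufCurrent
    System.out.println("flushed: " + new String(buf.drainReady()));
  }
}
```

Only the swap needs the lock. The expensive part, draining bufReady, touches a buffer that writers no longer see, which is why it can run in parallel with new writes.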
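- Every operation is written, under synchronization, into the in-memory EditLogOutputStream and is assigned a unique txid.
- When a thread wants to sync the contents of the stream to disk, logSync uses the ThreadLocal variable myTransactionId to obtain the txid that the current thread needs synced. If that txid is greater than the txid that has already been synced (the edit log's txid, tracked in the synctxid variable), the thread's edits are new and the sync to disk can proceed. If that txid is less than or equal to the already-synced txid, the thread's edits have already been synced, so the method simply returns.
- logSync uses isSyncRunning to indicate whether some thread is currently flushing to disk. isSyncRunning is a volatile boolean. If a thread that is about to sync finds this variable set to true, it waits.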
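The three points above can be put together in a compact sketch. It is an illustration under stated assumptions, not the Hadoop code: the class `TxidSyncSketch`, the `disk` list standing in for persistent storage, the `flushCount` counter, the string-typed operations, and the 1000 ms wait timeout are all invented for this example. The names `txid`, `synctxid`, `isSyncRunning`, and `myTransactionId` follow the text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of txid-based batching; not the Hadoop implementation. */
class TxidSyncSketch {
  private long txid = 0;      // last transaction id handed out
  private long synctxid = 0;  // last transaction id known to be flushed
  private volatile boolean isSyncRunning = false;
  private final ThreadLocal<Long> myTransactionId = ThreadLocal.withInitial(() -> 0L);
  private List<String> bufCurrent = new ArrayList<>();
  private List<String> bufReady = new ArrayList<>();
  final List<String> disk = new ArrayList<>(); // stand-in for persistent storage
  int flushCount = 0;                          // exposed only for the demo

  /** Buffer one operation and remember its txid for the calling thread. */
  synchronized void logEdit(String op) {
    txid++;
    myTransactionId.set(txid);
    bufCurrent.add(op);
  }

  void logSync() {
    long mytxid = myTransactionId.get();
    long syncStart;
    List<String> toFlush;
    synchronized (this) {
      // If somebody is already syncing, wait for that sync to finish.
      while (mytxid > synctxid && isSyncRunning) {
        try {
          wait(1000); // timeout value is an assumption of this sketch
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting for sync", ie);
        }
      }
      // Another thread's batch already covered this transaction.
      if (mytxid <= synctxid) {
        return;
      }
      // Step 1 (synchronized): record the batch end, set the flag, swap buffers.
      syncStart = txid;
      isSyncRunning = true;
      toFlush = bufCurrent;
      bufCurrent = bufReady;
      bufReady = toFlush;
    }
    // Step 2 (unsynchronized): flush; writers may keep filling bufCurrent.
    disk.addAll(toFlush);
    toFlush.clear();
    flushCount++;
    // Step 3 (synchronized): publish the new synctxid and wake up waiters.
    synchronized (this) {
      synctxid = syncStart;
      isSyncRunning = false;
      notifyAll();
    }
  }

  public static void main(String[] args) {
    TxidSyncSketch log = new TxidSyncSketch();
    log.logEdit("mkdir /a");
    log.logEdit("mkdir /b");
    log.logSync(); // flushes both edits in one batch
    log.logSync(); // nothing new for this thread, returns immediately
    System.out.println(log.disk + " in " + log.flushCount + " flush(es)");
  }
}
```

Because synctxid is set to the highest txid present at swap time, one flush can cover edits from many threads, and those threads skip their own flush once they see that their txid is no longer greater than synctxid.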
* Sync all modifications done by this thread.
* The internal concurrency design of this class is as follows:
* - Log items are written synchronized into an in-memory buffer,
* and each assigned a transaction ID.
* - When a thread (client) would like to sync all of its edits, logSync()
* uses a ThreadLocal transaction ID to determine what edit number must
* be synced to.
* - The isSyncRunning volatile boolean tracks whether a sync is currently
* under progress.
* The data is double-buffered within each edit log implementation so that
* in-memory writing can occur in parallel with the on-disk writing.
* Each sync occurs in three steps:
* 1. synchronized, it swaps the double buffer and sets the isSyncRunning
* flag.
* 2. unsynchronized, it flushes the data to storage
* 3. synchronized, it resets the flag and notifies anyone waiting on the
* sync.
* The lack of synchronization on step 2 allows other threads to continue
* to write into the memory buffer while the sync is in progress.
* Because this step is unsynchronized, actions that need to avoid
* concurrency with sync() should be synchronized and also call
* waitForSyncToFinish() before assuming they are running alone.
public void logSync() {
  // Fetch the transactionId of this thread.
  // Pass in the txid field of the object held by the current thread's ThreadLocal variable myTransactionId.
  logSync(myTransactionId.get().txid);
}
protected void logSync(long mytxid) {
long syncStart = 0;
boolean sync = false;
long editsBatchedInSync = 0;
try {
EditLogOutputStream logStream = null;
synchronized (this) {
try {
// if somebody is already syncing, then wait
while (mytxid > synctxid && isSyncRunning) {
try {
} catch (InterruptedException ie) {
// If this transaction was already flushed, then nothing to do
if (mytxid <= synctxid) {
// now, this thread will do the sync. track if other edits were
// included in the sync - ie. batched. if this is the only edit
// synced then the batched count is 0
editsBatchedInSync = txid - synctxid - 1;
syncStart = txid;
isSyncRunning = true;
sync = true;
// swap buffers
try {
if (journalSet.isEmpty()) {
throw new IOException("No journals available to flush");
} catch (IOException e) {
final String msg =
"Could not sync enough journals to persistent storage " +
"due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid);
LOG.error(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
terminate(1, msg);
} finally {
// Prevent RuntimeException from blocking other log edit write
//editLogStream may become null,
//so store a local variable for flush.
logStream = editLogStream;
// do the sync
long start = monotonicNow();
try {
if (logStream != null) {
} catch (IOException ex) {
synchronized (this) {
final String msg =
"Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid);
LOG.error(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
terminate(1, msg);
long elapsed = monotonicNow() - start;
if (metrics != null) { // Metrics non-null only when used inside name node
} finally {
// Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
if (sync) {
// If the sync succeeded, update synctxid (stores the last synced transactionId)
synctxid = syncStart;
for (JournalManager jm : journalSet.getJournalManagers()) {
* {@link FileJournalManager#lastReadableTxId} is only meaningful
* for file-based journals. Therefore the interface is not added to
* other types of {@link JournalManager}.
if (jm instanceof FileJournalManager) {
isSyncRunning = false;