Today let's take a quick walk through the logic behind HDFS's delete RPC and see how the delete operation is implemented.
This post does not attempt a full end-to-end summary of the flow; that is left for later, once the understanding is deeper.
First, the delete implementation in the NameNodeRpcServer class: org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer#delete
```java
@Override // ClientProtocol
public boolean delete(String src, boolean recursive) throws IOException {
  checkNNStartup(); // check that the NameNode has started up
  if (stateChangeLog.isDebugEnabled()) {
    stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
        + ", recursive=" + recursive);
  }
  namesystem.checkOperation(OperationCategory.WRITE); // a Standby node cannot serve write operations
  // The RetryCache caches already-processed requests (uniquely identified by
  // client id + call id) so that retried RPCs can be answered without re-executing.
  CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
  if (cacheEntry != null && cacheEntry.isSuccess()) {
    return true; // Return previous response
  }
  boolean ret = false;
  try {
    // delegate to FSNamesystem#delete
    ret = namesystem.delete(src, recursive, cacheEntry != null);
  } finally {
    // record the result in the RetryCache
    RetryCache.setState(cacheEntry, ret);
  }
  // update the delete-ops metric
  if (ret)
    metrics.incrDeleteFileOps();
  return ret;
}
```
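For context, this server-side RPC is what an ordinary client call ends up in. Below is a minimal client-side sketch; the path and the assumption that `fs.defaultFS` points at a reachable HDFS cluster are mine for illustration, not from the original post:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();      // picks up core-site.xml / hdfs-site.xml
    try (FileSystem fs = FileSystem.get(conf)) {
      // recursive = true deletes the directory and everything under it;
      // this call reaches the NameNode as the ClientProtocol#delete RPC shown above.
      boolean deleted = fs.delete(new Path("/tmp/delete-demo"), true);
      System.out.println("deleted = " + deleted);
    }
  }
}
```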
Next, the FSNamesystem#delete method:
```java
/**
 * Remove the indicated file from namespace.
 *
 * @see ClientProtocol#delete(String, boolean) for detailed description and
 * description of exceptions
 */
boolean delete(String src, boolean recursive, boolean logRetryCache)
    throws IOException {
  final String operationName = "delete";
  BlocksMapUpdateInfo toRemovedBlocks = null;
  checkOperation(OperationCategory.WRITE);
  final FSPermissionChecker pc = getPermissionChecker();
  writeLock();
  boolean ret = false;
  try {
    checkOperation(OperationCategory.WRITE);
    checkNameNodeSafeMode("Cannot delete " + src);
    // collect the blocks that will need to be removed
    toRemovedBlocks = FSDirDeleteOp.delete(
        this, pc, src, recursive, logRetryCache);
    ret = toRemovedBlocks != null;
  } catch (AccessControlException e) {
    logAuditEvent(false, operationName, src);
    throw e;
  } finally {
    writeUnlock(operationName);
  }
  getEditLog().logSync();
  if (toRemovedBlocks != null) {
    // this is where the blocks actually get removed
    removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
  }
  logAuditEvent(true, operationName, src); // write the audit log entry
  return ret;
}
```
Let's continue into the removeBlocks method:
```java
/**
 * From the given list, incrementally remove the blocks from blockManager
 * Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
 * ensure that other waiters on the lock can get in. See HDFS-2938
 *
 * @param blocks
 *          An instance of {@link BlocksMapUpdateInfo} which contains a list
 *          of blocks that need to be removed from blocksMap
 */
void removeBlocks(BlocksMapUpdateInfo blocks) {
  // get the list of blocks to delete
  List<BlockInfo> toDeleteList = blocks.getToDeleteList();
  Iterator<BlockInfo> iter = toDeleteList.iterator();
  // iterate over the list, removing blockDeletionIncrement blocks per write-lock acquisition
  while (iter.hasNext()) {
    writeLock();
    try {
      for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) {
        blockManager.removeBlock(iter.next());
      }
    } finally {
      writeUnlock("removeBlocks");
    }
  }
}
```
As the Javadoc says, the write lock is released and reacquired after every BLOCK_DELETION_INCREMENT blocks, so that other threads waiting on the lock get a chance to run (see HDFS-2938).
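That batching-under-the-lock idea is worth calling out on its own. Here is a minimal, self-contained sketch of the same pattern using a plain ReentrantLock and made-up names, not HDFS code:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class ChunkedRemoval {
  // analogous to blockDeletionIncrement: how many items to process per lock acquisition
  private static final int BATCH = 1000;
  private final ReentrantLock lock = new ReentrantLock();

  void removeAll(List<String> items) {
    Iterator<String> iter = items.iterator();
    while (iter.hasNext()) {
      lock.lock();                               // reacquire the lock for each batch
      try {
        for (int i = 0; i < BATCH && iter.hasNext(); i++) {
          remove(iter.next());                   // per-item work done under the lock
        }
      } finally {
        lock.unlock();                           // release between batches so other waiters get in
      }
    }
  }

  private void remove(String item) { /* actual removal work */ }
}
```

The point is simply that deleting a huge directory does not hold the namesystem write lock for the entire duration of the block removal.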
That takes us into the BlockManager#removeBlock method:
public void removeBlock(BlockInfo block) {
assert namesystem.hasWriteLock();
// No need to ACK blocks that are being removed entirely
// from the namespace, since the removal of the associated
// file already removes them from the block map below.
block.setNumBytes(BlockCommand.NO_ACK);
addToInvalidates(block);
removeBlockFromMap(block);
// Remove the block from pendingReconstruction and neededReconstruction
// pendingReconstruction是已经生成复制指令,待发送给DN的block队列
// neededReconstruction是准备生成复制指令的block队列。
PendingBlockInfo remove = pendingReconstruction.remove(block);
if (remove != null) {
DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
.toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
}
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
// 从推迟处理的block队列里删除块信息
postponedMisreplicatedBlocks.remove(block);
}
This method mainly adds the block to the invalidation queue (invalidateBlocks), removes it from the blocks map, and removes any entries for it from the pending and needed reconstruction queues, so that no pointless replication work is done for a block that is going away.
Next, the BlockManager#addToInvalidates method:
```java
/**
 * Adds block to list of blocks which will be invalidated on all its
 * datanodes.
 */
private void addToInvalidates(BlockInfo storedBlock) {
  if (!isPopulatingReplQueues()) {
    return;
  }
  StringBuilder datanodes = blockLog.isDebugEnabled()
      ? new StringBuilder() : null;
  for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
    if (storage.getState() != State.NORMAL) {
      continue;
    }
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    final Block b = getBlockOnStorage(storedBlock, storage);
    if (b != null) {
      // add the (block, datanode) pair to the invalidation queue
      invalidateBlocks.add(b, node, false);
      if (datanodes != null) {
        datanodes.append(node).append(" ");
      }
    }
  }
  if (datanodes != null && datanodes.length() != 0) {
    blockLog.debug("BLOCK* addToInvalidates: {} {}", storedBlock, datanodes);
  }
}
```
Next we step into the org.apache.hadoop.hdfs.server.blockmanagement.InvalidateBlocks#add method.
It is a synchronized method.
Then we look at the putBlocksSet method:
It is very simple: it checks whether the block is an erasure-coded (EC) block or an ordinary one, and then puts it into the corresponding set; under the hood that ends up calling Map#put, since a HashSet is implemented on top of a map.
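The post does not reproduce those two method bodies, so here is a simplified, self-contained model of the behaviour just described. The names, types, and signatures are mine for illustration only, not the actual InvalidateBlocks source:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class InvalidateBlocksModel {
  // per-DataNode sets of block ids waiting to be invalidated (deleted) on that node
  private final Map<String, Set<Long>> nodeToBlocks = new HashMap<>();
  private final Map<String, Set<Long>> nodeToEcBlocks = new HashMap<>();

  // add() is synchronized because several RPC handler threads may enqueue deletions concurrently
  public synchronized void add(long blockId, boolean isStriped, String datanodeUuid) {
    putBlocksSet(blockId, isStriped, datanodeUuid);
  }

  // pick the set for EC (striped) blocks or for ordinary replicas, then put the block into it;
  // Set#add on a HashSet delegates to HashMap#put internally, as the post notes
  private void putBlocksSet(long blockId, boolean isStriped, String datanodeUuid) {
    Map<String, Set<Long>> target = isStriped ? nodeToEcBlocks : nodeToBlocks;
    target.computeIfAbsent(datanodeUuid, k -> new HashSet<>()).add(blockId);
  }
}
```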
Once a block has been added to the InvalidateBlocks queue, it simply waits for the periodic background thread to pick it up and send a delete command down to the DataNode.
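How that periodic side works is beyond the scope of this post, but conceptually it looks roughly like the sketch below (a self-contained illustration with made-up names, not the actual BlockManager code): a background task drains a bounded number of queued block ids per DataNode, and the chosen ids are then shipped to that DataNode as a delete command, which in HDFS rides back on the DataNode's heartbeat response.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class InvalidateMonitorSketch {
  // drain at most `limit` block ids from one DataNode's pending-invalidation set
  static List<Long> pollBatch(Set<Long> pending, int limit) {
    List<Long> batch = new ArrayList<>(limit);
    Iterator<Long> it = pending.iterator();
    while (it.hasNext() && batch.size() < limit) {
      batch.add(it.next());
      it.remove();                       // the block id leaves the invalidate queue here
    }
    return batch;
  }

  // one pass over all DataNodes that have pending deletions
  static void runOnce(Map<String, Set<Long>> perNodeQueues, int limit) {
    for (Map.Entry<String, Set<Long>> e : perNodeQueues.entrySet()) {
      List<Long> batch = pollBatch(e.getValue(), limit);
      if (!batch.isEmpty()) {
        // in real HDFS this batch would become a delete (invalidate) command for the DataNode
        System.out.println("delete command for " + e.getKey() + ": " + batch);
      }
    }
  }
}
```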