LordWar 2013-05-13
Hadoop的一致性模型:一次写,多次读。当一个文件被创建-写入-关闭之后,只能执行append操作。任意时刻只运行一个client对一个文件进行write,却可以允许多个client对一个文件进行append操作(类似于线程的并发)。lease是为了处理wirte操作的同步问题而使用的。
hdfs支持write-once-read-many,也就是说不支持并行写,那么对读写的互斥同步就是靠Lease实现的。Lease是一个有时间约束的锁。客户端写文件时需要先申请一个Lease,对应到namenode中的LeaseManager,客户端的client name就作为一个lease的holder,即租约持有者。LeaseManager维护了文件的path与lease的对应关系,还有clientname->lease的对应关系。LeaseManager中有两个时间限制:softLimit and hardLimit。
软限制就是写文件时规定的租约超时时间,硬限制则是考虑到文件close时未来得及释放lease的情况强制回收租约。
软租约的超时检测则在DFSClient的LeaseChecker中进行。
当客户端(DFSClient)create一个文件的时候,会通过RPC 调用 namenode 的create方法来创建文件(在DFSOutputStream构造函数)。进而又调用FSNameSystem的startFile方法,又调用 LeaseManager 的addLease方法为新创建的文件添加一个lease。如果lease已存在,则更新该lease的lastUpdate (最近更新时间)值,并将该文件的path对应到该lease上。之后DFSClient 将该文件的path 添加 LeaseChecker中。文件创建成功后,守护线程LeaseChecker会每隔一定时间间隔调用namenode.renewLease(clientName)刷新NN上此DFSClient的租约。
软限制超时之后,会跟硬限制超时一样,调用internalReleaseLease进行处理。
LeaseManager中还有一个Monitor线程来检测Lease是否超过hardLimit。租约硬限制(1小时)的情况下NameNode发生租约恢复的过程:
LeaseManager的监视线程Monitor定时调用checkLeases检查当前租约列表中最早的文件有没租约过期,如果过期了就调用FSNamesystem.internalReleaseLease对此文件进行租约释放:
首先找此文件的InodeFileUnderConstruction,然后找到此文件的最后一个块和此块所在的所有DN,然后调用assignPrimaryDatanode在这些DN中选出一个DN进行块恢复,将此块加入到选出的DN的recoverBlocks中。
NN在接收到DN发过来的心跳时,会检查该DN的recoverBlocks上是否有需要进行恢复的块。有的话就给DN回送一个类型为DatanodeProtocol.DNA_RECOVERBLOCK的DatanodeCommand,并附上要<恢复的块,块所在的Datanode集>
2、DN通过RPC机制调用DatanodeCommand[] cmds = namenode.sendHeartbeat(...)得到DatanodeCommand后,在processCommand里调用recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());对这些指令进行处理。
如果是DNA_RECOVERBLOCK消息,就启动一个recoverBlocks线程处理块进行恢复。
线程是通过调用recoverBlock方法对块进行恢复:
首先查看此块是否在ongoingRecovery内,如果在表示已经有在对此块进行恢复,直接返回。否则将这个块插入到ongoingRecovery内。然后找出块所在的所有DN中,时间戳比此块新的DN,将这些DN放到syncList中。查找的过程中也记录最小的块的长度。如果不保存原块的长度(keepLength=false)就将要同步的块的长度设为这个最小的长度block.setNumBytes(minlength),然后调用syncBlock对syncList上的每个DN更新此块。
private LocatedBlock recoverBlock(Block block, boolean keepLength,
DatanodeID[] datanodeids, boolean closeFile) throws IOException {
// If the block is already being recovered, then skip recovering it.
// This can happen if the namenode and client start recovering the same
// file at the same time.
synchronized (ongoingRecovery) {
Block tmp = new Block();
tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
if (ongoingRecovery.get(tmp) != null) {//此要恢复的块已经在ongoingRecovery上,即已经处于恢复过程
String msg = "Block " + block + " is already being recovered, " +
" ignoring this request to recover it.";
LOG.info(msg);
throw new IOException(msg);
}
ongoingRecovery.put(block, block);//没在ongoingRecovery中,就put进去
}
try {
List<BlockRecord> syncList = new ArrayList<BlockRecord>();//建立需要sync块的节点列表
long minlength = Long.MAX_VALUE;
int errorCount = 0;
//check generation stamps
for(DatanodeID id : datanodeids) {//遍历该块所在的DNs,找出
try {
//获取到DatanodeId的协议,这是一个D-D间的协议
InterDatanodeProtocol datanode = dnRegistration.equals(id)/*如果此id就是本Datanode*/?
this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);//获得该block在DN上的元数据信息
if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {//这个块更新
if (keepLength) {//块长度保持不变
if (info.getNumBytes() == block.getNumBytes()) {
syncList.add(new BlockRecord(id, datanode, new Block(info)));
}
}
else {//块长度可变
syncList.add(new BlockRecord(id, datanode, new Block(info)));//将这个块加到要同步的列表syncList中去
if (info.getNumBytes() < minlength) {
minlength = info.getNumBytes();//更新这些副本的最短长度
}
}
}
} catch (IOException e) {
++errorCount;
InterDatanodeProtocol.LOG.warn(
"Failed to getBlockMetaDataInfo for block (=" + block
+ ") from datanode (=" + id + ")", e);
}
}
if (syncList.isEmpty() && errorCount > 0) {//这些DN上都没有需要更新的副本
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(datanodeids));
}
if (!keepLength) {
block.setNumBytes(minlength);//块大小设为所有副本中最小的
}
return syncBlock(block, syncList, closeFile);//进行同步更新syncList上的DN
} finally {
synchronized (ongoingRecovery) {
ongoingRecovery.remove(block);
}
}
}