HDFS上块恢复机制

LordWar 2013-05-13

Hadoop的一致性模型:一次写,多次读。当一个文件被创建-写入-关闭之后,只能执行append操作。任意时刻只运行一个client对一个文件进行write,却可以允许多个client对一个文件进行append操作(类似于线程的并发)。lease是为了处理wirte操作的同步问题而使用的。
 hdfs支持write-once-read-many,也就是说不支持并行写,那么对读写的互斥同步就是靠Lease实现的。Lease是一个有时间约束的锁。客户端写文件时需要先申请一个Lease,对应到namenode中的LeaseManager,客户端的client name就作为一个lease的holder,即租约持有者。LeaseManager维护了文件的path与lease的对应关系,还有clientname->lease的对应关系。LeaseManager中有两个时间限制:softLimit and hardLimit。

 软限制就是写文件时规定的租约超时时间,硬限制则是考虑到文件close时未来得及释放lease的情况强制回收租约。
 软租约的超时检测则在DFSClient的LeaseChecker中进行。
 

当客户端(DFSClient)create一个文件的时候,会通过RPC 调用 namenode 的create方法来创建文件(在DFSOutputStream构造函数)。进而又调用FSNameSystem的startFile方法,又调用 LeaseManager 的addLease方法为新创建的文件添加一个lease。如果lease已存在,则更新该lease的lastUpdate (最近更新时间)值,并将该文件的path对应到该lease上。之后DFSClient 将该文件的path 添加 LeaseChecker中。文件创建成功后,守护线程LeaseChecker会每隔一定时间间隔调用namenode.renewLease(clientName)刷新NN上此DFSClient的租约。
 软限制超时之后,会跟硬限制超时一样,调用internalReleaseLease进行处理。


LeaseManager中还有一个Monitor线程来检测Lease是否超过hardLimit。租约硬限制(1小时)的情况下NameNode发生租约恢复的过程:
 LeaseManager的监视线程Monitor定时调用checkLeases检查当前租约列表中最早的文件有没租约过期,如果过期了就调用FSNamesystem.internalReleaseLease对此文件进行租约释放:
 首先找此文件的InodeFileUnderConstruction,然后找到此文件的最后一个块和此块所在的所有DN,然后调用assignPrimaryDatanode在这些DN中选出一个DN进行块恢复,将此块加入到选出的DN的recoverBlocks中。
 NN在接收到DN发过来的心跳时,会检查该DN的recoverBlocks上是否有需要进行恢复的块。有的话就给DN回送一个类型为DatanodeProtocol.DNA_RECOVERBLOCK的DatanodeCommand,并附上要<恢复的块,块所在的Datanode集>
 2、DN通过RPC机制调用DatanodeCommand[] cmds = namenode.sendHeartbeat(...)得到DatanodeCommand后,在processCommand里调用recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());对这些指令进行处理。
 如果是DNA_RECOVERBLOCK消息,就启动一个recoverBlocks线程处理块进行恢复。
 线程是通过调用recoverBlock方法对块进行恢复:
 首先查看此块是否在ongoingRecovery内,如果在表示已经有在对此块进行恢复,直接返回。否则将这个块插入到ongoingRecovery内。然后找出块所在的所有DN中,时间戳比此块新的DN,将这些DN放到syncList中。查找的过程中也记录最小的块的长度。如果不保存原块的长度(keepLength=false)就将要同步的块的长度设为这个最小的长度block.setNumBytes(minlength),然后调用syncBlock对syncList上的每个DN更新此块。

 private LocatedBlock recoverBlock(Block block, boolean keepLength,
      DatanodeID[] datanodeids, boolean closeFile) throws IOException {
    // If the block is already being recovered, then skip recovering it.
    // This can happen if the namenode and client start recovering the same
    // file at the same time.
    synchronized (ongoingRecovery) {
      Block tmp = new Block();
      tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
      if (ongoingRecovery.get(tmp) != null) {//此要恢复的块已经在ongoingRecovery上,即已经处于恢复过程
        String msg = "Block " + block + " is already being recovered, " +
                    " ignoring this request to recover it.";
        LOG.info(msg);
        throw new IOException(msg);
      }
      ongoingRecovery.put(block, block);//没在ongoingRecovery中,就put进去
    }
    try {
      List<BlockRecord> syncList = new ArrayList<BlockRecord>();//建立需要sync块的节点列表
      long minlength = Long.MAX_VALUE;
      int errorCount = 0;


      //check generation stamps
      for(DatanodeID id : datanodeids) {//遍历该块所在的DNs,找出
        try {
         //获取到DatanodeId的协议,这是一个D-D间的协议
          InterDatanodeProtocol datanode = dnRegistration.equals(id)/*如果此id就是本Datanode*/?
              this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
          BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);//获得该block在DN上的元数据信息
          if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {//这个块更新
            if (keepLength) {//块长度保持不变
              if (info.getNumBytes() == block.getNumBytes()) {
                syncList.add(new BlockRecord(id, datanode, new Block(info)));
              }
            }
            else {//块长度可变
              syncList.add(new BlockRecord(id, datanode, new Block(info)));//将这个块加到要同步的列表syncList中去
              if (info.getNumBytes() < minlength) {
                minlength = info.getNumBytes();//更新这些副本的最短长度
              }
            }
          }
        } catch (IOException e) {
          ++errorCount;
          InterDatanodeProtocol.LOG.warn(
              "Failed to getBlockMetaDataInfo for block (=" + block
              + ") from datanode (=" + id + ")", e);
        }
      }


      if (syncList.isEmpty() && errorCount > 0) {//这些DN上都没有需要更新的副本
        throw new IOException("All datanodes failed: block=" + block
            + ", datanodeids=" + Arrays.asList(datanodeids));
      }
      if (!keepLength) {
        block.setNumBytes(minlength);//块大小设为所有副本中最小的
      }
      return syncBlock(block, syncList, closeFile);//进行同步更新syncList上的DN
    } finally {
      synchronized (ongoingRecovery) {
        ongoingRecovery.remove(block);
      }
    }
  }