HDFS Write: The DataStreamer Daemon Thread (Part 3)


Author: 古语1 | Published 2019-10-10 11:07

    I. Flow Diagram

    (Flow diagram image not reproduced here.)
    • The RPC protocol between the client and the namenode is ClientProtocol.
    • locateFollowingBlock issues an RPC to the namenode to allocate a new block and to return the datanodes that the block should be written to.
    • The client calls createBlockOutputStream to send a writeBlock request to the first datanode.
    • If a datanode in the pipeline hits a write error, a new datanode can be added once the replacement policy is satisfied, and the data already written to the surviving datanodes is copied to the new node via transfer. The client also calls updateBlockForPipeline to ask the namenode for a new generation stamp, so that datanodes can clean up the stale block replicas.
    • When all replicas are written successfully, for example three replicas written in order client -> dn1 -> dn2 -> dn3, dn3 sends an ack to dn2, which forwards it upstream until it reaches the client.
    • How the pipeline is built (for three replicas), as sketched in the example after this list:
      1. The client opens a socket to dn1, sends a writeBlock request, and synchronously reads dn1's reply from DataXceiver.replyOut.
      2. dn1 then forwards the writeBlock request to dn2 over its own socket and synchronously reads dn2's DataXceiver.replyOut reply.
      3. dn2 does the same with dn3, forwarding writeBlock and reading dn3's DataXceiver.replyOut reply.
      4. Finally, each datanode calls blockReceiver.receiveBlock to receive the packets flowing through the pipeline.
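
    To make this ordering concrete, here is a minimal sketch of the setup handshake. NameNodeRpc and DataNodeConn are hypothetical stand-ins for ClientProtocol and the DataTransferProtocol socket, not Hadoop classes:

        // A self-contained sketch of the pipeline setup order described above.
        import java.util.List;

        public class PipelineSetupSketch {

          /** Hypothetical stand-in for the ClientProtocol.addBlock RPC. */
          interface NameNodeRpc {
            List<String> addBlock(String src, String clientName); // returns target datanodes
          }

          /** Hypothetical stand-in for a DataTransferProtocol connection to one datanode. */
          interface DataNodeConn {
            void writeBlock(List<String> downstreamTargets); // Sender.writeBlock equivalent
            boolean readConnectAck();                        // BlockOpResponseProto equivalent
          }

          /**
           * Client side: ask the namenode for targets, then contact only the first datanode.
           * dn1 mirrors the request to dn2, dn2 to dn3, and the connect ack travels back
           * dn3 -> dn2 -> dn1 -> client before any data packet is sent.
           */
          static boolean setupPipeline(NameNodeRpc namenode, DataNodeConn firstDn, String src) {
            List<String> targets = namenode.addBlock(src, "sketch-client"); // locateFollowingBlock
            firstDn.writeBlock(targets.subList(1, targets.size()));         // createBlockOutputStream
            return firstDn.readConnectAck();                                // firstBadLink check
          }
        }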

    II. The DataStreamer Constructor

    1. Constructor definition

    This append constructor computes how much of the last checksum chunk is already in use. If the last chunk is only partially filled, appendChunk is set to true, setChecksumBufSize(freeInCksum) shrinks the checksum buffer, and computePacketChunkSize recalculates the packet layout so that the next packet carries exactly the one chunk that fills up the partial chunk. A worked example of this arithmetic follows the code.

        /**
         * Construct a data streamer for appending to the last partial block
         * @param lastBlock last block of the file to be appended
         * @param stat status of the file to be appended
         * @param bytesPerChecksum number of bytes per checksum
         * @throws IOException if error occurs
         */
        private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
            int bytesPerChecksum) throws IOException {
          isAppend = true;
          stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
          block = lastBlock.getBlock();
          bytesSent = block.getNumBytes();
          accessToken = lastBlock.getBlockToken();
          isLazyPersistFile = isLazyPersist(stat);
          long usedInLastBlock = stat.getLen() % blockSize;
          int freeInLastBlock = (int)(blockSize - usedInLastBlock);
    
          // calculate the amount of free space in the pre-existing 
          // last crc chunk
          // number of bytes already used in the last checksum chunk
          int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
          // number of bytes still free in the last checksum chunk
          int freeInCksum = bytesPerChecksum - usedInCksum;
    
          // if there is space in the last block, then we have to 
          // append to that block
          if (freeInLastBlock == blockSize) {
            throw new IOException("The last block for file " + 
                src + " is full.");
          }
          // if the last chunk is partially filled, set appendChunk = true
          if (usedInCksum > 0 && freeInCksum > 0) {
            // if there is space in the last partial chunk, then 
            // setup in such a way that the next packet will have only 
            // one chunk that fills up the partial chunk.
            //
            computePacketChunkSize(0, freeInCksum);
            setChecksumBufSize(freeInCksum);
            appendChunk = true;
          } else {
            // if the remaining space in the block is smaller than 
            // the expected size of a packet, then create 
            // smaller size packet.
            //
            computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), 
                bytesPerChecksum);
          }
    
          // setup pipeline to append to the last block XXX retries??
          setPipeline(lastBlock);
          errorIndex = -1;   // no errors yet.
          if (nodes.length < 1) {
            throw new IOException("Unable to retrieve blocks locations " +
                " for last block " + block +
                "of file " + src);
    
          }
        }
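
    As a concrete illustration of the partial-chunk arithmetic above (the block size, bytesPerChecksum, and file length below are made-up example values):

        // Hypothetical numbers to illustrate usedInCksum / freeInCksum in the constructor.
        public class AppendChunkMath {
          public static void main(String[] args) {
            long blockSize = 128L * 1024 * 1024;  // 134217728 bytes
            int bytesPerChecksum = 512;
            long fileLen = 1000;                  // length of the file being appended to

            long usedInLastBlock = fileLen % blockSize;            // 1000
            int freeInLastBlock = (int) (blockSize - usedInLastBlock);

            int usedInCksum = (int) (fileLen % bytesPerChecksum);  // 1000 % 512 = 488
            int freeInCksum = bytesPerChecksum - usedInCksum;      // 512 - 488 = 24

            // usedInCksum > 0 && freeInCksum > 0, so appendChunk would be true and the
            // next packet would carry a single 24-byte chunk that tops up the partial chunk.
            System.out.println("freeInLastBlock=" + freeInLastBlock
                + " usedInCksum=" + usedInCksum + " freeInCksum=" + freeInCksum);
          }
        }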
    
    2. The DataStreamer.run method
    • If dataQueue is empty and the half-socket-timeout has not elapsed, the thread waits for the client to enqueue data into dataQueue.
    • If dataQueue is still empty after the wait, an empty heartbeat packet is created (presumably to keep the connection to the downstream datanodes alive).
    • If dataQueue is not empty, the first packet is taken from the queue.
    • When the pipeline is in the initial PIPELINE_SETUP_CREATE state, nextBlockOutputStream is called to ask the namenode which datanodes to write to and to allocate a new block, which is persisted to the edit log.
    • initDataStreaming then starts the ResponseProcessor daemon thread, which handles the acks.
    • If the packet is the last one in the block (isLastPacketInBlock), the block is full; the streamer waits (in 1-second intervals) until all outstanding packets have been acked by the ResponseProcessor thread, then switches the pipeline state to PIPELINE_CLOSE to mark the block as finished.
    • If there is no error and the packet is not a heartbeat, it is moved from dataQueue to ackQueue and then written out with writeTo(blockStream). A distilled sketch of this two-queue handoff follows, before the full run() source.
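
    The sketch below is not Hadoop code; it only distills the dataQueue/ackQueue handoff that run() performs around each send:

        import java.util.LinkedList;

        // Writer thread enqueues packets; the streamer parks each packet on ackQueue
        // before sending it; the responder removes it once the pipeline ack arrives.
        public class TwoQueueSketch {
          private final LinkedList<byte[]> dataQueue = new LinkedList<>();
          private final LinkedList<byte[]> ackQueue = new LinkedList<>();

          // writer thread (the DFSOutputStream.write path)
          public void enqueue(byte[] packet) {
            synchronized (dataQueue) {
              dataQueue.addLast(packet);
              dataQueue.notifyAll();        // wake the streamer
            }
          }

          // streamer thread: take one packet, park it on ackQueue, then send it
          public byte[] nextToSend() throws InterruptedException {
            synchronized (dataQueue) {
              while (dataQueue.isEmpty()) {
                dataQueue.wait(1000);       // run() waits up to socketTimeout/2 here
              }
              byte[] one = dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
              return one;
            }
          }

          // responder thread: drop the packet once its ack arrives from the pipeline
          public void acked() {
            synchronized (dataQueue) {
              ackQueue.removeFirst();
              dataQueue.notifyAll();
            }
          }
        }
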
        /* DataStreamer
         * streamer thread is the only thread that opens streams to datanode, 
         * and closes them. Any error recovery is also done by this thread.
         */
        @Override
        public void run() {
          long lastPacket = Time.monotonicNow();
          TraceScope scope = NullScope.INSTANCE;
          while (!streamerClosed && dfsClient.clientRunning) {
            // if the Responder encountered an error, shutdown Responder
            if (hasError && response != null) {
              try {
                // the responder is also a daemon thread; it handles write responses (acks) from the datanodes
                response.close();
                response.join();
                response = null;
              } catch (InterruptedException  e) {
                DFSClient.LOG.warn("Caught exception ", e);
              }
            }
    
            DFSPacket one;
            try {
              // process datanode IO errors if any
              boolean doSleep = false;
              // a datanode write failed or a datanode is restarting
              if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
                // on error, packets are moved back from ackQueue to dataQueue;
                // setupPipelineForAppendOrRecovery then rebuilds the pipeline, removing failed datanodes from nodes[]
                doSleep = processDatanodeError();
              }
    
              synchronized (dataQueue) {
                // wait for a packet to be sent.
                // wait for the client to enqueue data
                // socketTimeout defaults to 60 seconds
                long now = Time.monotonicNow();
                while ((!streamerClosed && !hasError && dfsClient.clientRunning 
                    && dataQueue.size() == 0 && 
                    (stage != BlockConstructionStage.DATA_STREAMING || 
                     stage == BlockConstructionStage.DATA_STREAMING && 
                     now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
                  long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
                  timeout = timeout <= 0 ? 1000 : timeout;
                  timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                     timeout : 1000;
                  try {
                    dataQueue.wait(timeout);
                  } catch (InterruptedException  e) {
                    DFSClient.LOG.warn("Caught exception ", e);
                  }
                  doSleep = false;
                  now = Time.monotonicNow();
                }
                if (streamerClosed || hasError || !dfsClient.clientRunning) {
                  continue;
                }
                // get packet to be sent.
                // the queue is empty: send an empty heartbeat packet, presumably to keep the pipeline connection alive
                if (dataQueue.isEmpty()) {
                  one = createHeartbeatPacket();
                  assert one != null;
                } else {
                  // take the first packet from the queue
                  one = dataQueue.getFirst(); // regular data packet
                  long parents[] = one.getTraceParents();
                  if (parents.length > 0) {
                    scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
                    // TODO: use setParents API once it's available from HTrace 3.2
    //                scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
    //                scope.getSpan().setParents(parents);
                  }
                }
              }
    
              // get new block from namenode.
              if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
                if(DFSClient.LOG.isDebugEnabled()) {
                  DFSClient.LOG.debug("Allocating new block");
                }
                // nextBlockOutputStream asks the namenode for the target datanodes and a new block (persisted to the edit log);
                // it throws IOException if writing to the first datanode fails after all retries
                setPipeline(nextBlockOutputStream());
                // start the ResponseProcessor thread to drain ackQueue and move the pipeline to BlockConstructionStage.DATA_STREAMING
                initDataStreaming();
              } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
                if(DFSClient.LOG.isDebugEnabled()) {
                  DFSClient.LOG.debug("Append to block " + block);
                }
                setupPipelineForAppendOrRecovery();
                initDataStreaming();
              }
    
              long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
              if (lastByteOffsetInBlock > blockSize) {
                throw new IOException("BlockSize " + blockSize +
                    " is smaller than data size. " +
                    " Offset of packet in block " + 
                    lastByteOffsetInBlock +
                    " Aborting file " + src);
              }
              // last packet of the block: the block is full, wait for the responder to ack everything outstanding
              if (one.isLastPacketInBlock()) {
                // wait for all data packets have been successfully acked
                synchronized (dataQueue) {
                  // wait for all earlier packets to be acked before sending this one
                  while (!streamerClosed && !hasError && 
                      ackQueue.size() != 0 && dfsClient.clientRunning) {
                    try {
                      // wait for acks to arrive from datanodes
                      dataQueue.wait(1000);
                    } catch (InterruptedException  e) {
                      DFSClient.LOG.warn("Caught exception ", e);
                    }
                  }
                }
                if (streamerClosed || hasError || !dfsClient.clientRunning) {
                  continue;
                }
                // before sending the last packet, switch the pipeline state to close: this block is done
                stage = BlockConstructionStage.PIPELINE_CLOSE;
              }
              
              // send the packet
              Span span = null;
              // if this is not a heartbeat packet, move it from dataQueue to ackQueue
              synchronized (dataQueue) {
                // move packet from dataQueue to ackQueue
                if (!one.isHeartbeatPacket()) {
                  span = scope.detach();
                  one.setTraceSpan(span);
                  dataQueue.removeFirst();
                  ackQueue.addLast(one);
                  dataQueue.notifyAll();
                }
              }
    
              if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug("DataStreamer block " + block +
                    " sending packet " + one);
              }
    
              // write out data to remote datanode
              TraceScope writeScope = Trace.startSpan("writeTo", span);
              try {
                // write the packet to the socket stream
                one.writeTo(blockStream);
                blockStream.flush();   
              } catch (IOException e) {
                // HDFS-3398 treat primary DN is down since client is unable to 
                // write to primary DN. If a failed or restarting node has already
                // been recorded by the responder, the following call will have no 
                // effect. Pipeline recovery can handle only one node error at a
                // time. If the primary node fails again during the recovery, it
                // will be taken out then.
                tryMarkPrimaryDatanodeFailed();
                throw e;
              } finally {
                writeScope.close();
              }
              lastPacket = Time.monotonicNow();
              
              // update bytesSent
              long tmpBytesSent = one.getLastByteOffsetBlock();
              if (bytesSent < tmpBytesSent) {
                bytesSent = tmpBytesSent;
              }
    
              if (streamerClosed || hasError || !dfsClient.clientRunning) {
                continue;
              }
    
              // Is this block full?
              if (one.isLastPacketInBlock()) {
                // wait for the close packet has been acked
                synchronized (dataQueue) {
                  while (!streamerClosed && !hasError && 
                      ackQueue.size() != 0 && dfsClient.clientRunning) {
                    dataQueue.wait(1000);// wait for acks to arrive from datanodes
                  }
                }
                if (streamerClosed || hasError || !dfsClient.clientRunning) {
                  continue;
                }
                // the last packet has been acked: close this block's pipeline and reset the
                // stage to PIPELINE_SETUP_CREATE so a new block can be allocated
                endBlock();
              }
              if (progress != null) { progress.progress(); }
    
              // This is used by unit test to trigger race conditions.
              if (artificialSlowdown != 0 && dfsClient.clientRunning) {
                Thread.sleep(artificialSlowdown); 
              }
            } catch (Throwable e) {
              // Log warning if there was a real error.
              if (restartingNodeIndex.get() == -1) {
                DFSClient.LOG.warn("DataStreamer Exception", e);
              }
              if (e instanceof IOException) {
                // record the IO exception
                setLastException((IOException)e);
              } else {
                setLastException(new IOException("DataStreamer Exception: ",e));
              }
              hasError = true;
              if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
                // Not a datanode issue
                streamerClosed = true;
              }
            } finally {
              scope.close();
            }
          }
          closeInternal();
        }
    

    III. DataStreamer.nextBlockOutputStream and Related Methods

    1. The DataStreamer.nextBlockOutputStream method
    • Collects previously failed datanodes from the excludedNodes cache so they are skipped.
    • Calls locateFollowingBlock to RPC the namenode for the target datanodes and a new block.
    • Calls createBlockOutputStream to open an output stream to the first datanode of the pipeline and issue the block write request.
    • If createBlockOutputStream fails, the client RPCs abandonBlock to tell the namenode to give up this block, adds the failed datanode to the exclusion list, and retries; after three failed attempts an IOException is thrown. processDatanodeError later removes the failed datanode from the nodes[] array.
        /**
         * Open a DataOutputStream to a DataNode so that it can be written to.
         * This happens when a file is created and each time a new block is allocated.
         * Must get block ID and the IDs of the destinations from the namenode.
         * Returns the list of target datanodes.
         */
        private LocatedBlock nextBlockOutputStream() throws IOException {
          LocatedBlock lb = null;
          DatanodeInfo[] nodes = null;
          StorageType[] storageTypes = null;
          // dfs.client.block.write.retries defaults to 3: at most three retries to set up the write to the first datanode
          int count = dfsClient.getConf().nBlockWriteRetry;
          boolean success = false;
          ExtendedBlock oldBlock = block;
          do {
            hasError = false;
            lastException.set(null);
            errorIndex = -1;
            success = false;
            // datanodes to exclude from this allocation (previously failed ones)
            DatanodeInfo[] excluded =
                excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
                .keySet()
                .toArray(new DatanodeInfo[0]);
            block = oldBlock;
            // RPC the namenode to allocate a new block and get the target datanodes
            lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
            block = lb.getBlock();
            block.setNumBytes(0);
            bytesSent = 0;
            accessToken = lb.getBlockToken();
            nodes = lb.getLocations();
            storageTypes = lb.getStorageTypes();
    
            //
            // Connect to first DataNode in the list.
            // open an output stream to the first datanode of the pipeline and send the writeBlock request
            success = createBlockOutputStream(nodes, storageTypes, 0L, false);
            // on failure, abandon the block, exclude the bad datanode, and retry
            if (!success) {
              DFSClient.LOG.info("Abandoning " + block);
              dfsClient.namenode.abandonBlock(block, fileId, src,
                  dfsClient.clientName);
              block = null;
              DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
              excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
            }
          } while (!success && --count >= 0);
    
          if (!success) {
            throw new IOException("Unable to create new block.");
          }
          return lb;
        }
    
    2. The DataStreamer.locateFollowingBlock method

    This method issues a ClientProtocol RPC (addBlock) to the namenode to obtain the target datanodes and to persist the new block to the edit log. If the previously committed block has not yet reached the required replication, the call is retried, five times by default, with an exponentially growing back-off; the sketch below shows the schedule.
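
    A minimal sketch (not Hadoop code) of that back-off schedule, assuming the default of five retries:

        public class AddBlockBackoff {
          public static void main(String[] args) throws InterruptedException {
            int retries = 5;        // dfs.client.block.write.locateFollowingBlock.retries
            long sleeptime = 400L;  // initial back-off in milliseconds
            while (retries-- > 0) {
              System.out.println("retrying addBlock after " + sleeptime + " ms");
              Thread.sleep(sleeptime);
              sleeptime *= 2;       // 400, 800, 1600, 3200, 6400 ms, about 12.4 s in total
            }
          }
        }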

    
        private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)  throws IOException {
          // dfs.client.block.write.locateFollowingBlock.retries defaults to 5
          int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
          long sleeptime = 400;
          while (true) {
            long localstart = Time.monotonicNow();
            while (true) {
              try {
                // ClientProtocol RPC: ask the namenode for target datanodes and persist the new block to the edit log
                return dfsClient.namenode.addBlock(src, dfsClient.clientName,
                    block, excludedNodes, fileId, favoredNodes);
              } catch (RemoteException e) {
                IOException ue = 
                  e.unwrapRemoteException(FileNotFoundException.class,
                                          AccessControlException.class,
                                          NSQuotaExceededException.class,
                                          DSQuotaExceededException.class,
                                          UnresolvedPathException.class);
                if (ue != e) { 
                  throw ue; // no need to retry these exceptions
                }
                // the previously committed block has not reached its target replication yet; wait and retry
                if (NotReplicatedYetException.class.getName().
                    equals(e.getClassName())) {
                  if (retries == 0) { 
                    throw e;
                  } else {
                    --retries;
                    DFSClient.LOG.info("Exception while adding a block", e);
                    long elapsed = Time.monotonicNow() - localstart;
                    if (elapsed > 5000) {
                      DFSClient.LOG.info("Waiting for replication for "
                          + (elapsed / 1000) + " seconds");
                    }
                    try {
                      DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
                          + " retries left " + retries);
                      Thread.sleep(sleeptime);
                      sleeptime *= 2;
                    } catch (InterruptedException ie) {
                      DFSClient.LOG.warn("Caught exception ", ie);
                    }
                  }
                } else {
                  throw e;
                }
    
              }
            }
          } 
        }
    
        ExtendedBlock getBlock() {
          return block;
        }
    
        DatanodeInfo[] getNodes() {
          return nodes;
        }
    
        Token<BlockTokenIdentifier> getBlockToken() {
          return accessToken;
        }
    
        private void setLastException(IOException e) {
          lastException.compareAndSet(null, e);
        }
      }
    
    3. The DataStreamer.createBlockOutputStream method
    • Opens the output stream to the first datanode in the pipeline.
    • Sends the writeBlock request over the socket via Sender.
    • If a datanode reports that it is restarting, a restart flag is set and the caller either waits for the node or removes it.
    // connects to the first datanode in the pipeline
        // Returns true if success, otherwise return failure.
        //
        private boolean createBlockOutputStream(DatanodeInfo[] nodes,
            StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
          if (nodes.length == 0) {
            DFSClient.LOG.info("nodes are empty for write pipeline of block "
                + block);
            return false;
          }
          Status pipelineStatus = SUCCESS;
          String firstBadLink = "";
          boolean checkRestart = false;
          if (DFSClient.LOG.isDebugEnabled()) {
            for (int i = 0; i < nodes.length; i++) {
              DFSClient.LOG.debug("pipeline = " + nodes[i]);
            }
          }
    
          // persist blocks on namenode on next flush
          persistBlocks.set(true);
    
          int refetchEncryptionKey = 1;
          while (true) {
            boolean result = false;
            DataOutputStream out = null;
            try {
              assert null == s : "Previous socket unclosed";
              assert null == blockReplyStream : "Previous blockReplyStream unclosed";
              // open a socket to the first datanode
              s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
              // datanode write timeout: 8 minutes by default, plus 5 seconds per datanode in the pipeline
              long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
              
              OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
              InputStream unbufIn = NetUtils.getInputStream(s);
              IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
                unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
              unbufOut = saslStreams.out;
              unbufIn = saslStreams.in;
              out = new DataOutputStream(new BufferedOutputStream(unbufOut,
                  HdfsConstants.SMALL_BUFFER_SIZE));
              blockReplyStream = new DataInputStream(unbufIn);
      
              //
              // Xmit header info to datanode
              //
      
              BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
    
              // We cannot change the block length in 'block' as it counts the number
              // of bytes ack'ed.
              ExtendedBlock blockCopy = new ExtendedBlock(block);
              blockCopy.setNumBytes(blockSize);
              // pinning flags for the target datanodes
              boolean[] targetPinnings = getPinnings(nodes, true);
              // send the request
              // send the writeBlock request over the socket
              new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
                  dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
                  nodes.length, block.getNumBytes(), bytesSent, newGS,
                  checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
                (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
      
              // receive ack for connect
              // receive the connect ack from the downstream pipeline
              BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
                  PBHelper.vintPrefixed(blockReplyStream));
              // pipeline status from the connect ack
              pipelineStatus = resp.getStatus();
              firstBadLink = resp.getFirstBadLink();
              
              // Got an restart OOB ack.
              // If a node is already restarting, this status is not likely from
              // the same node. If it is from a different node, it is not
              // from the local datanode. Thus it is safe to treat this as a
              // regular node error.
              // if a datanode is restarting, throw an IOException; processDatanodeError later removes the failed node from nodes[]
              if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
                restartingNodeIndex.get() == -1) {
                checkRestart = true;
                throw new IOException("A datanode is restarting.");
              }
    
              String logInfo = "ack with firstBadLink as " + firstBadLink;
              DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
    
              assert null == blockStream : "Previous blockStream unclosed";
              blockStream = out;
              result =  true; // success
              restartingNodeIndex.set(-1);
              hasError = false;
            } catch (IOException ie) {
              if (restartingNodeIndex.get() == -1) {
                DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
              }
              if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
                DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
                    + "encryption key was invalid when connecting to "
                    + nodes[0] + " : " + ie);
                // The encryption key used is invalid.
                refetchEncryptionKey--;
                dfsClient.clearDataEncryptionKey();
                // Don't close the socket/exclude this node just yet. Try again with
                // a new encryption key.
                continue;
              }
      
              // find the datanode that matches
              if (firstBadLink.length() != 0) {
                for (int i = 0; i < nodes.length; i++) {
                  // NB: Unconditionally using the xfer addr w/o hostname
                  if (firstBadLink.equals(nodes[i].getXferAddr())) {
                    errorIndex = i;
                    break;
                  }
                }
              } else {
                assert checkRestart == false;
                errorIndex = 0;
              }
              // Check whether there is a restart worth waiting for.
              // check whether a datanode restart is worth waiting for
              if (checkRestart && shouldWaitForRestart(errorIndex)) {
                restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
                    Time.monotonicNow();
                restartingNodeIndex.set(errorIndex);
                errorIndex = -1;
                DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
                    nodes[restartingNodeIndex.get()]);
              }
              hasError = true;
              setLastException(ie);
              result =  false;  // error
            } finally {
              if (!result) {
                IOUtils.closeSocket(s);
                s = null;
                IOUtils.closeStream(out);
                out = null;
                IOUtils.closeStream(blockReplyStream);
                blockReplyStream = null;
              }
            }
            return result;
          }
        }
    

    IV. DataStreamer.processDatanodeError Error Handling

    1. The processDatanodeError method
    • Closes the streams and moves all packets from ackQueue back to the front of dataQueue.
    • If the pipeline has had to be recovered more than five times for the same packet, streamerClosed is set to true and recovery stops.
    
        // If this stream has encountered any errors so far, shutdown 
        // threads and mark stream as closed. Returns true if we should
        // sleep for a while after returning from this call.
        //
        private boolean processDatanodeError() throws IOException {
          if (response != null) {
            DFSClient.LOG.info("Error Recovery for " + block +
            " waiting for responder to exit. ");
            return true;
          }
          closeStream();
    
          // move packets from ack queue to front of the data queue
          synchronized (dataQueue) {
            dataQueue.addAll(0, ackQueue);
            ackQueue.clear();
          }
    
          // Record the new pipeline failure recovery.
          if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
             lastAckedSeqnoBeforeFailure = lastAckedSeqno;
             pipelineRecoveryCount = 1;
          } else {
            // If we had to recover the pipeline five times in a row for the
            // same packet, this client likely has corrupt data or corrupting
            // during transmission.
            if (++pipelineRecoveryCount > 5) {
              DFSClient.LOG.warn("Error recovering pipeline for writing " +
                  block + ". Already retried 5 times for the same packet.");
              lastException.set(new IOException("Failing write. Tried pipeline " +
                  "recovery 5 times without success."));
              streamerClosed = true;
              return false;
            }
          }
          // setupPipelineForAppendOrRecovery rebuilds the pipeline, removing failed datanodes from nodes[]
          boolean doSleep = setupPipelineForAppendOrRecovery();
          
          if (!streamerClosed && dfsClient.clientRunning) {
            if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
    
              // If we had an error while closing the pipeline, we go through a fast-path
              // where the BlockReceiver does not run. Instead, the DataNode just finalizes
              // the block immediately during the 'connect ack' process. So, we want to pull
              // the end-of-block packet from the dataQueue, since we don't actually have
              // a true pipeline to send it over.
              //
              // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
              // a client waiting on close() will be aware that the flush finished.
              synchronized (dataQueue) {
                DFSPacket endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
                Span span = endOfBlockPacket.getTraceSpan();
                if (span != null) {
                  // Close any trace span associated with this Packet
                  TraceScope scope = Trace.continueSpan(span);
                  scope.close();
                }
                assert endOfBlockPacket.isLastPacketInBlock();
                assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
                lastAckedSeqno = endOfBlockPacket.getSeqno();
                dataQueue.notifyAll();
              }
              endBlock();
            } else {
              initDataStreaming();
            }
          }
          
          return doSleep;
        }
    
    
    2. The setupPipelineForAppendOrRecovery method
    • Sets up the pipeline when a file is appended.
    • Recovers the pipeline after a write failure (the case discussed here), in three steps:
      1. Wait for a restarting datanode to come back up.
      2. Remove the failed datanode from the pipeline and, if the replacement policy is satisfied, call addDatanode2ExistingPipeline to add a new datanode.
      3. Ask the namenode for a new generation stamp so that the stale replicas on the failed datanode can be deleted.
    /**
         * Open a DataOutputStream to a DataNode pipeline so that 
         * it can be written to.
         * This happens when a file is appended or data streaming fails
         * It keeps on trying until a pipeline is setup
     * It rebuilds the pipeline state; any failed datanode is removed from nodes[].
         */
        private boolean setupPipelineForAppendOrRecovery() throws IOException {
          // check number of datanodes
          if (nodes == null || nodes.length == 0) {
            String msg = "Could not get block locations. " + "Source file \""
                + src + "\" - Aborting...";
            DFSClient.LOG.warn(msg);
            setLastException(new IOException(msg));
            streamerClosed = true;
            return false;
          }
          
          boolean success = false;
          long newGS = 0L;
          while (!success && !streamerClosed && dfsClient.clientRunning) {
            // Sleep before reconnect if a dn is restarting.
            // This process will be repeated until the deadline or the datanode
            // starts back up.
            if (restartingNodeIndex.get() >= 0) {
              // 4 seconds or the configured deadline period, whichever is shorter.
              // This is the retry interval and recovery will be retried in this
              // interval until timeout or success.
              long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
                  4000L);
              try {
                Thread.sleep(delay);
              } catch (InterruptedException ie) {
                lastException.set(new IOException("Interrupted while waiting for " +
                    "datanode to restart. " + nodes[restartingNodeIndex.get()]));
                streamerClosed = true;
                return false;
              }
            }
            boolean isRecovery = hasError;
            // remove bad datanode from list of datanodes.
            // If errorIndex was not set (i.e. appends), then do not remove 
            // any datanodes
            // 
            if (errorIndex >= 0) {
              StringBuilder pipelineMsg = new StringBuilder();
              for (int j = 0; j < nodes.length; j++) {
                pipelineMsg.append(nodes[j]);
                if (j < nodes.length - 1) {
                  pipelineMsg.append(", ");
                }
              }
              if (nodes.length <= 1) {
                lastException.set(new IOException("All datanodes " + pipelineMsg
                    + " are bad. Aborting..."));
                streamerClosed = true;
                return false;
              }
              DFSClient.LOG.warn("Error Recovery for block " + block +
                  " in pipeline " + pipelineMsg + 
                  ": bad datanode " + nodes[errorIndex]);
              failed.add(nodes[errorIndex]);
    
              DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
              //
              arraycopy(nodes, newnodes, errorIndex);
    
              final StorageType[] newStorageTypes = new StorageType[newnodes.length];
              arraycopy(storageTypes, newStorageTypes, errorIndex);
    
              final String[] newStorageIDs = new String[newnodes.length];
              arraycopy(storageIDs, newStorageIDs, errorIndex);
          // rebuild the pipeline without the failed datanode
              setPipeline(newnodes, newStorageTypes, newStorageIDs);
    
              // Just took care of a node error while waiting for a node restart
              if (restartingNodeIndex.get() >= 0) {
                // If the error came from a node further away than the restarting
                // node, the restart must have been complete.
                if (errorIndex > restartingNodeIndex.get()) {
                  restartingNodeIndex.set(-1);
                } else if (errorIndex < restartingNodeIndex.get()) {
                  // the node index has shifted.
                  restartingNodeIndex.decrementAndGet();
                } else {
                  // this shouldn't happen...
                  assert false;
                }
              }
    
              if (restartingNodeIndex.get() == -1) {
                hasError = false;
              }
              lastException.set(null);
              errorIndex = -1;
            }
    
        
            // Check if replace-datanode policy is satisfied.
        // If the policy is not satisfied, the write simply continues with the
        // remaining datanodes and the namenode re-replicates the block later.
        /*
         * Controlled by dfs.client.block.write.replace-datanode-on-failure.policy
         * (DEFAULT policy by default).
         *
         * DEFAULT condition:
         *   Let r be the replication number.
         *   Let n be the number of existing datanodes.
         *   Add a new datanode only if r >= 3 and either
         *   (1) floor(r/2) >= n; or
         *   (2) r > n and the block is hflushed/appended.
         */
            if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
                nodes, isAppend, isHflushed)) {
              try {
            // add a new datanode to the pipeline and copy the already-written data to it via transfer
                addDatanode2ExistingPipeline();
              } catch(IOException ioe) {
                if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
                  throw ioe;
                }
                DFSClient.LOG.warn("Failed to replace datanode."
                    + " Continue with the remaining datanodes since "
                    + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
                    + " is set to true.", ioe);
              }
            }
    
            // get a new generation stamp and an access token
        // RPC the namenode for a new generation stamp so datanodes can drop stale replicas
            LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
            newGS = lb.getBlock().getGenerationStamp();
            accessToken = lb.getBlockToken();
            
            // set up the pipeline again with the remaining nodes
            if (failPacket) { // for testing
              success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
              failPacket = false;
              try {
                // Give DNs time to send in bad reports. In real situations,
                // good reports should follow bad ones, if client committed
                // with those nodes.
                Thread.sleep(2000);
              } catch (InterruptedException ie) {}
            } else {
          // re-open the output stream to the first datanode and re-send the writeBlock request
              success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
            }
    
            if (restartingNodeIndex.get() >= 0) {
              assert hasError == true;
              // check errorIndex set above
              if (errorIndex == restartingNodeIndex.get()) {
                // ignore, if came from the restarting node
                errorIndex = -1;
              }
              // still within the deadline
              if (Time.monotonicNow() < restartDeadline) {
                continue; // with in the deadline
              }
              // expired. declare the restarting node dead
              restartDeadline = 0;
              int expiredNodeIndex = restartingNodeIndex.get();
              restartingNodeIndex.set(-1);
              DFSClient.LOG.warn("Datanode did not restart in time: " +
                  nodes[expiredNodeIndex]);
              // Mark the restarting node as failed. If there is any other failed
              // node during the last pipeline construction attempt, it will not be
              // overwritten/dropped. In this case, the restarting node will get
              // excluded in the following attempt, if it still does not come up.
              if (errorIndex == -1) {
                errorIndex = expiredNodeIndex;
              }
              // From this point on, normal pipeline recovery applies.
            }
          } // while
    
          if (success) {
            // update pipeline at the namenode
            ExtendedBlock newBlock = new ExtendedBlock(
                block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
            dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
                nodes, storageIDs);
            // update client side generation stamp
            block = newBlock;
          }
          return false; // do not sleep, continue processing
        }
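
    The DEFAULT policy condition quoted in the comment above can be written as a small predicate. This is only an illustration of the formula, not the actual ReplaceDatanodeOnFailure implementation; r is the replication factor and n is the number of datanodes still alive in the pipeline:

        public class ReplaceDatanodePolicySketch {
          static boolean shouldAddDatanode(int r, int n, boolean hflushedOrAppended) {
            if (r < 3) {
              return false;               // never replace for small replication factors
            }
            return (r / 2 >= n)           // the pipeline has shrunk to half or less
                || (r > n && hflushedOrAppended);
          }

          public static void main(String[] args) {
            // r = 3, one datanode failed (n = 2), block was hflushed: add a replacement.
            System.out.println(shouldAddDatanode(3, 2, true));   // true
            // r = 3, one datanode failed, no hflush/append: keep writing with 2 datanodes.
            System.out.println(shouldAddDatanode(3, 2, false));  // false
          }
        }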
    
    
    3. The addDatanode2ExistingPipeline method

    This method RPCs the namenode's getAdditionalDatanode to obtain a new datanode that replaces the failed one, then copies the already-written replica data to it.

    
        private void addDatanode2ExistingPipeline() throws IOException {
          if (DataTransferProtocol.LOG.isDebugEnabled()) {
            DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
          }
          /*
           * Is data transfer necessary?  We have the following cases.
           * 
           * Case 1: Failure in Pipeline Setup
           * - Append
           *    + Transfer the stored replica, which may be a RBW or a finalized.
           * - Create
           *    + If no data, then no transfer is required.
           *    + If there are data written, transfer RBW. This case may happens 
           *      when there are streaming failure earlier in this pipeline.
           *
           * Case 2: Failure in Streaming
           * - Append/Create:
           *    + transfer RBW
           * 
           * Case 3: Failure in Close
           * - Append/Create:
           *    + no transfer, let NameNode replicates the block.
           */
          if (!isAppend && lastAckedSeqno < 0
              && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            //no data have been written
            return;
          } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
              || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
            //pipeline is closing
            return;
          }
    
          //get a new datanode
          final DatanodeInfo[] original = nodes;
          final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
              src, fileId, block, nodes, storageIDs,
              failed.toArray(new DatanodeInfo[failed.size()]),
              1, dfsClient.clientName);
          setPipeline(lb);
    
          //find the new datanode
          final int d = findNewDatanode(original);
    
          //transfer replica
          final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
          final DatanodeInfo[] targets = {nodes[d]};
          final StorageType[] targetStorageTypes = {storageTypes[d]};
      // copy the data already written for this block to the newly added datanode
    
          transfer(src, targets, targetStorageTypes, lb.getBlockToken());
        }
    
    4. The transfer method

    Copies the replica data to the new datanode over a socket.

    // copies the block data already written to a surviving datanode onto the new datanode
    // src: a datanode that already has the data; targets: the newly added datanode
    private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
            final StorageType[] targetStorageTypes,
            final Token<BlockTokenIdentifier> blockToken) throws IOException {
          //transfer replica to the new datanode
          Socket sock = null;
          DataOutputStream out = null;
          DataInputStream in = null;
          try {
            sock = createSocketForPipeline(src, 2, dfsClient);
            final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
            
            OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
            InputStream unbufIn = NetUtils.getInputStream(sock);
            IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
              unbufOut, unbufIn, dfsClient, blockToken, src);
            unbufOut = saslStreams.out;
            unbufIn = saslStreams.in;
            out = new DataOutputStream(new BufferedOutputStream(unbufOut,
                HdfsConstants.SMALL_BUFFER_SIZE));
            in = new DataInputStream(unbufIn);
    
            //send the TRANSFER_BLOCK request
            new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
                targets, targetStorageTypes);
            out.flush();
    
            //ack
            BlockOpResponseProto response =
              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
            if (SUCCESS != response.getStatus()) {
              throw new IOException("Failed to add a datanode");
            }
          } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
            IOUtils.closeSocket(sock);
          }
    }
    
