Original: HDFS Replica Placement Policy Source Code Analysis, Part 1

Author: 无色的叶 | Published 2019-06-06 16:35

HDFS Default Replica Placement Policy

The implementation class is configurable via the parameter dfs.block.replicator.classname; the default implementation is:
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault

  • 1st replica: if the writer is running on one of the datanodes, the replica is stored on that local node; otherwise a random datanode in the cluster is chosen.
  • 2nd replica: stored on a rack different from the first replica's rack.
  • 3rd replica: stored on the same rack as the second replica, but on a different node.
  • 4th and later replicas: stored on randomly chosen datanodes.
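The four rules can be pictured with a small toy simulation. This is not Hadoop code: node ids are plain strings of the form "rack:host", and the real policy additionally checks storage type, load, staleness, and per-rack limits.

```java
import java.util.*;

class PlacementRules {
    // A node id is "rack:host"; rackOf extracts the rack component.
    static String rackOf(String node) { return node.split(":")[0]; }

    // Apply the four placement rules to a toy cluster (assumes replicas <= cluster size).
    static List<String> pickTargets(List<String> cluster, String writer,
                                    int replicas, Random rnd) {
        List<String> pool = new ArrayList<>(cluster);
        List<String> chosen = new ArrayList<>();
        // Rule 1: the writer's own node if it is a datanode, else a random node.
        String first = pool.contains(writer) ? writer
                                             : pool.get(rnd.nextInt(pool.size()));
        chosen.add(first);
        pool.remove(first);
        // Rule 2: a node on a rack different from the first replica's rack.
        if (chosen.size() < replicas) {
            for (String n : pool) {
                if (!rackOf(n).equals(rackOf(first))) {
                    chosen.add(n);
                    pool.remove(n);
                    break;
                }
            }
        }
        // Rule 3: a different node on the second replica's rack.
        if (chosen.size() < replicas && chosen.size() == 2) {
            String secondRack = rackOf(chosen.get(1));
            for (String n : pool) {
                if (rackOf(n).equals(secondRack)) {
                    chosen.add(n);
                    pool.remove(n);
                    break;
                }
            }
        }
        // Rule 4: any remaining replicas go to random nodes.
        while (chosen.size() < replicas && !pool.isEmpty()) {
            chosen.add(pool.remove(rnd.nextInt(pool.size())));
        }
        return chosen;
    }
}
```

For a writer on r1, three replicas land as local node, a node on another rack, and a second node on that same remote rack, mirroring the list above.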

Placement Source Code Analysis

Node selection is driven by the DatanodeStorageInfo[] chooseTarget method. There is an overloaded variant with one extra parameter, favoredNodes: an optional hint supplied by the writing client that lists datanodes to prefer for storage. All other call sites use the variant without favoredNodes.

/**
     * choose <i>numOfReplicas</i> from all data nodes
     *
     * @param srcPath       the file this block belongs to
     * @param numOfReplicas additional number of replicas wanted
     * @param writer        the writer's machine, could be a non-DatanodeDescriptor node;
     *                      null if the writer is not in the cluster
     * @param excludedNodes datanodes that should not be considered as targets
     * @param blocksize     size of the data to be written
     * @return an array of DatanodeDescriptor instances chosen as targets for
     *         this block, sorted as a pipeline
     */
    @Override
    public DatanodeStorageInfo[] chooseTarget(String srcPath,
                                              int numOfReplicas,
                                              Node writer,
                                              List<DatanodeStorageInfo> chosenNodes,
                                              boolean returnChosenNodes,
                                              Set<Node> excludedNodes,
                                              long blocksize,
                                              final BlockStoragePolicy storagePolicy,
                                              EnumSet<AddBlockFlag> flags) {

        return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
                excludedNodes, blocksize, storagePolicy, flags);
    }

    @Override
    DatanodeStorageInfo[] chooseTarget(String src,
                                       int numOfReplicas,
                                       Node writer,
                                       Set<Node> excludedNodes,
                                       long blocksize,
                                       List<DatanodeDescriptor> favoredNodes,
                                       BlockStoragePolicy storagePolicy,
                                       EnumSet<AddBlockFlag> flags) {
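The favoredNodes contract can be pictured with a stand-alone sketch: favored nodes are tried first, and remaining slots fall back to the rest of the cluster. All names here are hypothetical; the real policy also enforces storage type, rack, and load constraints before accepting a favored node.

```java
import java.util.*;

// Simplified sketch of the favoredNodes preference order (not Hadoop code).
class FavoredNodesSketch {
    static List<String> choose(List<String> cluster, List<String> favored,
                               Set<String> excluded, int replicas) {
        List<String> chosen = new ArrayList<>();
        // Try each client-favored node first.
        for (String f : favored) {
            if (chosen.size() == replicas) break;
            if (cluster.contains(f) && !excluded.contains(f)) chosen.add(f);
        }
        // Fill the remaining slots from the rest of the cluster.
        for (String n : cluster) {
            if (chosen.size() == replicas) break;
            if (!chosen.contains(n) && !excluded.contains(n)) chosen.add(n);
        }
        return chosen;
    }
}
```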

This in turn calls the same-named private chooseTarget method:

 /**
     * This is the implementation.
     */
    private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                               Node writer,
                                               List<DatanodeStorageInfo> chosenStorage,
                                               boolean returnChosenNodes,
                                               Set<Node> excludedNodes,
                                               long blocksize,
                                               final BlockStoragePolicy storagePolicy,
                                               EnumSet<AddBlockFlag> addBlockFlags) {
        // if no replicas are requested or there are no datanodes, return an empty array
        if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
            return DatanodeStorageInfo.EMPTY_ARRAY;
        }

        if (excludedNodes == null) {
            excludedNodes = new HashSet<>();
        }


        // compute the max number of replicas allowed per rack; this bounds the
        // total number of replicas placed across the cluster
        int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
        numOfReplicas = result[0];
        int maxNodesPerRack = result[1];

        for (DatanodeStorageInfo storage : chosenStorage) {
            // add localMachine and related nodes to excludedNodes
            addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
        }

        List<DatanodeStorageInfo> results = null;
        Node localNode = null;
        boolean avoidStaleNodes = (stats != null
                && stats.isAvoidingStaleDataNodesForWrite());
        boolean avoidLocalNode = (addBlockFlags != null
                && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
                && writer != null
                && !excludedNodes.contains(writer));
        // Attempt to exclude local node if the client suggests so. If no enough
        // nodes can be obtained, it falls back to the default block placement
        // policy.
        if (avoidLocalNode) {
            results = new ArrayList<>(chosenStorage);
            Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
            if (writer != null) {
                excludedNodeCopy.add(writer);
            }

            localNode = chooseTarget(numOfReplicas, writer,
                    excludedNodeCopy, blocksize, maxNodesPerRack, results,
                    avoidStaleNodes, storagePolicy,
                    EnumSet.noneOf(StorageType.class), results.isEmpty());
            if (results.size() < numOfReplicas) {
                // not enough nodes; discard results and fall back
                results = null;
            }
        }
        if (results == null) {
            results = new ArrayList<>(chosenStorage);
            localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
                    blocksize, maxNodesPerRack, results, avoidStaleNodes,
                    storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty());
        }

        if (!returnChosenNodes) {
            results.removeAll(chosenStorage);
        }

        LOG.info("localNode=" + (localNode == null ? "null" : localNode.getName())
                + ", writer=" + (writer == null ? "null" : writer.getName()));

        // sort the target nodes by network distance to form the write pipeline
        return getPipeline(
                (writer != null && writer instanceof DatanodeDescriptor) ? writer
                        : localNode,
                results.toArray(new DatanodeStorageInfo[results.size()]));
    }
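The getMaxNodesPerRack helper called near the top of this method is not shown in the post. In the Hadoop source it works roughly as follows; this is a stand-alone sketch that takes clusterSize and numOfRacks as parameters instead of reading them from clusterMap.

```java
// Sketch of getMaxNodesPerRack: caps the replica total at the cluster size, then
// spreads totalReplicas over the racks with a small +2 allowance, while never
// letting a single rack hold every replica.
// Returns {adjustedNumOfReplicas, maxNodesPerRack}.
class MaxNodesPerRack {
    static int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas,
                                    int clusterSize, int numOfRacks) {
        int totalNumOfReplicas = numOfChosen + numOfReplicas;
        if (totalNumOfReplicas > clusterSize) {
            // cannot place more replicas than there are nodes
            numOfReplicas -= (totalNumOfReplicas - clusterSize);
            totalNumOfReplicas = clusterSize;
        }
        if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
            // single rack (or single replica): no per-rack spreading possible
            return new int[] {numOfReplicas, totalNumOfReplicas};
        }
        int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 2;
        if (maxNodesPerRack == totalNumOfReplicas) {
            // keep at least two racks in play
            maxNodesPerRack--;
        }
        return new int[] {numOfReplicas, maxNodesPerRack};
    }
}
```

For the common case of 3 replicas on a 2-rack cluster this yields a cap of 2 nodes per rack, which is exactly what the 2nd/3rd-replica rules require.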

This delegates to a Node chooseTarget overload. Selection can fail here: when rack awareness is not configured, every node sits in the single default rack (default-rack), so chooseRemoteRack finds no qualifying remote rack and errors out. Retry strategies below handle such failures.

/**
     * choose <i>numOfReplicas</i> from all data nodes
     *
     * @param numOfReplicas   additional number of replicas wanted
     * @param writer          the writer's machine, could be a non-DatanodeDescriptor node;
     *                        null if the writer is not in the cluster
     * @param excludedNodes   datanodes that should not be considered as targets
     * @param blocksize       size of the data to be written
     * @param maxNodesPerRack max nodes allowed per rack
     * @param results         the target nodes already chosen
     * @param avoidStaleNodes avoid stale nodes in replica choosing
     * @return local node of writer (not chosen node)
     */
    private Node chooseTarget(int numOfReplicas,
                              Node writer,
                              final Set<Node> excludedNodes,
                              final long blocksize,
                              final int maxNodesPerRack,
                              final List<DatanodeStorageInfo> results,
                              final boolean avoidStaleNodes,
                              final BlockStoragePolicy storagePolicy,
                              final EnumSet<StorageType> unavailableStorages,
                              final boolean newBlock) {

        LOG.info("-------Node chooseTarget----------");
        LOG.info("numOfReplicas=" + numOfReplicas
                + ", writer=" + (writer == null ? "null" : writer.getName())
                + ", results=" + results.size());


        // nothing to do if no additional replicas are needed or the cluster has no nodes
        if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
            // return the writer if it is itself a datanode, otherwise null
            return (writer instanceof DatanodeDescriptor) ? writer : null;
        }
        // number of nodes already chosen
        final int numOfResults = results.size();
        // total number of replicas expected
        final int totalReplicasExpected = numOfReplicas + numOfResults;
        // if the writer is null or not a datanode (and this is not a new block),
        // use the first already-chosen node as the writer
        if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
            writer = results.get(0).getDatanodeDescriptor();
        }

        // Keep a copy of original excludedNodes
        final Set<Node> oldExcludedNodes = new HashSet<>(excludedNodes);

        // choose the storage types the replicas must satisfy according to the
        // storage policy; fallback types are substituted for unavailable storages
        final List<StorageType> requiredStorageTypes = storagePolicy
                .chooseStorageTypes((short) totalReplicasExpected,
                        DatanodeStorageInfo.toStorageTypes(results),
                        unavailableStorages, newBlock);
        // count the required storage types into a map
        final EnumMap<StorageType, Integer> storageTypes =
                getRequiredStorageTypes(requiredStorageTypes);
        if (LOG.isTraceEnabled()) {
            LOG.trace("storageTypes=" + storageTypes);
        }

        try {
            if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
                throw new NotEnoughReplicasException(
                        "All required storage types are unavailable: "
                                + " unavailableStorages=" + unavailableStorages
                                + ", storagePolicy=" + storagePolicy);
            }
            writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
                    maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
        } catch (NotEnoughReplicasException e) {
            final String message = "Failed to place enough replicas, still in need of "
                    + (totalReplicasExpected - results.size()) + " to reach "
                    + totalReplicasExpected
                    + " (unavailableStorages=" + unavailableStorages
                    + ", storagePolicy=" + storagePolicy
                    + ", newBlock=" + newBlock + ")";

            if (LOG.isTraceEnabled()) {
                LOG.trace(message, e);
            } else {
                LOG.warn(message + " " + e.getMessage());
            }

            if (avoidStaleNodes) {
                // Retry chooseTarget again, this time not avoiding stale nodes.

                // excludedNodes contains the initial excludedNodes and nodes that were
                // not chosen because they were stale, decommissioned, etc.
                // We need to additionally exclude the nodes that were added to the
                // result list in the successful calls to choose*() above.
                for (DatanodeStorageInfo resultStorage : results) {
                    addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
                }
                // Set numOfReplicas, since it can get out of sync with the result list
                // if the NotEnoughReplicasException was thrown in chooseRandom().
                numOfReplicas = totalReplicasExpected - results.size();
                return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
                        maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
                        newBlock);
            }

            boolean retry = false;
            // simply add all the remaining types into unavailableStorages and give
            // another try. No best effort is guaranteed here.
            for (StorageType type : storageTypes.keySet()) {
                if (!unavailableStorages.contains(type)) {
                    unavailableStorages.add(type);
                    retry = true;
                }
            }
            if (retry) {
                for (DatanodeStorageInfo resultStorage : results) {
                    addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
                            oldExcludedNodes);
                }
                numOfReplicas = totalReplicasExpected - results.size();
                return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
                        maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
                        newBlock);
            }
        }
        return writer;
    }
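The catch block above forms a retry ladder: a first pass avoids stale nodes; on shortage it retries admitting them, asking only for the shortfall. This is a much-simplified control-flow sketch of that first retry (the second retry over unavailableStorages follows the same pattern, and the real code also re-excludes already-chosen nodes); `place` is a hypothetical stand-in for the chooseTargetInOrder call.

```java
import java.util.*;
import java.util.function.BiFunction;

// Control-flow sketch of the stale-node retry in chooseTarget (not Hadoop code).
class RetryLadder {
    // place.apply(wanted, avoidStaleNodes) returns the nodes it managed to pick.
    static List<String> choose(int wanted,
                               BiFunction<Integer, Boolean, List<String>> place) {
        List<String> results = new ArrayList<>(place.apply(wanted, true));
        if (results.size() < wanted) {
            // retry, no longer avoiding stale nodes; only ask for the shortfall
            results.addAll(place.apply(wanted - results.size(), false));
        }
        return results;
    }
}
```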

The actual datanode-selection logic lives in chooseTargetInOrder:

protected Node chooseTargetInOrder(int numOfReplicas,
                                       Node writer,
                                       final Set<Node> excludedNodes,
                                       final long blocksize,
                                       final int maxNodesPerRack,
                                       final List<DatanodeStorageInfo> results,
                                       final boolean avoidStaleNodes,
                                       final boolean newBlock,
                                       EnumMap<StorageType, Integer> storageTypes)
            throws NotEnoughReplicasException {
        // number of target nodes already chosen
        final int numOfResults = results.size();
        LOG.info("========Node chooseTargetInOrder====numOfResults==" + numOfResults);
        // numOfResults == 0 means no replica has been chosen yet; start with the writer's local node
        if (numOfResults == 0) {
            DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
                    excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
                    storageTypes, true);

            LOG.info("---numOfResults == 0---");
            if (storageInfo != null) {
                LOG.info("---storageInfo---" + storageInfo.getDatanodeDescriptor().getHostName());
            }
            writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                    : null;

            // when the count of replicas still needed drops to 0, selection is
            // complete; return writer (the first chosen node)
            if (--numOfReplicas == 0) {
                return writer;
            }
        }
        final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
        if (numOfResults <= 1) {
            chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
                    results, avoidStaleNodes, storageTypes);
            if (--numOfReplicas == 0) {
                return writer;
            }
        }
        if (numOfResults <= 2) {
            final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
            // if dn0 and dn1 are on the same rack
            if (clusterMap.isOnSameRack(dn0, dn1)) {
                // choose one node on a rack different from dn0's
                chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
                        results, avoidStaleNodes, storageTypes);
            } else if (newBlock) {
                // for a new block, choose one node on dn1's rack
                chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
                        results, avoidStaleNodes, storageTypes);
            } else {
                // otherwise choose one node on the writer's rack
                chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
                        results, avoidStaleNodes, storageTypes);
            }
            if (--numOfReplicas == 0) {
                return writer;
            }
        }
        // any remaining replicas (the 4th and beyond) are placed on randomly
        // chosen nodes across the cluster
        chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
                maxNodesPerRack, results, avoidStaleNodes, storageTypes);
        return writer;
    }

The policy first tries local storage on the writer's node. If no candidate satisfies the constraints, it degrades to a node on the local rack, then to a node on a remote rack, and finally to a random node anywhere in the cluster.
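This degradation order can be sketched as a chain of choosers tried in turn. Simplified: the real methods signal failure by throwing NotEnoughReplicasException rather than returning null, and each stage is a full selection routine, not a plain supplier.

```java
import java.util.*;
import java.util.function.Supplier;

// Sketch of the fallback chain: local node -> local rack -> remote rack -> random.
// Each supplier returns a node id or null; the first non-null answer wins.
class FallbackChain {
    static String firstAvailable(List<Supplier<String>> choosers) {
        for (Supplier<String> c : choosers) {
            String node = c.get();
            if (node != null) return node;
        }
        // mirrors NotEnoughReplicasException in the real policy
        throw new IllegalStateException("NotEnoughReplicas");
    }
}
```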


All of these selection paths call chooseRandom:

/**
     * When called from chooseLocalStorage:
     * <p>
     * numOfReplicas: 1, the number of nodes to pick;
     * scope: the root of the network topology (""), i.e. pick from the whole cluster;
     * excludedNodes: empty, or the already-chosen nodes; the picked node must not appear in it;
     * <p>
     * When called from chooseRemoteRack:
     * <p>
     * numOfReplicas: 1, the number of nodes to pick;
     * scope: ~rack, i.e. pick from any rack in the cluster other than rack;
     * excludedNodes: empty, or the already-chosen nodes; the picked node must not appear in it;
     * <p>
     * When called from chooseLocalRack:
     * <p>
     * numOfReplicas: 1, the number of nodes to pick;
     * scope: rack, i.e. pick from within rack;
     * excludedNodes: empty, or the already-chosen nodes; the picked node must not appear in it;
     * <p>
     * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
     *
     * @return the first chosen node, if there is any.
     */
    protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
                                               String scope,
                                               Set<Node> excludedNodes,
                                               long blocksize,
                                               int maxNodesPerRack,
                                               List<DatanodeStorageInfo> results,
                                               boolean avoidStaleNodes,
                                               EnumMap<StorageType, Integer> storageTypes)
            throws NotEnoughReplicasException {
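The scope strings follow NetworkTopology conventions: a node's network location is a path such as /rack1/host1, the empty root scope matches everything, and a leading "~" inverts the match. A minimal sketch of that filtering (not the real NetworkTopology, which walks a tree rather than prefix-matching strings):

```java
import java.util.*;

// Minimal sketch of scope matching as used by chooseRandom:
//   scope ""      -> the whole cluster
//   scope "/r1"   -> only nodes under rack /r1
//   scope "~/r1"  -> every node NOT under rack /r1
class ScopeFilter {
    static List<String> inScope(List<String> nodePaths, String scope) {
        boolean exclude = scope.startsWith("~");
        String prefix = exclude ? scope.substring(1) : scope;
        List<String> out = new ArrayList<>();
        for (String p : nodePaths) {
            boolean under = p.startsWith(prefix);
            if (under != exclude) out.add(p);   // keep matches, or non-matches if "~"
        }
        return out;
    }
}
```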

A detailed walkthrough of chooseTargetInOrder will follow in a later part.
