美文网首页
07. Flink8种分区策略及源码解读

07. Flink8种分区策略及源码解读

作者: bigdata张凯翔 | 来源:发表于2021-04-02 00:28 被阅读0次

    Flink8种分区策略有哪几种?

    Flink实现的分区策略继承图:.png
    GlobalPartitioner: DataStream => DataStream
    GlobalPartitioner数据会被分发到下游算子的第一个实例中进行处理。
    GlobalPartitioner,GLOBAL分区。`将记录输出到下游Operator的第一个实例。
    

    源码解读:

    /**
     * 发送所有的数据到下游算子的第一个task(ID = 0)
     * @param <T>
     */
    @Internal
    public class GlobalPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            //只返回0,即只发送给下游算子的第一个task
            return 0;
        }
    
        @Override
        public StreamPartitioner<T> copy() {
            return this;
        }
    
        @Override
        public String toString() {
            return "GLOBAL";
        }
    }
    
    image.png

    ShufflePartitioner: DataStream => DataStream

    ShufflePartitioner数据会被随机分发到下游算子的每一个实例中进行处理。
    `ShufflePartitioner,SHUFFLE分区。`将记录随机输出到下游Operator的每个实例。
    
    /**
     * 随机的选择一个channel进行发送
     * @param <T>
     */
    @Internal
    public class ShufflePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        private Random random = new Random();
    
        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            //产生[0,numberOfChannels)伪随机数,随机发送到下游的某个task
            return random.nextInt(numberOfChannels);
        }
    
        @Override
        public StreamPartitioner<T> copy() {
            return new ShufflePartitioner<T>();
        }
    
        @Override
        public String toString() {
            return "SHUFFLE";
        }
    }
    
    image.png

    RebalancePartitioner: DataStream => DataStream

    `RebalancePartitioner,REBALANCE分区。`将记录以循环的方式输出到下游Operator的每个实例。
    RebalancePartitioner数据会被循环发送到下游的每一个实例中进行处理。
    /**
     *通过循环的方式依次发送到下游的task
     * @param <T>
     */
    @Internal
    public class RebalancePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        private int nextChannelToSendTo;
    
        @Override
        public void setup(int numberOfChannels) {
            super.setup(numberOfChannels);
            //初始化channel的id,返回[0,numberOfChannels)的伪随机数
            nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
        }
    
        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            //循环依次发送到下游的task,比如:nextChannelToSendTo初始值为0,numberOfChannels(下游算子的实例个数,并行度)值为2
            //则第一次发送到ID = 1的task,第二次发送到ID = 0的task,第三次发送到ID = 1的task上...依次类推
            nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
            return nextChannelToSendTo;
        }
    
        public StreamPartitioner<T> copy() {
            return this;
        }
    
        @Override
        public String toString() {
            return "REBALANCE";
        }
    }
    
    image.png
    RescalePartitioner: DataStream => DataStream

    RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。


    image.png

    这种分区器会根据上下游算子的并行度,循环的方式输出到下游算子的每个实例。这里有点难以理解,假设上游并行度为2,编号为A和B。下游并行度为4,编号为1,2,3,4。那么A则把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游并行度为4,编号为A,B,C,D。下游并行度为2,编号为1,2。那么A和B则把数据发送给1,C和D则把数据发送给2。

    @Internal
    public class RescalePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        private int nextChannelToSendTo = -1;
    
        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            if (++nextChannelToSendTo >= numberOfChannels) {
                nextChannelToSendTo = 0;
            }
            return nextChannelToSendTo;
        }
    
        public StreamPartitioner<T> copy() {
            return this;
        }
    
        @Override
        public String toString() {
            return "RESCALE";
        }
    }
    

    BroadcastPartitioner: DataStream => DataStream

    BroadcastPartitioner,BROADCAST分区。广播分区将上游数据集输出到下游Operator的每个实例中。适合于大数据集Join小数据集的场景。
    BroadcastPartitioner广播分区会将上游数据输出到下游算子的每个实例中。适合于大数据集和小数据集做Jion的场景。
    

    ForwardPartitioner

    ForwardPartitioner,FORWARD分区。将记录输出到下游本地的operator实例。ForwardPartitioner分区器要求上下游算子并行度一样。上下游Operator同属一个SubTasks`。
    ForwardPartitionerForwardPartitioner 用于将记录输出到下游本地的算子实例。它要求上下游算子并行度一样。简单的说,ForwardPartitioner用来做数据的控制台打印。

    /**
     * 发送到下游对应的第一个task
     * @param <T>
     */
    @Internal
    public class ForwardPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            return 0;
        }
    
        public StreamPartitioner<T> copy() {
            return this;
        }
    
        @Override
        public String toString() {
            return "FORWARD";
        }
    }
    

    image.png

    在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner,对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常
    
    //在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner
                if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                    partitioner = new ForwardPartitioner<Object>();
                } else if (partitioner == null) {
                    partitioner = new RebalancePartitioner<Object>();
                }
    
                if (partitioner instanceof ForwardPartitioner) {
                    //如果上下游的并行度不一致,会抛出异常
                    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                        throw new UnsupportedOperationException("Forward partitioning does not allow " +
                            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                    }
                }
    

    KeyGroupStreamPartitioner(HASH方式):

    KeyGroupStreamPartitioner,HASH分区。将记录按Key的Hash值输出到下游Operator实例。
    KeyGroupStreamPartitionerHash分区器。会将数据按 Key 的 Hash 值输出到下游算子实例中。

    /**
     * 根据key的分组索引选择发送到相对应的下游subtask
     * @param <T>
     * @param <K>
     */
    @Internal
    public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
    ...
    
        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            K key;
            try {
                key = keySelector.getKey(record.getInstance().getValue());
            } catch (Exception e) {
                throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
            }
            //调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,代码如下所示
            return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
        }
    ...
    }
    
    public final class KeyGroupRangeAssignment {
    ...
    
        /**
         * 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,
         * 即该key发送到哪一个task
         */
        public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
            Preconditions.checkNotNull(key, "Assigned key must not be null!");
            return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
        }
    
        /**
         *根据key分配一个分组id(keyGroupId)
         */
        public static int assignToKeyGroup(Object key, int maxParallelism) {
            Preconditions.checkNotNull(key, "Assigned key must not be null!");
            //获取key的hashcode
            return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
        }
    
        /**
         * 根据key分配一个分组id(keyGroupId),
         */
        public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    
            //与maxParallelism取余,获取keyGroupId
            return MathUtils.murmurHash(keyHash) % maxParallelism;
        }
    
        //计算分区index,即该key group应该发送到下游的哪一个算子实例
        public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
            return keyGroupId * parallelism / maxParallelism;
        }
    ...
    
    image.png
    CustomPartitionerWrapper

    CustomPartitionerWrapper,CUSTOM分区。`通过Partitioner实例的partition方法(自定义的)将记录输出到下游。
    CustomPartitionerWrapper用户自定义分区器。需要用户自己实现Partitioner接口,来定义自己的分区逻辑。
    通过Partitioner实例的partition方法(自定义的)将记录输出到下游。
    例如:

    static class CustomPartitioner implements Partitioner<String> {
          @Override
          public int partition(String key, int numPartitions) {
              switch (key){
                  case "1":
                      return 1;
                  case "2":
                      return 2;
                  case "3":
                      return 3;
                  default:
                      return 4;
              }
          }
      }
    

    相关文章

      网友评论

          本文标题:07. Flink8种分区策略及源码解读

          本文链接:https://www.haomeiwen.com/subject/fytlzhtx.html