美文网首页
Flink状态机制

Flink状态机制

作者: 浮zai梦里 | 来源:发表于2019-03-07 23:03 被阅读0次
    什么是状态

    首先要知道,状态指的是算子的状态。为什么算子需要状态,状态的用处无非两点:

    1. 实现算子的逻辑(作为一种中间状态)
    2. 错误恢复
    实现算子的逻辑

    用官网的例子,假设一段数据流格式长这样<1,3><1,2><1,3><2,3><2,5>
    那么我想对相同第一个元素所有tuple,求第二个元素的平均值。该如何实现?

    你可能会想到使用Flink自带的聚合函数,其中该函数缓存所有的相同key的元素,在函数里做遍历累加求值的操作。这很正确。但有一个不好的点,需要缓存所有数据。

    如果现在就让你用map操作实现呢?而且不缓存所以数据

    这就需要用到状态了。试想一下,如果在map算子里面维护这样一个变量<a,b>。a是该算子的key的次数,上面数据key为1的次数便是3(a=3),b是所有第二个元素之和。

    那么上面数据流在每个map算子中维护了<3,8>,<2,8>的状态。好了,平均值就出来了。而且,这个状态,来一次数据更新一次,不需要缓存。

    贴下代码:

    public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    
        /**
         * The ValueState handle. The first field is the count, the second field a running sum.
         */
        private transient ValueState<Tuple2<Long, Long>> sum;
    
        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
    
            // access the state value
            Tuple2<Long, Long> currentSum = sum.value();
    
            // update the count
            currentSum.f0 += 1;
    
            // add the second field of the input value
            currentSum.f1 += input.f1;
    
            // update the state
            sum.update(currentSum);
    
            // if the count reaches 2, emit the average and clear the state
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }
    
        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }
    
    // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
    env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
            .keyBy(0)
            .flatMap(new CountWindowAverage())
            .print();
    
    // the printed output will be (1,4) and (1,5)
    
    
    错误恢复

    试想这样一个场景:
    需要将数据流的每个数据存入数据库,而且任务失败后重启能保证不将数据不重复落盘。怎么实现?

    首先对于落盘,肯定不能来一条存一条,考虑到性能问题,我们设定一个阈值,达到这个阈值触发落盘操作。

    那么任务一旦失败了,从哪开始恢复呢。这就肯定需要知道上一次落盘在哪发生的。

    这就又需要在落盘算子(SinkFunction)中保存一个状态,用来记录在上次任务失败时所缓存的还没有落盘的数据,只要把这批数据存数据库。后面的操作继续执行就可以了。

    代码如下:

    public class BufferingSink
            implements SinkFunction<Tuple2<String, Integer>>,
                       CheckpointedFunction {
    
        private final int threshold;
    
        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    
        private List<Tuple2<String, Integer>> bufferedElements;
    
        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }
    
        @Override
        public void invoke(Tuple2<String, Integer> value) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Tuple2<String, Integer> element: bufferedElements) {
                    // send it to the sink
                }
                bufferedElements.clear();
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                checkpointedState.add(element);
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                    "buffered-elements",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
    
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    
            if (context.isRestored()) {
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Flink状态机制

          本文链接:https://www.haomeiwen.com/subject/sursuqtx.html