Flink experiment notes


    Author: 大大大大大大大熊 | Published 2018-08-31 15:59

    Sink

    writeAsText("file:///home/wangxiaotong/result");
    Result: a result file holding the data was created on the local file system of the worker node that ran the sink.

    State

    Link: https://zhuanlan.zhihu.com/p/29003852

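    • Keyed State
      Available only on a KeyedStream, and always bound to a key. It can be viewed as operator state partitioned by key: each key has its own state partition.
    • Operator State
      For non-keyed streams: each operator state is bound to one parallel instance of the operator.
    • Managed State: data structures provided by Flink, such as "ValueState" and "ListState"; they are encoded into checkpoints.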

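    State in Flink can be classified along two dimensions: whether it belongs to a key (keyed state vs. operator state) and whether Flink manages it (managed state vs. raw state). Keyed state holds state in a KeyedStream; operator state holds state in ordinary, non-keyed operators. Managed state is managed by Flink. Raw state is managed by the application itself, and Flink calls the corresponding interface methods to snapshot and restore it.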
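    The following plain-Java model makes the keyed-state idea concrete. It does not use the Flink API, and the class name KeyedValueStateModel is hypothetical. It models a ValueState<Long> that keeps a running sum per key. Each key reads and writes only its own slot, in the same way that a keyed function only sees the state of the current key.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keyed ValueState<Long> (NOT the Flink API): one state slot per key.
class KeyedValueStateModel {
    // key -> that key's state partition; a missing entry means "state not set yet"
    private final Map<String, Long> stateByKey = new HashMap<>();

    // Mimics a keyed function: read this key's state, add the value, store and emit the new sum.
    long processElement(String key, long value) {
        Long current = stateByKey.get(key); // like ValueState.value(), null when unset
        if (current == null) {
            current = 0L;
        }
        long updated = current + value;
        stateByKey.put(key, updated);       // like ValueState.update(updated)
        return updated;
    }

    public static void main(String[] args) {
        KeyedValueStateModel model = new KeyedValueStateModel();
        System.out.println(model.processElement("a", 1)); // 1
        System.out.println(model.processElement("b", 5)); // 5: "b" does not see "a"'s state
        System.out.println(model.processElement("a", 2)); // 3
    }
}
```

    In Flink itself, the state backend manages the per-key map, and the function only calls value() and update() for whichever key is current.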

    Using Managed Keyed State
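    1. ValueState<T>: set the value with update(T) and retrieve it with T value().
    2. ListState<T>: append elements with add(T) or addAll(List<T>), read them with Iterable<T> get(), and overwrite the existing list with update(List<T>).
    3. ReducingState<T>: keeps a single aggregated value. add(T) adds an element, and the aggregate is computed with a ReduceFunction.
    4. AggregatingState<IN, OUT>: also keeps a single aggregated value. Unlike ReducingState, the aggregate type may differ from the input element type. add(IN) adds an element, and the aggregate is computed with an AggregateFunction.
    5. FoldingState<T, ACC>: deprecated as of Flink 1.4; use AggregatingState instead.
    6. MapState<UK, UV>: stores key-value pairs. Add entries with put(UK, UV) or putAll(Map<UK, UV>) and read them with get(UK).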
    • Raw State: the operator's own data structures; they are serialized into a sequence of bytes and written into the checkpoint.

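    Managed state is available in all DataStream functions, whereas raw state can only be used through the operator interfaces (when implementing operators). Managed state is recommended.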
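    The difference between ReducingState and AggregatingState is easiest to see side by side. The model below is plain Java and does not use Flink's classes; the names ReduceVsAggregateModel, ReducingModel and AverageModel are hypothetical. The reducing model keeps a value of the same type as its input. The aggregating model keeps a {sum, count} accumulator and outputs an average of a different type (Double).

```java
import java.util.function.BinaryOperator;

// Plain-Java model (NOT Flink classes) contrasting ReducingState and AggregatingState.
class ReduceVsAggregateModel {

    // ReducingState<T>: input, stored value and result all share the type T
    static final class ReducingModel<T> {
        private final BinaryOperator<T> reduceFunction; // plays the role of a ReduceFunction<T>
        private T value;                                 // null until the first add()

        ReducingModel(BinaryOperator<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        void add(T in) {
            value = (value == null) ? in : reduceFunction.apply(value, in);
        }

        T get() {
            return value;
        }
    }

    // AggregatingState<Long, Double>: accumulator {sum, count}, result is the average
    static final class AverageModel {
        private long sum;
        private long count;

        void add(long in) { // like AggregateFunction.add
            sum += in;
            count++;
        }

        Double get() {      // like AggregateFunction.getResult
            return count == 0 ? null : (double) sum / count;
        }
    }

    public static void main(String[] args) {
        ReducingModel<Long> total = new ReducingModel<>(Long::sum);
        AverageModel avg = new AverageModel();
        for (long v : new long[] {2, 4, 9}) {
            total.add(v);
            avg.add(v);
        }
        System.out.println(total.get()); // 15
        System.out.println(avg.get());   // 5.0
    }
}
```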

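    StateDescriptor: a handle to a state is obtained from a StateDescriptor. The descriptor holds the state's name and value type and, for some state types, a user function such as a ReduceFunction.
    Available descriptors: ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, AggregatingStateDescriptor, FoldingStateDescriptor, and MapStateDescriptor.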

    Obtaining state (through the RuntimeContext, so this is only possible in rich functions):

    • ValueState<T> getState(ValueStateDescriptor<T>)
    • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
    • ListState<T> getListState(ListStateDescriptor<T>)
    • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
    • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
    • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

    State lifetime: State Time-To-Live (TTL)

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
        
    ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
    stateDescriptor.enableTimeToLive(ttlConfig);
    

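    Cleanup of expired state: by default, expired values are only removed when they are explicitly read, for example by calling ValueState.value(). As a result, expired state that is never read is never removed, and the state may keep growing. In addition, you can activate cleanup when a full state snapshot is taken, which reduces the snapshot size.
    With the current implementation, local state is not cleaned up by this option, but state restored from such a snapshot will not contain the removed expired entries. It is configured in StateTtlConfig: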

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .cleanupFullSnapshot()
        .build();
    

    Note: this option is not applicable to incremental checkpointing in the RocksDB state backend.
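    The lazy, read-triggered cleanup described above can be modelled in a few lines of plain Java. This is not Flink's TTL implementation, and TtlValueStateModel is a hypothetical name. The model mimics UpdateType.OnCreateAndWrite (only create and write refresh the timestamp) and StateVisibility.NeverReturnExpired (an expired value reads as null). It assumes that a value expires once ttl milliseconds have passed since its last write. The model also shows why unread expired entries keep occupying space.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (NOT Flink's TTL implementation): OnCreateAndWrite + NeverReturnExpired,
// with expired entries removed only when they are read.
class TtlValueStateModel {
    private static final class Entry {
        final String value;
        final long lastWriteMillis;

        Entry(String value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> store = new HashMap<>(); // key -> value + last write time

    TtlValueStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // OnCreateAndWrite: creating or overwriting a value refreshes its timestamp
    void update(String key, String value, long nowMillis) {
        store.put(key, new Entry(value, nowMillis));
    }

    // NeverReturnExpired: an expired value reads as null and is deleted by this read
    String value(String key, long nowMillis) {
        Entry e = store.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.lastWriteMillis >= ttlMillis) { // assumed expiry boundary
            store.remove(key); // the only place where cleanup happens
            return null;
        }
        return e.value;
    }

    int storedEntries() {
        return store.size();
    }

    public static void main(String[] args) {
        TtlValueStateModel state = new TtlValueStateModel(1000); // 1 s, like Time.seconds(1)
        state.update("k", "text", 0);
        System.out.println(state.value("k", 500));  // text
        System.out.println(state.value("k", 1200)); // null: expired and removed by this read
    }
}
```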

    Using Managed Operator State

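    There are two ways to use managed operator state. The first is to implement the CheckpointedFunction interface, which supports both redistribution schemes described below. The second is to implement ListCheckpointed<T extends Serializable>, which is restricted to list-style state with even-split redistribution. If the state is not re-partitionable, you can always return Collections.singletonList(MY_STATE) from snapshotState(). Both approaches currently support only list-style (ListState) state.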

    Redistribution schemes

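    • Even-split redistribution:
      Each parallel operator instance returns a list of state elements, and the whole state is logically the concatenation of all these lists. At restore/redistribution time (for example, after a change of parallelism), this list is split evenly into as many sublists as there are parallel instances. ListCheckpointed only supports this scheme.
    • Union redistribution:
      At restore/redistribution time, every operator instance receives the complete list of state elements.
      With CheckpointedFunction, you can choose either scheme when obtaining the state:
    checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor); // union redistribution
    checkpointedState = context.getOperatorStateStore().getListState(descriptor);      // even-split redistribution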
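    The two schemes can be compared with a small plain-Java model of a rescale. It is not Flink's repartitioner, and RedistributionModel is a hypothetical name. The contiguous, near-equal split used for even-split is an assumption: the exact assignment of elements to subtasks in Flink is an implementation detail. The model only illustrates who receives what.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (NOT Flink's repartitioner) of ListState redistribution on rescaling.
class RedistributionModel {

    // Even-split: concatenate all old lists, then hand out contiguous, near-equal slices.
    static <T> List<List<T>> evenSplit(List<List<T>> oldPerSubtask, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldPerSubtask) {
            all.addAll(part);
        }
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism; // the first 'extra' subtasks get one more element
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new subtask receives the complete concatenated list.
    static <T> List<List<T>> union(List<List<T>> oldPerSubtask, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldPerSubtask) {
            all.addAll(part);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // two old subtasks, rescaled to three new subtasks
        List<List<String>> old = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c", "d", "e"));
        System.out.println(evenSplit(old, 3)); // [[a, b], [c, d], [e]]
        System.out.println(union(old, 3));     // every subtask gets [a, b, c, d, e]
    }
}
```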
    

    CheckpointedFunction:

    void snapshotState(FunctionSnapshotContext context) throws Exception;
    
    void initializeState(FunctionInitializationContext context) throws Exception;
    
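    1. initializeState() is called when the function is first initialized and also when it is recovered from an earlier checkpoint; snapshotState() is called whenever a checkpoint is taken.
    2. Example:
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    
    public class BufferingSink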
            implements SinkFunction<Tuple2<String, Integer>>,
                       CheckpointedFunction {
    
        private final int threshold;
    
        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    
        private List<Tuple2<String, Integer>> bufferedElements;
    
        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }
    
        @Override
        public void invoke(Tuple2<String, Integer> value) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Tuple2<String, Integer> element: bufferedElements) {
                    // send it to the sink
                }
                bufferedElements.clear();
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                checkpointedState.add(element);
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                    "buffered-elements",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
    
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    
            if (context.isRestored()) {
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
    

    During recovery, isRestored() tells whether the state is being restored from a checkpoint; if it is, the restore logic is applied.
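    The snapshot/restore cycle of BufferingSink can be followed without a cluster using a plain-Java model. The model is not a Flink SinkFunction, and BufferingSinkModel is a hypothetical name. A list named "delivered" stands in for "send it to the sink". The returned snapshot stands in for the ListState. Elements buffered before a checkpoint survive a simulated failure.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT a Flink SinkFunction) of BufferingSink's snapshot/restore cycle.
class BufferingSinkModel {
    private final int threshold;
    private final List<String> buffered = new ArrayList<>(); // like bufferedElements
    private final List<String> delivered;                    // stands in for "send it to the sink"

    BufferingSinkModel(int threshold, List<String> delivered) {
        this.threshold = threshold;
        this.delivered = delivered;
    }

    // like invoke(): buffer the element and flush once the threshold is reached
    void invoke(String value) {
        buffered.add(value);
        if (buffered.size() == threshold) {
            delivered.addAll(buffered);
            buffered.clear();
        }
    }

    // like snapshotState(): the checkpointed list is replaced by a copy of the buffer
    List<String> snapshotState() {
        return new ArrayList<>(buffered);
    }

    // like initializeState() when isRestored() is true: refill the buffer from the snapshot
    static BufferingSinkModel restore(int threshold, List<String> delivered, List<String> snapshot) {
        BufferingSinkModel sink = new BufferingSinkModel(threshold, delivered);
        sink.buffered.addAll(snapshot);
        return sink;
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        BufferingSinkModel sink = new BufferingSinkModel(3, delivered);
        sink.invoke("a");
        sink.invoke("b");
        List<String> checkpoint = sink.snapshotState(); // checkpoint taken with 2 buffered elements
        // simulated failure: the old instance is lost and a new one is restored
        BufferingSinkModel recovered = restore(3, delivered, checkpoint);
        recovered.invoke("c");
        System.out.println(delivered); // [a, b, c]
    }
}
```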

    For saving a single value, see how the offset is stored in Stateful Source Functions (https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#stateful-source-functions).

    Stateful Source Functions

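    To guarantee exactly-once semantics on failure/recovery, the source must hold the checkpoint lock (ctx.getCheckpointLock()) while emitting a record and updating its state, so that both happen atomically with respect to checkpoints.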

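    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    // nested inside the job class, hence "static"
    public static class CounterSource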
            extends RichParallelSourceFunction<Long>
            implements ListCheckpointed<Long> {
    
        /**  current offset for exactly once semantics */
        private Long offset = 0L;
    
        /** flag for job cancellation */
        private volatile boolean isRunning = true;
    
        @Override
        public void run(SourceContext<Long> ctx) {
            final Object lock = ctx.getCheckpointLock();
    
            while (isRunning) {
                // output and state update are atomic
                synchronized (lock) {
                    ctx.collect(offset);
                    offset += 1;
                }
            }
        }
    
        @Override
        public void cancel() {
            isRunning = false;
        }
    
        @Override
        public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
            return Collections.singletonList(offset);
        }
    
        @Override
        public void restoreState(List<Long> state) {
            for (Long s : state)
                offset = s;
        }
    }
    

    The Broadcast State Pattern

    1. it has a map format,
    2. it is only available to specific operators that have as inputs a broadcasted stream and a non-broadcasted one, and
    3. such an operator can have multiple broadcast states with different names.
    BroadcastProcessFunction and KeyedBroadcastProcessFunction
    public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    
        public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    
        public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    }
    public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    
        public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    
        public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
            // the default implementation does nothing
        }
    }
    

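    processElement handles the non-broadcast stream, and processBroadcastElement handles the broadcast stream.
    processElement only receives a ReadOnlyContext, while processBroadcastElement receives a Context that offers the following: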

    • give access to the broadcast state: ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
    • allow to query the timestamp of the element: ctx.timestamp(),
    • get the current watermark: ctx.currentWatermark()
    • get the current processing time: ctx.currentProcessingTime(), and
    • emit elements to side-outputs: ctx.output(OutputTag<X> outputTag, X value).
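    A plain-Java model of the pattern shows its two properties: the broadcast state is map-shaped, and only the broadcast side may write it. The model is not Flink's BroadcastProcessFunction. The names BroadcastPatternModel and the "rule name to threshold" semantics are hypothetical. Each parallel instance keeps its own copy of the broadcast state. A broadcast element writes to every copy, while a regular element is handled by one instance that can only read its copy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Flink's BroadcastProcessFunction) of the broadcast state pattern.
class BroadcastPatternModel {
    // one copy of the map-shaped broadcast state per parallel instance
    private final List<Map<String, Integer>> statePerInstance = new ArrayList<>();

    BroadcastPatternModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            statePerInstance.add(new HashMap<>());
        }
    }

    // like processBroadcastElement: the element reaches every instance, which may write its state
    void processBroadcastElement(String ruleName, int threshold) {
        for (Map<String, Integer> state : statePerInstance) {
            state.put(ruleName, threshold);
        }
    }

    // like processElement: one instance handles the element with read-only access
    boolean processElement(int instance, String ruleName, int value) {
        Map<String, Integer> readOnly = Collections.unmodifiableMap(statePerInstance.get(instance));
        Integer threshold = readOnly.get(ruleName);
        return threshold != null && value > threshold; // true = the element matches the rule
    }

    public static void main(String[] args) {
        BroadcastPatternModel model = new BroadcastPatternModel(2);
        System.out.println(model.processElement(0, "max", 50)); // false: no rule broadcast yet
        model.processBroadcastElement("max", 10);
        System.out.println(model.processElement(1, "max", 50)); // true: instance 1 got the rule too
    }
}
```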

    Checkpointing

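    Enabling: call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
    Parameters:

    • exactly-once vs. at-least-once: at-least-once may be preferable for applications that need consistently very low latency (a few milliseconds).
    • checkpoint timeout: an in-progress checkpoint is aborted if it does not complete within this time.
    • minimum time between checkpoints: ensures that the application makes some progress between checkpoints. If it is set to 5000, the next checkpoint starts no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval.
      Note: this means the effective checkpoint interval is never smaller than this parameter. The setting also implies that the number of concurrent checkpoints is one.
    • number of concurrent checkpoints: by default, only one checkpoint is in progress at a time.
      Note: this option cannot be used when a minimum time between checkpoints is defined.
    • externalized checkpoints: periodic checkpoints are persisted externally. Externalized checkpoints write their metadata to persistent storage and are not cleaned up automatically when the job fails, so you will have a checkpoint to resume from if your job fails. See the deployment notes on externalized checkpoints for setup details.
    • fail/continue task on checkpoint errors: enabled by default, in which case a task that fails during its checkpoint procedure is considered failed. If this is disabled, the task simply declines the checkpoint to the checkpoint coordinator and continues running.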
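    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();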
    
    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000);
    
    // advanced options:
    
    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    
    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    
    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
    // enable externalized checkpoints which are retained after job cancellation
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
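    The following sketch shows how the interval and the minimum pause interact. It is a simplified model and an assumption on our part, not Flink's CheckpointCoordinator code. The model assumes the next checkpoint may start at whichever comes later: the regular interval after the last trigger, or the minimum pause after the last completion. CheckpointScheduleModel is a hypothetical name.

```java
// Simplified model (an assumption, NOT Flink's CheckpointCoordinator) of the next trigger time.
class CheckpointScheduleModel {
    static long nextTriggerTime(long lastTriggerMillis, long lastCompletionMillis,
                                long intervalMillis, long minPauseMillis) {
        long byInterval = lastTriggerMillis + intervalMillis;    // enableCheckpointing(interval)
        long byMinPause = lastCompletionMillis + minPauseMillis; // setMinPauseBetweenCheckpoints(minPause)
        return Math.max(byInterval, byMinPause);
    }

    public static void main(String[] args) {
        // slow checkpoint: triggered at 0 ms, completed at 900 ms; interval 1000 ms, min pause 500 ms
        System.out.println(nextTriggerTime(0, 900, 1000, 500)); // 1400: the pause dominates
        // fast checkpoint: completed at 100 ms, so the regular interval dominates
        System.out.println(nextTriggerTime(0, 100, 1000, 500)); // 1000
    }
}
```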
    

    Related Config Options


    Selecting a State Backend

    Flink stores the state of timers, stateful operators, connectors, windows and user-defined state. By default, state is kept in TaskManager memory, and checkpoints are stored in JobManager memory.
    The backend is configured with StreamExecutionEnvironment.setStateBackend(…).

    State Checkpoints in Iterative Jobs

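    Checkpointing an iterative program requires a special force flag:
    env.enableCheckpointing(interval, force = true). Note: on failure, records in flight on the loop edges, and the state changes associated with them, will be lost.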

    Restart Strategies

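    Restart strategies can be configured in flink-conf.yaml.
    If checkpointing is not enabled, the no-restart strategy is used.
    If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.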



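    1: Fixed Delay: after a failure, the job is restarted a fixed number of times with a fixed delay between attempts. The example below restarts up to 3 times, waiting 10 s between attempts.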

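    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.ExecutionEnvironment;
    
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();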
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3, // number of restart attempts
      Time.of(10, TimeUnit.SECONDS) // delay
    ));
    

    The same settings (number of attempts, delay, etc.) can also be configured in flink-conf.yaml.



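    2: Failure Rate: this strategy also restarts the job after a failure. Unlike fixed delay, the job finally fails once the number of failures within the given time interval exceeds the configured maximum.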



    It can also be set in code:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.failureRateRestart(
      3, // max failures per interval
      Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
      Time.of(10, TimeUnit.SECONDS) // delay
    ));
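    The failure-rate decision can be sketched as a sliding window over failure timestamps. The plain-Java model below is a simplification and not Flink's FailureRateRestartStrategy; FailureRateModel is a hypothetical name. It uses the parameters of the example above: at most 3 failures per 5 minutes. The delay between restarts is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified plain-Java model (NOT Flink's FailureRateRestartStrategy) of the failure-rate decision.
class FailureRateModel {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateModel(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    // true = restart the job, false = too many failures in the interval, so the job finally fails
    boolean onFailure(long nowMillis) {
        failureTimes.addLast(nowMillis);
        // forget failures that lie outside the measuring interval
        while (!failureTimes.isEmpty() && nowMillis - failureTimes.peekFirst() > intervalMillis) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() <= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        FailureRateModel model = new FailureRateModel(3, 5 * 60_000L); // 3 failures per 5 minutes
        for (long t : new long[] {0, 60_000, 120_000, 180_000}) {
            System.out.println(t + " ms -> restart? " + model.onFailure(t)); // the 4th prints false
        }
    }
}
```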
    

    3: No Restart

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.noRestart());
    

    4: Fallback Restart Strategy: the restart strategy defined at the cluster level is used.

    State Backends

    Available backends:

    • MemoryStateBackend
    • FsStateBackend
    • RocksDBStateBackend

    The default is MemoryStateBackend.

    MemoryStateBackend

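    Snapshots can be taken asynchronously. Asynchronous snapshots are recommended, because they avoid blocking the processing pipeline, and they are the default. To disable them, pass false as the flag:
    new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
    Limitations:

    • By default, each individual state is limited to 5 MB; the limit can be increased in the MemoryStateBackend constructor.
    • A state cannot be larger than the Akka frame size.
    • The aggregate state must fit into the JobManager memory.

    Suitable for:

    • Local development and debugging
    • Jobs with little state, such as jobs consisting only of record-at-a-time functions (Map, FlatMap, Filter, …); the Kafka consumer also requires very little state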

    FsStateBackend

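    Keeps in-flight data in TaskManager memory.
    On checkpoint, it writes state snapshots into files in the configured file system and directory (“hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”). Minimal metadata is stored in JobManager memory. Asynchronous snapshots are the default; to disable them, pass false as the flag:
    new FsStateBackend(path, false);
    Suitable for:

    • Jobs with large state, long windows, and large key/value states
    • All high-availability setups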

    RocksDBStateBackend

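    Keeps in-flight data in a RocksDB database stored in the TaskManager data directories.
    On checkpoint, the whole RocksDB database is checkpointed into the configured file system and directory (“hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”). Minimal metadata is stored in JobManager memory.
    Limitations:

    • RocksDB's JNI API is based on byte[], so the maximum supported size per key and per value is 2^31 bytes each.

    Suitable for:

    • Jobs with very large state, long windows, and large key/value states
    • All high-availability setups

    Note: the amount of state you can keep is limited only by the available disk space. This allows much larger state than FsStateBackend, which keeps state in memory, but the maximum achievable throughput is lower.
    RocksDBStateBackend is also currently the only backend that supports incremental checkpoints.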

    Configuring a State Backend

    1. Per-job setting:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    
    2. Alternatively, set the default state backend for all jobs in flink-conf.yaml with the key state.backend. Possible values are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend), or the fully qualified class name of a state backend factory, such as org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory for RocksDBStateBackend.
      For example:
    # The backend that will be used to store operator state checkpoints
    
    state.backend: filesystem
    
    
    # Directory for storing checkpoints
    
    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
    

    This must be configured in flink-conf.yaml; configuring it in FWC.yaml did not work.


    Custom Serialization for Managed State

    Some use cases require custom serialization logic for managed state. If you only need the default serialization, you can skip this part.

    Retained Checkpoints

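    By default, checkpoints are not retained and are only used to resume a job from failures; they are deleted when the job is cancelled. You can, however, configure periodic checkpoints to be retained. Depending on the configuration, these retained checkpoints are not cleaned up automatically when the job fails or is cancelled, so you will have a checkpoint to resume from if your job fails.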

    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

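    - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: retain the checkpoint when the job is cancelled; you then have to delete it manually.
    - ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint when the job is cancelled; the checkpoint is then only available if the job fails.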

    Experiments by the Flink team

    https://data-artisans.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink
    State experiment:
    https://github.com/StephanEwen/flink-demos/tree/master/streaming-state-machine

    Using fault tolerance

    You need checkpointing enabled, and for end-to-end exactly-once guarantees you need sources that support replay and sinks that are either idempotent or transactional.

    In your case, the first place to start might be to configure a restart strategy.

    https://stackoverflow.com/questions/43637190/does-flink-garauntee-tasks-fault-tolerance-in-all-cases


    https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink

    stateful wordcount
    http://sinanbir.com/apache-flink-stateful-streaming-example/

    ValueStateDescriptor

    Two ways to initialize it (inside a ProcessFunction applied to a keyed stream):

    // field of the enclosing ProcessFunction
    private transient ValueState<Long> sum;
    
    @Override
    public void processElement(Tuple2<String, Long> input,
            ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>.Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        Long currentSum = sum.value();
        if (currentSum == null) {
            // first element for this key (needed with option 1, which has no default value)
            currentSum = 0L;
        }
        currentSum += input.f1;
        sum.update(currentSum);
        out.collect(new Tuple2<>(input.f0, currentSum));
    }
    
    // Option 1: type information only; value() returns null until the first update()
    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>(
                        "wordsum", // the state name
                        Long.class); // type information of the state
        sum = getRuntimeContext().getState(descriptor);
    }
    
    // Option 2 (use instead of option 1; this constructor is deprecated in Flink):
    // with a default value, value() returns 0L instead of null
    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>(
                        "wordsum", // the state name
                        Long.class, // type information of the state
                        0L); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
    

    Several window implementations of word count

    Option 1: using apply

    text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0)
            .countWindow(11)
            .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, GlobalWindow>() {
                @Override
                public void apply(Tuple key,
                        GlobalWindow window,
                        Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) throws Exception {
                    int sum = 0;
                    for (Tuple2<String, Integer> t : values) {
                        sum += t.f1;
                        System.out.println("word " + t.f0);
                    }
                    // emit the word (field 0 of the key) with its sum for this window
                    String word = key.getField(0);
                    out.collect(new Tuple2<>(word, sum));
                }
            });
    

    Option 2: using reduce

    text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0)
            .countWindow(11)
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                      Tuple2<String, Integer> value2) throws Exception {
                    return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                }
            });
    

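    Conclusion: apply and process are general-purpose window functions, while reduce and similar functions are simpler shortcuts for special cases such as sums. reduce aggregates incrementally as elements arrive, whereas apply/process buffer all elements of the window until it fires. apply (WindowFunction) is the older version of ProcessWindowFunction: it provides less context information and lacks some advanced features, such as per-window keyed state. This interface will be deprecated at some point, so process is recommended.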

    Windows and keys:
    Applies the given window function to each window. The window function is called for each evaluation of the window for each key individually.
    Experiments, and answers to questions I asked, confirmed that each window function invocation works at per-key granularity.
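    The per-key behaviour of keyBy(0).countWindow(size) can be reproduced with a plain-Java model. The model is not the Flink runtime, and PerKeyCountWindowModel is a hypothetical name. Every key has its own count window, and that window fires only after that key has received size elements. Like Flink's countWindow(size), the model is a tumbling window: it starts over after firing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT the Flink runtime) of keyBy(0).countWindow(size) with a sum per window.
class PerKeyCountWindowModel {
    private final int size;
    private final Map<String, Integer> countPerKey = new HashMap<>();
    private final Map<String, Integer> sumPerKey = new HashMap<>();
    final List<String> fired = new ArrayList<>(); // "word=sum" for every window that fired

    PerKeyCountWindowModel(int size) {
        this.size = size;
    }

    void onElement(String word, int count) {
        int n = countPerKey.merge(word, 1, Integer::sum);
        int s = sumPerKey.merge(word, count, Integer::sum);
        if (n == size) {              // only this key's window is full: evaluate it for this key
            fired.add(word + "=" + s);
            countPerKey.remove(word); // tumbling: the key's next window starts empty
            sumPerKey.remove(word);
        }
    }

    public static void main(String[] args) {
        PerKeyCountWindowModel w = new PerKeyCountWindowModel(2);
        for (String word : new String[] {"a", "b", "a", "b", "a"}) {
            w.onElement(word, 1);
        }
        System.out.println(w.fired); // [a=2, b=2]: the third "a" waits for a second "a"
    }
}
```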

    Generating Timestamps / Watermarks

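    This is only relevant when working with event time.
    There are two ways to generate timestamps and watermarks:

    1. Directly in the source, by calling collectWithTimestamp and emitWatermark on the SourceContext. Note that timestamps and watermarks generated this way are overwritten if a timestamp assigner (method 2) is also used.
    2. Through the Timestamp Assigner / Watermark Generator interfaces, usually applied right after the source. The assigner does not have to come directly after the source (it can follow a map or filter, for example), but it must be set before the first operation that depends on event time, such as a window. For a Kafka source, the assigner can be set on the Kafka consumer itself.

    There are two kinds of timestamp assigner, which differ in how they generate watermarks:

    1. With Periodic Watermarks
      AssignerWithPeriodicWatermarks assigns timestamps and emits watermarks periodically. The interval, in milliseconds, is set via ExecutionConfig.setAutoWatermarkInterval(...). A new watermark is only emitted if it is non-null and larger than the previous one. The provided BoundedOutOfOrdernessTimestampExtractor works like the BoundedOutOfOrdernessGenerator example from the docs: it extracts timestamps and derives watermarks from a maximum tolerated out-of-orderness.
    2. With Punctuated Watermarks
      AssignerWithPunctuatedWatermarks assigns timestamps and emits a watermark whenever a certain condition occurs, for example a special marker element in the stream. checkAndGetNextWatermark(...) is called right after extractTimestamp(...) for every element and decides whether a new watermark is emitted (returning null means no new watermark).

    Note: it is possible to generate a watermark for every single record, but each watermark triggers some computation downstream, so too many watermarks degrade performance.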
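    The periodic, bounded-out-of-orderness rule can be modelled in plain Java. The model follows the behaviour described above for BoundedOutOfOrdernessTimestampExtractor but is not the Flink class; BoundedOutOfOrdernessModel is a hypothetical name. The watermark is the largest timestamp seen so far minus the tolerated out-of-orderness, and it never moves backwards. isLate reflects the meaning of a watermark: no more elements with timestamp at or before it are expected.

```java
// Plain-Java model (NOT the Flink class) of a periodic bounded-out-of-orderness assigner.
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // start high enough that the first subtraction cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // per element: remember the largest event timestamp seen so far
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // every autoWatermarkInterval ms: emit only if larger than the previous watermark
    long getCurrentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMillis;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    // an element is late if its timestamp is not after the last emitted watermark
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel assigner = new BoundedOutOfOrdernessModel(3000);
        assigner.extractTimestamp(10_000);
        System.out.println(assigner.getCurrentWatermark()); // 7000
        assigner.extractTimestamp(8_000); // out of order, but within the 3 s bound
        System.out.println(assigner.getCurrentWatermark()); // still 7000
    }
}
```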

    Pre-defined Timestamp Extractors / Watermark Emitters

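    1. Assigners with ascending timestamps
      If the timestamps within each Kafka partition are ascending, the predefined ascending-timestamp assigner can be used directly.

    Its watermark is the current timestamp: because timestamps are ascending, no earlier timestamp can arrive.

    2. Assigners allowing a fixed amount of lateness
      You specify a maximum tolerated lateness. Elements considered late are ignored by default, that is, they are dropped and not included in the computation.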

    Working with window results

    How should the timestamp of a window result be set?
    The documentation says:

    This is set to the maximum allowed timestamp of the processed window, which is end timestamp - 1, since the window-end timestamp is exclusive. Note that this is true for both event-time windows and processing-time windows.

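    In other words, the result timestamp is the window end timestamp minus 1, for both event-time and processing-time windows. For processing-time windows this has no practical effect. For event-time windows it enables consecutive windowed operations: each result falls into the matching window of a downstream windowed operator, for example a top-K computed over the per-key results of an earlier window.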
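    The following plain-Java model shows why "end - 1" matters. It covers tumbling-window assignment only and is not Flink's window assigner. It assumes an offset of 0 and non-negative timestamps, and WindowResultTimestampModel is a hypothetical name. A result stamped with end - 1 lands in the same window again when a downstream operator uses the same window size. A result stamped with end would fall into the next window.

```java
// Plain-Java model (offset 0, non-negative timestamps) of tumbling windows and result timestamps.
class WindowResultTimestampModel {
    static long windowStart(long timestamp, long size) {
        return timestamp - timestamp % size;
    }

    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size; // exclusive end
    }

    // the timestamp carried by a window result: the window's maximum timestamp
    static long resultTimestamp(long timestamp, long size) {
        return windowEnd(timestamp, size) - 1;
    }

    public static void main(String[] args) {
        long size = 60_000; // 1-minute windows
        long r = resultTimestamp(125_000, size);
        System.out.println(r);                    // 179999
        System.out.println(windowStart(r, size)); // 120000: same window as the input element
        System.out.println(windowStart(windowEnd(125_000, size), size)); // 180000: the next window
    }
}
```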


    REST API

    Getting latency:

    1. List the available metrics: http://10.11.6.70:8081/jobs/metrics
    2. Query a specific value with a GET request:
      http://10.11.6.70:8081/jobs/metrics?get=latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.7df19f87deec5680128845fd9a6ca18d.operator_subtask_index.0.latency_median

    Setting the parallelism

    The parallelism of each operator should preferably be <= number of cores × number of machines.


          Article link: https://www.haomeiwen.com/subject/zbzswftx.html