美文网首页
基于Apache Flink的流处理 第七章 有状态算子和应用

基于Apache Flink的流处理 第七章 有状态算子和应用

作者: kaiker | 来源:发表于2021-09-25 13:34 被阅读0次

1、键值分区有状态算子

  • 只能由作用在KeyedStream上面的函数使用
  • 状态原语定义单个键值对应的状态结构

ValueState[T]

用于保存类型为T的单个值

ListState[T]

用于保存类型为T的元素列表,可以调用add、addAll更新元素;可以用get访问元素;使用update更新整个列表

MapState[K, V]

用于保存一组键值映射,有get put remove contains等方法

ReducingState[T]

add方法会直接返回一个ReduceFunction聚合后的值

AggregatingState[I, O]

get方法会直接计算最终结果并返回,也会记录聚合值

所有的状态都支持.clear()清空状态

  • 比如下面这个例子可以一直保存计算的状态,状态的初始化需要使用富函数,在open里进行初始化
package BaseOperationTest;

import Bean.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyStateTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度 = 1
        env.setParallelism(1);
        // 从本地socket读取数据
        DataStream<String> inputStream = env.readTextFile("/Users/kaiker/Documents/projects/flink_study/src/main/resources/sensor.txt");

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 使用自定义map方法,里面使用 我们自定义的Keyed State
        DataStream<Integer> resultStream = dataStream
                .keyBy(SensorReading::getId)
                .map(new MyMapper());

        resultStream.print("result");
        env.execute();
    }

    // 自定义map富函数,测试 键控状态
    public static class MyMapper extends RichMapFunction<SensorReading,Integer> {

        //        Exception in thread "main" java.lang.IllegalStateException: The runtime context has not been initialized.
        //        ValueState<Integer> valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("my-int", Integer.class));

        private ValueState<Integer> valueState;


        // 其它类型状态的声明
        private ListState<String> myListState;
        private MapState<String, Double> myMapState;
        private ReducingState<SensorReading> myReducingState;

        @Override
        public void open(Configuration parameters) throws Exception {
            valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("my-int", Integer.class));

            myListState = getRuntimeContext().getListState(new ListStateDescriptor<String>("my-list", String.class));
            myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Double>("my-map", String.class, Double.class));
            //            myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<SensorReading>())

        }

        // 这里就简单的统计每个 传感器的 信息数量
        @Override
        public Integer map(SensorReading value) throws Exception {
            // 其它状态API调用
            // list state
            for(String str: myListState.get()){
                System.out.println(value.getId() + " " +str);
            }
            myListState.add("hello");
            // map state
//            myMapState.get("1");
//            myMapState.put("2", 12.3);
//            myMapState.remove("2");
            // reducing state
            //            myReducingState.add(value);

//            myMapState.clear();


            Integer count = valueState.value();
            // 第一次获取是null,需要判断
            count = count==null?0:count;
            ++count;
            valueState.update(count);
            return count;
        }
    }
}

2、全局的状态

ListCheckpointed

可以实现列表型的算子状态,需要实现ListCheckpointed接口

  • snapshotState会在触发有状态函数生成检查点时调用,接受检查点编号和JobManager开始创建检查点的机器时间timestamp
  • restoreState()方法会在初始化函数状态调用,接收一个状态对象列表,基于这些对象恢复算子状态进行恢复
public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
        // 定义一个本地变量,作为算子状态
        private Integer count = 0;

        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for (Integer num : state) {
                count += num;
            }
        }
    }

CheckpointedFunction

用于制定有状态函数的最底层借口。

  • initializeState() 触发时机是应用启动或由于故障而重启任务,传入一个FunctionInitializationContext对象,它可以访问OperatorStateStore和KeyedStateStore对象,这两个状态存储对象能够使用Flink运行时来注册函数状态并返回状态对象。
  • snapshotState() 方法会在生成检查点之前调用,接收FunctionSnapshotContext对象,可以获取检查点编号以及JobManager初始化检查点时间戳。该方法是确保检查点开始之前,所有状态对象都已更新完毕。

https://www.jianshu.com/p/247c7a66c69

3、有状态应用的性能及鲁棒性

选择状态后端

  • MemoryStateBackend将状态以常规对象的方式存储在TaskManager进程的JVM堆里,生成检查点时,会将状态发送至JobManager并保存到它的堆内存中。建议近用于开发和测试。
  • FsStateBackend也将本地状态保存在TaskManager的JVM堆内。但创建检查点时状态会写入远程持久化文件系统。
  • RocksDBStateBackend会把全部状态存到本地RocksDB实例中。RocksDB是一个嵌入式键值存储,可以将数据保存到本地磁盘上。

选择状态原语

状态原语就是ValueState、ListState、MapState之类。不同的状态后端处理状态原语的时候可能会有序列化开销。

可查询式状态

任何键值分区的状态都可以作为可查询式状态暴露给外部应用,QueryableStateServer用于处理客户端代理的请求,运行在TM上,会根据查询的状态返回给客户端代理。QueryableStateClientProxy用于接收并相应客户端请求。


可查询式状态服务架构
  • 可以在创建一个状态之后,调用.setQueryable使得这个状态可查询。
  • 也可以使用asQueryableState()创建,对于每个收到的记录可查询式状态的数据汇都会用它去更新状态值。
dataStream.map(new MyCountMapper())
.keyBy(.1)
.timeWindow(Time.seconds(10))
.max(1)
.asQueryableState("maxTemperature")

相关文章

网友评论

      本文标题:基于Apache Flink的流处理 第七章 有状态算子和应用

      本文链接:https://www.haomeiwen.com/subject/ybfxnltx.html