Prologue
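In a much earlier article, "Spark Streaming/Flink广播实现作业配置动态更新" (Dynamically Updating Job Configuration with Broadcast in Spark Streaming/Flink), I briefly introduced how to use broadcast streams and broadcast state in the Flink Streaming API. A few days ago someone in the community chat group asked about broadcast state, so this article takes a somewhat deeper look at it.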
Broadcast[Connected]Stream
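Let's review with the diagram from that earlier article. Stream A is an ordinary data stream, and Stream B is a control stream carrying control information and similar data; B is the one that gets broadcast.
Calling DataStream.broadcast() on stream B with a MapStateDescriptor as the state descriptor turns it into a BroadcastStream. The source of the method is shown below; note that more than one MapStateDescriptor can be passed.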
public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
    Preconditions.checkNotNull(broadcastStateDescriptors);
    final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
    return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}
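BroadcastPartitioner is the partitioner dedicated to broadcast streams. Because a broadcast record is delivered to every downstream subtask rather than to one selected channel, no partitioning is actually needed (selectChannel() carries no real selection logic); the partitioner merely marks isBroadcast as true. The stream still has to be joined with the data stream through connect(), covered below.
broadcast() wraps the StreamExecutionEnvironment, the original DataStream, and the MapStateDescriptors into a BroadcastStream instance. The implementation of BroadcastStream is very simple, so its code is not reproduced here.
Next, we call DataStream.connect() on data stream A to connect it with the broadcast stream B, which produces a BroadcastConnectedStream.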
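To make the difference between selecting a channel and broadcasting concrete, here is a minimal plain-Java model. It is only a sketch: the Partitioner interface, the two partitioner classes, and emit() are hypothetical stand-ins written for this article, not Flink's classes, and throwing from selectChannel() on the broadcast side is a choice made here to show that the method is never consulted.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of channel selection; none of these types are Flink's.
class PartitioningSketch {

    /** Hypothetical stand-in for a partitioner. */
    interface Partitioner<T> {
        boolean isBroadcast();
        int selectChannel(T record, int numChannels);
    }

    /** Picks exactly one channel per record, based on the record's hash. */
    static final class HashPartitioner<T> implements Partitioner<T> {
        public boolean isBroadcast() { return false; }
        public int selectChannel(T record, int numChannels) {
            return Math.floorMod(record.hashCode(), numChannels);
        }
    }

    /** Only flags itself as broadcast; channel selection is never consulted. */
    static final class BroadcastPartitioner<T> implements Partitioner<T> {
        public boolean isBroadcast() { return true; }
        public int selectChannel(T record, int numChannels) {
            throw new UnsupportedOperationException("broadcast does not select a channel");
        }
    }

    /** Writer side: copies the record to every channel when the flag is set. */
    static <T> List<List<T>> emit(List<T> records, Partitioner<T> partitioner, int numChannels) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        for (T record : records) {
            if (partitioner.isBroadcast()) {
                for (List<T> channel : channels) {
                    channel.add(record);
                }
            } else {
                channels.get(partitioner.selectChannel(record, numChannels)).add(record);
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> rules = List.of("rule-1", "rule-2");
        // Every one of the three channels receives both rules.
        System.out.println(emit(rules, new BroadcastPartitioner<String>(), 3));
        // Each rule lands in exactly one channel.
        System.out.println(emit(rules, new HashPartitioner<String>(), 3));
    }
}
```

In this model the writer decides what to do from the isBroadcast flag alone, which is why the broadcast partitioner needs no selection logic of its own.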
public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
    return new BroadcastConnectedStream<>(
            environment,
            this,
            Preconditions.checkNotNull(broadcastStream),
            broadcastStream.getBroadcastStateDescriptor());
}
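We then call process() on the BroadcastConnectedStream to handle the two streams separately. If data stream A is a KeyedStream, a KeyedBroadcastProcessFunction must be passed in; if it is a plain DataStream, a BroadcastProcessFunction is required. Taking the KeyedStream case as the example, here is the source of process().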
public <KS, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
        final TypeInformation<OUT> outTypeInfo) {
    Preconditions.checkNotNull(function);
    Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
            "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
    TwoInputStreamOperator<IN1, IN2, OUT> operator =
            new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
    return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);
}
CoBroadcastWith[Non]KeyedOperator
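As shown above, the processing function is actually invoked by a CoBroadcastWithKeyedOperator (the operator for the other case is named CoBroadcastWithNonKeyedOperator), which is a TwoInputStreamOperator, that is, an operator with two input streams. The operator maintains the following members.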
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
private transient TimestampedCollector<OUT> collector;
private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
private transient ReadWriteContextImpl rwContext;
private transient ReadOnlyContextImpl rContext;
private transient OnTimerContextImpl onTimerContext;
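Here, broadcastStateDescriptors is the list of state descriptors passed in through broadcast() at the start of this article, and broadcastStates maintains the mapping from state descriptors to state instances. ReadWriteContextImpl and ReadOnlyContextImpl correspond to the read-write and read-only contexts of KeyedBroadcastProcessFunction; their roles become clear below.
The initialization logic is in the operator's open() method.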
public void open() throws Exception {
    super.open();

    InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
    TimerService timerService = new SimpleTimerService(internalTimerService);

    collector = new TimestampedCollector<>(output);

    this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
    for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
        broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor));
    }

    rwContext = new ReadWriteContextImpl(
            getExecutionConfig(), getKeyedStateBackend(), userFunction, broadcastStates, timerService);
    rContext = new ReadOnlyContextImpl(
            getExecutionConfig(), userFunction, broadcastStates, timerService);
    onTimerContext = new OnTimerContextImpl(
            getExecutionConfig(), userFunction, broadcastStates, timerService);
}
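The practical consequence of open() is that every state instance is created up front, one per registered descriptor, and nothing is created lazily afterwards. The following plain-Java sketch models that behavior under simplifying assumptions: descriptors are reduced to their names, each state is a plain HashMap, and the class StateRegistrySketch and its methods are hypothetical names, not Flink API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the descriptor-to-state map built in open(); not Flink code.
class StateRegistrySketch {

    // Descriptors are reduced to their names here, which is a simplification.
    private final List<String> descriptorNames;
    private Map<String, Map<String, String>> states;

    StateRegistrySketch(List<String> descriptorNames) {
        this.descriptorNames = descriptorNames;
    }

    /** Mirrors open(): create one state instance per registered descriptor. */
    void open() {
        states = new HashMap<>(descriptorNames.size());
        for (String name : descriptorNames) {
            states.put(name, new HashMap<>());
        }
    }

    /** Lookup by descriptor: unknown descriptors are rejected, never created on demand. */
    Map<String, String> getState(String name) {
        Map<String, String> state = states.get(name);
        if (state == null) {
            throw new IllegalArgumentException("The requested state does not exist: " + name);
        }
        return state;
    }

    public static void main(String[] args) {
        StateRegistrySketch operator = new StateRegistrySketch(List.of("rules", "thresholds"));
        operator.open();

        operator.getState("rules").put("r1", "amount > 100");
        System.out.println(operator.getState("rules"));

        try {
            operator.getState("ruels"); // misspelled name that was never registered
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the sketch, as in the operator, a descriptor that was not supplied at registration time can only produce an error at lookup time, so a typo in a descriptor name shows up as an exception rather than as an empty state.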
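Now look at the methods that process the two streams, processElement1() and processElement2(): 1 corresponds to the data stream and 2 to the broadcast stream.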
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
    collector.setTimestamp(element);
    rContext.setElement(element);
    userFunction.processElement(element.getValue(), rContext, collector);
    rContext.setElement(null);
}

@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
    collector.setTimestamp(element);
    rwContext.setElement(element);
    userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
    rwContext.setElement(null);
}
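As you can see, processElement(), which handles the data stream, receives a read-only context (ReadOnlyContextImpl), while processBroadcastElement(), which handles the broadcast stream, receives a read-write context (ReadWriteContextImpl). Both methods can call getBroadcastState() on their context to obtain the broadcast state, which serves as the bridge between the two streams. So how do the two versions of getBroadcastState() differ?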
// ReadWriteContextImpl.getBroadcastState(), used on the broadcast stream side
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
    Preconditions.checkNotNull(stateDescriptor);
    stateDescriptor.initializeSerializerUnlessSet(config);
    BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
    if (state == null) {
        throw new IllegalArgumentException("The requested state does not exist. " +
                "Check for typos in your state descriptor, or specify the state descriptor " +
                "in the datastream.broadcast(...) call if you forgot to register it.");
    }
    return state;
}

// ReadOnlyContextImpl.getBroadcastState(), used on the data stream side
@Override
public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
    Preconditions.checkNotNull(stateDescriptor);
    stateDescriptor.initializeSerializerUnlessSet(config);
    ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
    if (state == null) {
        throw new IllegalArgumentException("The requested state does not exist. " +
                "Check for typos in your state descriptor, or specify the state descriptor " +
                "in the datastream.broadcast(...) call if you forgot to register it.");
    }
    return state;
}
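The only difference is the type of the returned state instance: BroadcastState versus ReadOnlyBroadcastState. As the names suggest, the data-stream side can only read the broadcast state, while the broadcast side can both read and write it. Each parallel instance sees different data elements, so writes from the data side would let the copies of the state drift apart; restricting that side to reads prevents such inconsistent results.
Finally, let's look at how BroadcastState is implemented.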
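The division of labor between the two inputs can be modeled in a few lines of plain Java. This is a sketch of one parallel instance only, with hypothetical names throughout (TwoInputSketch, ReadOnlyState, ReadWriteState, MapState); it assumes a single "limit" entry in the state and uses no Flink API. The point is that both inputs share one state object, and only the static type handed to each side differs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of one parallel instance of a two-input broadcast operator.
// All names are hypothetical; nothing here is Flink API.
class TwoInputSketch {

    interface ReadOnlyState<K, V> {
        V get(K key);
    }

    interface ReadWriteState<K, V> extends ReadOnlyState<K, V> {
        void put(K key, V value);
    }

    static final class MapState<K, V> implements ReadWriteState<K, V> {
        private final Map<K, V> backing = new HashMap<>();
        public V get(K key) { return backing.get(key); }
        public void put(K key, V value) { backing.put(key, value); }
    }

    private final MapState<String, Integer> thresholds = new MapState<>();
    private final List<String> output = new ArrayList<>();

    /** Data side (input 1): sees the state through the read-only type. */
    void processElement1(String user, int amount) {
        ReadOnlyState<String, Integer> view = thresholds;
        Integer limit = view.get("limit");
        if (limit != null && amount > limit) {
            output.add(user + ":" + amount);
        }
    }

    /** Broadcast side (input 2): sees the same state through the read-write type. */
    void processElement2(String key, int value) {
        ReadWriteState<String, Integer> state = thresholds;
        state.put(key, value);
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        TwoInputSketch operator = new TwoInputSketch();
        operator.processElement1("alice", 500); // no rule yet, so nothing is emitted
        operator.processElement2("limit", 100); // control record sets the limit
        operator.processElement1("bob", 150);   // above the limit, emitted
        operator.processElement2("limit", 200); // limit raised at runtime
        operator.processElement1("carol", 150); // below the new limit, not emitted
        System.out.println(operator.output());  // prints [bob:150]
    }
}
```

The first data record is dropped in this model because no rule has arrived yet. Whether to drop, buffer, or apply a default in that situation is a design decision for the job author.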
[ReadOnly]BroadcastState
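The class diagram is shown below.
As it shows, both the read-only and the read-write broadcast state are ultimately implemented by HeapBroadcastState; the ReadOnlyBroadcastState interface simply does not provide put(), putAll(), or remove().
In the operator's open() method from the previous section, DefaultOperatorStateBackend.getBroadcastState() is called to create the HeapBroadcastState instance, which shows that broadcast state is essentially a kind of operator state. HeapBroadcastState is very simple and has only two main fields: the metadata of the broadcast state (name, serializers, and so on) and the HashMap that actually stores the state data.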
private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
private final Map<K, V> backingMap;
All state reads and writes are in fact reads and writes on the underlying HashMap.
@Override
public V get(K key) { return backingMap.get(key); }
@Override
public void put(K key, V value) { backingMap.put(key, value); }
@Override
public void putAll(Map<K, V> map) { backingMap.putAll(map); }
@Override
public void remove(K key) { backingMap.remove(key); }
@Override
public boolean contains(K key) { return backingMap.containsKey(key); }
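It follows that at runtime broadcast state is always kept in heap memory and is never stored in RocksDB, whichever state backend is configured; like other operator state, it is still written out when a checkpoint is taken. Once the broadcast side modifies an entry of the broadcast state, the data side of the same parallel instance sees the change immediately.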
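A heap-backed state behind a narrower read-only interface has two consequences, which this plain-Java sketch illustrates. HeapStateSketch, ReadOnlyView, and HeapState are hypothetical names chosen for this article, and the model deliberately omits metadata and serialization. First, a write is visible through the read-only view at once, because both references point at the same object. Second, the read-only interface blocks put() and remove() at compile time, but it cannot stop a caller from mutating a value object it has read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a heap-backed state with a read-only view; not Flink code.
class HeapStateSketch {

    interface ReadOnlyView<K, V> {
        V get(K key);
        boolean contains(K key);
    }

    /** Hypothetical analogue of a HashMap-backed broadcast state. */
    static final class HeapState<K, V> implements ReadOnlyView<K, V> {
        private final Map<K, V> backingMap = new HashMap<>();
        public V get(K key) { return backingMap.get(key); }
        public boolean contains(K key) { return backingMap.containsKey(key); }
        public void put(K key, V value) { backingMap.put(key, value); }
        public void remove(K key) { backingMap.remove(key); }
    }

    public static void main(String[] args) {
        HeapState<String, List<String>> state = new HeapState<>();
        ReadOnlyView<String, List<String>> view = state; // same object, narrower type

        state.put("blocked", new ArrayList<>(List.of("u1")));
        System.out.println(view.get("blocked")); // prints [u1]: the write is visible at once

        // view.put(...) would not compile, but values are shared by reference:
        view.get("blocked").add("u2");
        System.out.println(state.get("blocked")); // prints [u1, u2]
    }
}
```

The second print shows why values read on the data side are best treated as immutable: in a model like this one, nothing but discipline keeps the read-only side from changing the state through a mutable value.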