Preface
This article takes a look at Flink's InternalTimeServiceManager.
InternalTimeServiceManager
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@Internal
public class InternalTimeServiceManager<K> {
@VisibleForTesting
static final String TIMER_STATE_PREFIX = "_timer_state";
@VisibleForTesting
static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
@VisibleForTesting
static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
private final KeyGroupRange localKeyGroupRange;
private final KeyContext keyContext;
private final PriorityQueueSetFactory priorityQueueSetFactory;
private final ProcessingTimeService processingTimeService;
private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
private final boolean useLegacySynchronousSnapshots;
InternalTimeServiceManager(
KeyGroupRange localKeyGroupRange,
KeyContext keyContext,
PriorityQueueSetFactory priorityQueueSetFactory,
ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {
this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
this.keyContext = Preconditions.checkNotNull(keyContext);
this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
this.timerServices = new HashMap<>();
}
@SuppressWarnings("unchecked")
public <N> InternalTimerService<N> getInternalTimerService(
String name,
TimerSerializer<K, N> timerSerializer,
Triggerable<K, N> triggerable) {
InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
timerService.startTimerService(
timerSerializer.getKeySerializer(),
timerSerializer.getNamespaceSerializer(),
triggerable);
return timerService;
}
@SuppressWarnings("unchecked")
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
if (timerService == null) {
timerService = new InternalTimerServiceImpl<>(
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
timerServices.put(name, timerService);
}
return timerService;
}
Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
return Collections.unmodifiableMap(timerServices);
}
private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
String name,
TimerSerializer<K, N> timerSerializer) {
return priorityQueueSetFactory.create(
name,
timerSerializer);
}
public void advanceWatermark(Watermark watermark) throws Exception {
for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
service.advanceWatermark(watermark.getTimestamp());
}
}
////////////////// Fault Tolerance Methods ///////////////////
public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
Preconditions.checkState(useLegacySynchronousSnapshots);
InternalTimerServiceSerializationProxy<K> serializationProxy =
new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);
serializationProxy.write(stream);
}
public void restoreStateForKeyGroup(
InputStream stream,
int keyGroupIdx,
ClassLoader userCodeClassLoader) throws IOException {
InternalTimerServiceSerializationProxy<K> serializationProxy =
new InternalTimerServiceSerializationProxy<>(
this,
userCodeClassLoader,
keyGroupIdx);
serializationProxy.read(stream);
}
//////////////////// Methods used ONLY IN TESTS ////////////////////
@VisibleForTesting
public int numProcessingTimeTimers() {
int count = 0;
for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
count += timerService.numProcessingTimeTimers();
}
return count;
}
@VisibleForTesting
public int numEventTimeTimers() {
int count = 0;
for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
count += timerService.numEventTimeTimers();
}
return count;
}
}
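- InternalTimeServiceManager manages the timer services used by all keyed operators; it keeps an in-memory map (timerServices) from each timer service's name to its InternalTimerServiceImpl
- getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then calls timerService.startTimerService to initialize it, and finally returns it
- registerOrGetTimerService first looks up the InternalTimerServiceImpl for the given name in the timerServices map; if none exists, it creates one and puts it into the map. When creating the InternalTimerServiceImpl, it calls createTimerPriorityQueue twice to build the two KeyGroupedInternalPriorityQueue instances, processingTimeTimersQueue and eventTimeTimersQueue, whose state names are PROCESSING_TIMER_PREFIX + name and EVENT_TIMER_PREFIX + name; createTimerPriorityQueue itself delegates to priorityQueueSetFactory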
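The get-or-create behaviour described above can be sketched without any Flink dependency. This is only a minimal illustration: SketchTimerService and SketchTimeServiceManager are hypothetical stand-ins, and only the three prefix constants are copied from the source shown above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for InternalTimerServiceImpl: it only records the
// state names under which its two timer queues would be created.
final class SketchTimerService {
    final String processingQueueName;
    final String eventQueueName;

    SketchTimerService(String processingQueueName, String eventQueueName) {
        this.processingQueueName = processingQueueName;
        this.eventQueueName = eventQueueName;
    }
}

// Hypothetical stand-in for InternalTimeServiceManager.
final class SketchTimeServiceManager {
    static final String TIMER_STATE_PREFIX = "_timer_state";
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final Map<String, SketchTimerService> timerServices = new HashMap<>();

    // Returns the service registered under the name, creating it on first use.
    SketchTimerService registerOrGetTimerService(String name) {
        SketchTimerService service = timerServices.get(name);
        if (service == null) {
            service = new SketchTimerService(
                PROCESSING_TIMER_PREFIX + name,
                EVENT_TIMER_PREFIX + name);
            timerServices.put(name, service);
        }
        return service;
    }

    // Read-only view, mirroring getRegisteredTimerServices in the source.
    Map<String, SketchTimerService> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }
}
```

Calling registerOrGetTimerService twice with the same name returns the same instance, so each name gets exactly one pair of timer queues.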
PriorityQueueSetFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
public interface PriorityQueueSetFactory {
@Nonnull
<T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
@Nonnull String stateName,
@Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
- PriorityQueueSetFactory defines a create method that returns a KeyGroupedInternalPriorityQueue; its type parameter T must implement all three interfaces HeapPriorityQueueElement, PriorityComparable and Keyed at the same time
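The `T extends A & B & C` form is a Java intersection bound. The sketch below shows the idea with hypothetical miniature interfaces (the Sketch* names are not Flink classes). Unlike the Flink signature above, which uses the raw types PriorityComparable and Keyed, this sketch parameterizes them.

```java
// Hypothetical miniature counterparts of the three Flink interfaces.
interface SketchHeapElement {
    int getInternalIndex();
    void setInternalIndex(int newIndex);
}

interface SketchPriorityComparable<T> {
    int comparePriorityTo(T other);
}

interface SketchKeyed<K> {
    K getKey();
}

// A type that satisfies all three bounds at once.
final class SketchTimer
        implements SketchHeapElement, SketchPriorityComparable<SketchTimer>, SketchKeyed<String> {
    private final String key;
    private final long timestamp;
    private int index = Integer.MIN_VALUE;

    SketchTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }

    @Override public int getInternalIndex() { return index; }
    @Override public void setInternalIndex(int newIndex) { index = newIndex; }
    @Override public String getKey() { return key; }

    @Override
    public int comparePriorityTo(SketchTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }
}

final class IntersectionBoundSketch {
    // Inside the method, T can be used through any of the three interfaces.
    static <T extends SketchHeapElement & SketchPriorityComparable<T> & SketchKeyed<?>>
            String describe(T first, T second) {
        int order = Integer.signum(first.comparePriorityTo(second));
        return first.getKey() + ":" + order;
    }
}
```

A class that implements only one or two of the interfaces is rejected at compile time when passed to describe, which is the same guarantee the create method relies on.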
HeapPriorityQueueElement
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
@Internal
public interface HeapPriorityQueueElement {
/**
* The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
* {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
* elements are removed from a {@link HeapPriorityQueue}.
*/
int NOT_CONTAINED = Integer.MIN_VALUE;
/**
* Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
*/
int getInternalIndex();
/**
* Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
* {@link HeapPriorityQueue}.
*
* @param newIndex the new index in the timer heap.
*/
void setInternalIndex(int newIndex);
}
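- The HeapPriorityQueueElement interface describes the element type required by HeapPriorityQueue; it declares the getInternalIndex and setInternalIndex methods as well as the NOT_CONTAINED constant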
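One reason for letting each element remember its own array index is that removing an arbitrary element no longer needs a linear search. The following is a minimal sketch of that idea, not Flink's HeapPriorityQueue: IndexedNode and IndexedMinHeap are hypothetical names, and the 0-based array layout is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical element that tracks its own position in the heap array.
final class IndexedNode {
    static final int NOT_CONTAINED = Integer.MIN_VALUE;
    final long priority;
    int index = NOT_CONTAINED;

    IndexedNode(long priority) {
        this.priority = priority;
    }
}

// Hypothetical binary min-heap that keeps every node's index up to date.
final class IndexedMinHeap {
    private final List<IndexedNode> heap = new ArrayList<>();

    void add(IndexedNode node) {
        heap.add(node);
        node.index = heap.size() - 1;
        siftUp(node.index);
    }

    IndexedNode peek() {
        return heap.isEmpty() ? null : heap.get(0);
    }

    int size() {
        return heap.size();
    }

    // Removes the node in O(log n) by jumping straight to its stored index.
    boolean remove(IndexedNode node) {
        int i = node.index;
        if (i < 0 || i >= heap.size() || heap.get(i) != node) {
            return false;
        }
        int last = heap.size() - 1;
        swap(i, last);
        heap.remove(last);
        node.index = IndexedNode.NOT_CONTAINED;
        if (i < heap.size()) {
            // Restore the heap property for the element moved into slot i.
            siftUp(i);
            siftDown(i);
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(parent).priority <= heap.get(i).priority) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left).priority < heap.get(smallest).priority) {
                smallest = left;
            }
            if (right < n && heap.get(right).priority < heap.get(smallest).priority) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    // Swaps two slots and updates the index stored in both nodes.
    private void swap(int i, int j) {
        IndexedNode a = heap.get(i);
        IndexedNode b = heap.get(j);
        heap.set(i, b);
        b.index = i;
        heap.set(j, a);
        a.index = j;
    }
}
```

Note that this sketch resets the index to NOT_CONTAINED on removal, whereas the Javadoc quoted above says Flink does not strictly enforce that.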
PriorityComparable
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java
public interface PriorityComparable<T> {
int comparePriorityTo(@Nonnull T other);
}
- PriorityComparable defines the comparePriorityTo method, which compares two elements by their priority
Keyed
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java
public interface Keyed<K> {
K getKey();
}
- The Keyed interface defines the getKey method, which returns the key of the object
InternalTimer
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java
@Internal
public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {
/** Function to extract the key from a {@link InternalTimer}. */
KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;
/** Function to compare instances of {@link InternalTimer}. */
PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
(left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());
/**
* Returns the timestamp of the timer. This value determines the point in time when the timer will fire.
*/
long getTimestamp();
/**
* Returns the key that is bound to this timer.
*/
@Nonnull
@Override
K getKey();
/**
* Returns the namespace that is bound to this timer.
*/
@Nonnull
N getNamespace();
}
- InternalTimer extends the PriorityComparable and Keyed interfaces; it declares the getTimestamp, getKey and getNamespace methods, and also provides the built-in constants KEY_EXTRACTOR_FUNCTION and TIMER_COMPARATOR
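As a rough illustration of what the two constants do, the sketch below orders timers by timestamp and extracts their keys using only JDK types. DemoTimer and TimerOrderingSketch are hypothetical names; java.util.Comparator and java.util.function.Function stand in for Flink's PriorityComparator and KeyExtractorFunction.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

// Hypothetical timer holding the same three attributes as InternalTimer.
final class DemoTimer {
    final long timestamp;
    final String key;
    final String namespace;

    DemoTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }
}

final class TimerOrderingSketch {
    // Same comparison as TIMER_COMPARATOR in the source: earlier timestamp first.
    static final Comparator<DemoTimer> TIMER_COMPARATOR =
        (left, right) -> Long.compare(left.timestamp, right.timestamp);

    // Counterpart of KEY_EXTRACTOR_FUNCTION: maps a timer to its key.
    static final Function<DemoTimer, String> KEY_EXTRACTOR = timer -> timer.key;

    // Returns the keys in the order in which their timers would fire.
    static List<String> firingOrder(List<DemoTimer> timers) {
        PriorityQueue<DemoTimer> queue = new PriorityQueue<>(TIMER_COMPARATOR);
        queue.addAll(timers);
        List<String> keys = new ArrayList<>();
        while (!queue.isEmpty()) {
            keys.add(KEY_EXTRACTOR.apply(queue.poll()));
        }
        return keys;
    }
}
```

Because the comparison looks only at the timestamp, timers with equal timestamps have no defined relative order in this sketch.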
TimerHeapInternalTimer
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {
/** The key for which the timer is scoped. */
@Nonnull
private final K key;
/** The namespace for which the timer is scoped. */
@Nonnull
private final N namespace;
/** The expiration timestamp. */
private final long timestamp;
private transient int timerHeapIndex;
TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
this.timestamp = timestamp;
this.key = key;
this.namespace = namespace;
this.timerHeapIndex = NOT_CONTAINED;
}
@Override
public long getTimestamp() {
return timestamp;
}
@Nonnull
@Override
public K getKey() {
return key;
}
@Nonnull
@Override
public N getNamespace() {
return namespace;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof InternalTimer) {
InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
return timestamp == timer.getTimestamp()
&& key.equals(timer.getKey())
&& namespace.equals(timer.getNamespace());
}
return false;
}
@Override
public int getInternalIndex() {
return timerHeapIndex;
}
@Override
public void setInternalIndex(int newIndex) {
this.timerHeapIndex = newIndex;
}
void removedFromTimerQueue() {
setInternalIndex(NOT_CONTAINED);
}
@Override
public int hashCode() {
int result = (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + key.hashCode();
result = 31 * result + namespace.hashCode();
return result;
}
@Override
public String toString() {
return "Timer{" +
"timestamp=" + timestamp +
", key=" + key +
", namespace=" + namespace +
'}';
}
@Override
public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
return Long.compare(timestamp, other.getTimestamp());
}
}
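- TimerHeapInternalTimer implements the InternalTimer and HeapPriorityQueueElement interfaces; its removedFromTimerQueue method calls setInternalIndex(NOT_CONTAINED), i.e. it resets the index to NOT_CONTAINED to mark the timer as no longer contained in any queue (a logical removal)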
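In the source above, equals and hashCode use only timestamp, key and namespace; the heap index is left out. The sketch below shows the consequence with a hypothetical, simplified SketchHeapTimer that fixes key and namespace to String: two timers with the same three values are equal regardless of where they sit in a heap.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified counterpart of TimerHeapInternalTimer.
final class SketchHeapTimer {
    static final int NOT_CONTAINED = Integer.MIN_VALUE;

    private final long timestamp;
    private final String key;
    private final String namespace;
    // Bookkeeping only: not part of equals or hashCode.
    private transient int timerHeapIndex = NOT_CONTAINED;

    SketchHeapTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    int getInternalIndex() {
        return timerHeapIndex;
    }

    void setInternalIndex(int newIndex) {
        this.timerHeapIndex = newIndex;
    }

    // Marks the timer as no longer managed by any queue.
    void removedFromTimerQueue() {
        setInternalIndex(NOT_CONTAINED);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchHeapTimer)) {
            return false;
        }
        SketchHeapTimer other = (SketchHeapTimer) o;
        return timestamp == other.timestamp
            && key.equals(other.key)
            && namespace.equals(other.namespace);
    }

    @Override
    public int hashCode() {
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }

    // Counts distinct timers; duplicates collapse because of equals/hashCode.
    static int distinctCount(SketchHeapTimer... timers) {
        Set<SketchHeapTimer> distinct = new HashSet<>();
        for (SketchHeapTimer timer : timers) {
            distinct.add(timer);
        }
        return distinct.size();
    }
}
```

Keeping the index out of the identity means a hash-based lookup still finds a timer after the heap has moved it to a different slot.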
HeapPriorityQueueSetFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
@Nonnull
private final KeyGroupRange keyGroupRange;
@Nonnegative
private final int totalKeyGroups;
@Nonnegative
private final int minimumCapacity;
public HeapPriorityQueueSetFactory(
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int totalKeyGroups,
@Nonnegative int minimumCapacity) {
this.keyGroupRange = keyGroupRange;
this.totalKeyGroups = totalKeyGroups;
this.minimumCapacity = minimumCapacity;
}
@Nonnull
@Override
public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
@Nonnull String stateName,
@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
return new HeapPriorityQueueSet<>(
PriorityComparator.forPriorityComparableObjects(),
KeyExtractorFunction.forKeyedObjects(),
minimumCapacity,
keyGroupRange,
totalKeyGroups);
}
}
- HeapPriorityQueueSetFactory implements the PriorityQueueSetFactory interface; its create method returns a HeapPriorityQueueSet
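In the create method above, the heap-based factory passes neither stateName nor byteOrderedElementSerializer on to the queue it builds. The sketch below imitates that factory shape with JDK classes only. All Sketch* names are hypothetical. The de-duplicating "set" behaviour is an assumption suggested by the class name HeapPriorityQueueSet, not something the source above shows.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical priority queue with set semantics: a heap plus a hash set
// that rejects duplicates (an assumption of this sketch).
final class SketchPriorityQueueSet<T extends Comparable<T>> {
    private final PriorityQueue<T> heap;
    private final Set<T> dedup = new HashSet<>();

    SketchPriorityQueueSet(int minimumCapacity) {
        // java.util.PriorityQueue requires an initial capacity of at least 1.
        this.heap = new PriorityQueue<>(Math.max(1, minimumCapacity));
    }

    // Returns true only if the element was not already present.
    boolean add(T element) {
        if (!dedup.add(element)) {
            return false;
        }
        heap.add(element);
        return true;
    }

    T poll() {
        T head = heap.poll();
        if (head != null) {
            dedup.remove(head);
        }
        return head;
    }

    int size() {
        return heap.size();
    }
}

// Hypothetical counterpart of PriorityQueueSetFactory.
interface SketchQueueSetFactory {
    <T extends Comparable<T>> SketchPriorityQueueSet<T> create(String stateName);
}

// Hypothetical counterpart of HeapPriorityQueueSetFactory.
final class SketchHeapQueueSetFactory implements SketchQueueSetFactory {
    private final int minimumCapacity;

    SketchHeapQueueSetFactory(int minimumCapacity) {
        this.minimumCapacity = minimumCapacity;
    }

    @Override
    public <T extends Comparable<T>> SketchPriorityQueueSet<T> create(String stateName) {
        // As in the source, the state name is not needed to build an in-memory queue.
        return new SketchPriorityQueueSet<>(minimumCapacity);
    }
}
```

Hiding the concrete queue behind a factory interface lets the caller (here, the timer service manager) stay unaware of which queue implementation it gets.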
Summary
- InternalTimeServiceManager manages the timer services used by all keyed operators; it keeps an in-memory map (timerServices) from each timer service's name to its InternalTimerServiceImpl. getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then calls timerService.startTimerService to initialize it, and finally returns it
- registerOrGetTimerService first looks up the InternalTimerServiceImpl for the given name in the timerServices map; if none exists, it creates one and puts it into the map. When creating the InternalTimerServiceImpl, it calls createTimerPriorityQueue to build the two KeyGroupedInternalPriorityQueue instances, processingTimeTimersQueue and eventTimeTimersQueue; createTimerPriorityQueue itself delegates to priorityQueueSetFactory
- PriorityQueueSetFactory defines a create method that returns a KeyGroupedInternalPriorityQueue; its type parameter T must implement all three interfaces HeapPriorityQueueElement, PriorityComparable and Keyed (InternalTimer extends PriorityComparable and Keyed, and TimerHeapInternalTimer implements InternalTimer and HeapPriorityQueueElement). HeapPriorityQueueSetFactory implements PriorityQueueSetFactory, and its create method returns a HeapPriorityQueueSet