目的
我在初次接触kafka stream 拓扑感觉很简单,但是代码中加上group,aggregate 等操作时,会有repartition和 local store等操作,此时的topology 就稍稍复杂了。国内外关于此方面知识比较少,所以只能阅读官方文档,希望此分享对读者有所帮助。
基本概念
ps:基本概念就直接搬官方文档,翻译反而变味,抱歉啊,各位!!!
- A stream processor is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently produce one or more output records to its downstream processors.
Source Processor
: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
Sink Processor
: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.

实例详解
看懂kafka stream应用,首先需要看懂topology
结构。
下面我们以一个具体实例来分析,代码见github (欢迎star
)
StreamsStarter
: kafka stream 启动器
@Slf4j
@Component
public class StreamsStarter {
// 省略......
@PostConstruct
public void startUp() {
final StreamsBuilder builder = new StreamsBuilder();
buildTopology(builder);
final Topology topology = builder.build();
log.info("topology={}", topology.describe());
kafkaStreams = new KafkaStreams(topology, kafkaProperties.getStreamProperties());
run();
}
private void buildTopology(StreamsBuilder builder) {
UserShareStream userShareStream = new UserShareStream(kafkaProperties, coinInfo);
userShareStream.buildTopology(builder);
}
private void run() {
try {
kafkaStreams.start();
} catch (Exception ex) {
log.error("kafkaStreams start error", ex);
}
}
......
}
UserShareStream
: stream topology 构建,
@Slf4j
public class UserShareStream {
void buildTopology(StreamsBuilder builder) {
final String flattenTopic = String.format("%s-flatten-share", coinInfo.getCoinType());
try {
final MiningTopologyMeta flattenMeta = MiningTopologyMeta.builder()
.sourceTopic(kafkaProperties.getTopic())
.toTopic(flattenTopic)
.build();
//user share 铺平
final FlattenTopology flattenTopology = new FlattenTopology(flattenMeta);
flattenTopology.buildTopology(builder);
final MiningTopologyMeta user5MinutesMeta = MiningTopologyMeta.builder()
.sourceTopic(flattenMeta.getToTopic())
.timeUnit(TimeUnit.MINUTES)
.timeValue(5)
.coinType(coinInfo.getCoinType())
.granularityType(GranularityType.MIN5)
.build();
//聚合5min share
final UserSubTopology user5MinSubTopology = new UserSubTopology(user5MinutesMeta);
//订阅flatten topo,是其下游
flattenTopology.addTopology(user5MinSubTopology);
final MiningTopologyMeta user15MinutesMeta = MiningTopologyMeta.builder()
.sourceTopic(flattenMeta.getToTopic())
.timeUnit(TimeUnit.MINUTES)
.timeValue(15)
.coinType(coinInfo.getCoinType())
.granularityType(GranularityType.MIN15)
.build();
//聚合15min share
final UserSubTopology user15MinSubTopology = new UserSubTopology(user15MinutesMeta);
flattenTopology.addTopology(user15MinSubTopology);
final MiningTopologyMeta userHoursMeta = MiningTopologyMeta.builder()
.sourceTopic(flattenMeta.getToTopic())
.timeUnit(TimeUnit.HOURS)
.timeValue(1)
.coinType(coinInfo.getCoinType())
.granularityType(GranularityType.HOUR)
.build();
//聚合1hour share
final UserSubTopology userHoursSubTopology = new UserSubTopology(userHoursMeta);
flattenTopology.addTopology(userHoursSubTopology);
flattenTopology.notifyAll(builder);
} catch (Exception ex) {
log.error("fail to build topology|exception:", ex);
}
}
}
FlattenTopology
: 将数据铺平
@Slf4j
public class FlattenTopology implements TopologyObservable {
protected final MiningTopologyMeta meta;
private List<TopologyObserver> topoObservers;
private KStream<String, MiningData> stream;
public FlattenTopology(MiningTopologyMeta meta) {
this.meta = meta;
topoObservers = Lists.newArrayList();
}
@Override
public void addTopology(TopologyObserver observer) {
topoObservers.add(observer);
}
public void buildTopology(StreamsBuilder builder) {
log.info("build {} topology ...", this.getClass().getSimpleName());
KStream<String, UserShareData> source = StreamUtil.buildSourceKStream(builder, meta.getSourceTopic(), UserShareData.class);
source.filter((k, v) -> v != null)
.flatMapValues(value -> value.getData())
.filter((k, v) -> v != null)
.to(meta.getToTopic(), Produced.with(Serdes.String(), StreamUtil.jsonSerde(MiningData.class)));
stream = StreamUtil.buildSourceKStream(builder, meta.getToTopic(), MiningData.class);
}
public void notifyAll(StreamsBuilder builder) {
for (TopologyObserver observer : topoObservers) {
observer.notify(builder, stream);
}
}
}
UserSubTopology
:不同时间窗口的数据聚合
@Slf4j
public class UserSubTopology extends AbstractTopology<MiningData> implements
TopologyObserver<MiningData> {
public static final String USER = "user";
public UserSubTopology(MiningTopologyMeta meta) {
super(meta, MiningData.class);
}
@Override
public void notify(StreamsBuilder builder, KStream<String, MiningData> stream) {
aggregateToTopic(stream, USER);
}
@Override
public KStream<Windowed<String>, MiningData> doAggregate(KStream<String, MiningData> source, String storeName, String toTopic) {
long windowSizeMs = getWindowSizeMs();
//已经是groupBy之后的结果
KStream<Windowed<String>, MiningData> summaryKStream = source.filter((k, v) -> v != null)
.map((k, v) -> new KeyValue<>(v.getKey(), v))
.groupBy((k, v) -> k, Serialized.with(Serdes.String(), StreamUtil.jsonSerde(MiningData.class)))
.windowedBy(TimeWindows.of(windowSizeMs).advanceBy(windowSizeMs))
.aggregate(() -> new MiningData(),
(aggKey, newValue, aggValue) -> aggValue.add(newValue, toMillis(meta.getTimeUnit(), meta.getTimeValue())),
Materialized.<String, MiningData, WindowStore<Bytes, byte[]>>as(storeName).withValueSerde(StreamUtil.jsonSerde(MiningData.class)))
.toStream();
return summaryKStream;
}
@Override
public void postAggregate(KStream<Windowed<String>, MiningData> stream, String toTopic) {
stream.foreach((key, value) -> log.info("toTopic={}|key={}|value={}", toTopic, key, value));
}
}
省略部分代码
MiningData.java
UserShareData
2018-12-15 10:16:55.941|INFO |main|c.f.f.s.k.t.StreamsStarter|37|topology=Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [btc-share])
--> KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000001 (stores: [])
--> KSTREAM-FLATMAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-FLATMAPVALUES-0000000002 (stores: [])
--> KSTREAM-FILTER-0000000003
<-- KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000003 (stores: [])
--> KSTREAM-SINK-0000000004
<-- KSTREAM-FLATMAPVALUES-0000000002
Sink: KSTREAM-SINK-0000000004 (topic: btc-flatten-share)
<-- KSTREAM-FILTER-0000000003
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000005 (topics: [btc-flatten-share])
--> KSTREAM-FILTER-0000000006, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000026
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-MAP-0000000007
<-- KSTREAM-SOURCE-0000000005
Processor: KSTREAM-FILTER-0000000016 (stores: [])
--> KSTREAM-MAP-0000000017
<-- KSTREAM-SOURCE-0000000005
Processor: KSTREAM-FILTER-0000000026 (stores: [])
--> KSTREAM-MAP-0000000027
<-- KSTREAM-SOURCE-0000000005
Processor: KSTREAM-MAP-0000000007 (stores: [])
--> KSTREAM-KEY-SELECT-0000000008
<-- KSTREAM-FILTER-0000000006
Processor: KSTREAM-MAP-0000000017 (stores: [])
--> KSTREAM-KEY-SELECT-0000000018
<-- KSTREAM-FILTER-0000000016
Processor: KSTREAM-MAP-0000000027 (stores: [])
--> KSTREAM-KEY-SELECT-0000000028
<-- KSTREAM-FILTER-0000000026
Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
--> KSTREAM-FILTER-0000000011
<-- KSTREAM-MAP-0000000007
Processor: KSTREAM-KEY-SELECT-0000000018 (stores: [])
--> KSTREAM-FILTER-0000000021
<-- KSTREAM-MAP-0000000017
Processor: KSTREAM-KEY-SELECT-0000000028 (stores: [])
--> KSTREAM-FILTER-0000000031
<-- KSTREAM-MAP-0000000027
Processor: KSTREAM-FILTER-0000000011 (stores: [])
--> KSTREAM-SINK-0000000010
<-- KSTREAM-KEY-SELECT-0000000008
Processor: KSTREAM-FILTER-0000000021 (stores: [])
--> KSTREAM-SINK-0000000020
<-- KSTREAM-KEY-SELECT-0000000018
Processor: KSTREAM-FILTER-0000000031 (stores: [])
--> KSTREAM-SINK-0000000030
<-- KSTREAM-KEY-SELECT-0000000028
Sink: KSTREAM-SINK-0000000010 (topic: user-share-store-min5-repartition)
<-- KSTREAM-FILTER-0000000011
Sink: KSTREAM-SINK-0000000020 (topic: user-share-store-min15-repartition)
<-- KSTREAM-FILTER-0000000021
Sink: KSTREAM-SINK-0000000030 (topic: user-share-store-hour-repartition)
<-- KSTREAM-FILTER-0000000031
Sub-topology: 2
Source: KSTREAM-SOURCE-0000000012 (topics: [user-share-store-min5-repartition])
--> KSTREAM-AGGREGATE-0000000009
Processor: KSTREAM-AGGREGATE-0000000009 (stores: [user-share-store-min5])
--> KTABLE-TOSTREAM-0000000013
<-- KSTREAM-SOURCE-0000000012
Processor: KTABLE-TOSTREAM-0000000013 (stores: [])
--> KSTREAM-FOREACH-0000000015, KSTREAM-SINK-0000000014
<-- KSTREAM-AGGREGATE-0000000009
Processor: KSTREAM-FOREACH-0000000015 (stores: [])
--> none
<-- KTABLE-TOSTREAM-0000000013
Sink: KSTREAM-SINK-0000000014 (topic: btc-user-share-min5)
<-- KTABLE-TOSTREAM-0000000013
Sub-topology: 3
Source: KSTREAM-SOURCE-0000000022 (topics: [user-share-store-min15-repartition])
--> KSTREAM-AGGREGATE-0000000019
Processor: KSTREAM-AGGREGATE-0000000019 (stores: [user-share-store-min15])
--> KTABLE-TOSTREAM-0000000023
<-- KSTREAM-SOURCE-0000000022
Processor: KTABLE-TOSTREAM-0000000023 (stores: [])
--> KSTREAM-FOREACH-0000000025, KSTREAM-SINK-0000000024
<-- KSTREAM-AGGREGATE-0000000019
Processor: KSTREAM-FOREACH-0000000025 (stores: [])
--> none
<-- KTABLE-TOSTREAM-0000000023
Sink: KSTREAM-SINK-0000000024 (topic: btc-user-share-min15)
<-- KTABLE-TOSTREAM-0000000023
Sub-topology: 4
Source: KSTREAM-SOURCE-0000000032 (topics: [user-share-store-hour-repartition])
--> KSTREAM-AGGREGATE-0000000029
Processor: KSTREAM-AGGREGATE-0000000029 (stores: [user-share-store-hour])
--> KTABLE-TOSTREAM-0000000033
<-- KSTREAM-SOURCE-0000000032
Processor: KTABLE-TOSTREAM-0000000033 (stores: [])
--> KSTREAM-FOREACH-0000000035, KSTREAM-SINK-0000000034
<-- KSTREAM-AGGREGATE-0000000029
Processor: KSTREAM-FOREACH-0000000035 (stores: [])
--> none
<-- KTABLE-TOSTREAM-0000000033
Sink: KSTREAM-SINK-0000000034 (topic: btc-user-share-hour)
<-- KTABLE-TOSTREAM-0000000033
ps:上述代码堆的有点多,为了讲清楚topo,请谅解啊,各位。
下面我们依次分析各个Sub-topology
Sub-topology: 0
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [btc-share])
--> KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000001 (stores: []) //source.filter((k, v) -> v != null)
--> KSTREAM-FLATMAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-FLATMAPVALUES-0000000002 (stores: []) //.flatMapValues(value -> value.getData())
--> KSTREAM-FILTER-0000000003
<-- KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000003 (stores: []) //.filter((k, v) -> v != null)
--> KSTREAM-SINK-0000000004
<-- KSTREAM-FLATMAPVALUES-0000000002
Sink: KSTREAM-SINK-0000000004 (topic: btc-flatten-share) //.to(meta.getToTopic(), Produced.with(Serdes.String(), StreamUtil.jsonSerde(MiningData.class)));
<-- KSTREAM-FILTER-0000000003
从Source Processor
(topic: btc-share) 经过一系列操作,最终到Sink Processor
(btc-flatten-share)
-
-->
: 当前processor 对应的下游processor -
<--
: 当前processor 对应的上游processor
以Processor: KSTREAM-FILTER-0000000001
为例
- 上游Processor:KSTREAM-SOURCE-0000000000
- 下游Processor:KSTREAM-FLATMAPVALUES-0000000002
Processor: KSTREAM-FILTER-0000000001 (stores: [])
--> KSTREAM-FLATMAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
这样看,是不是很简单?
如果没有问题,那么我们接下看,稍复杂点的topo
Sub-topology: 1
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000005 (topics: [btc-flatten-share])
--> KSTREAM-FILTER-0000000006, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000026
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-MAP-0000000007
<-- KSTREAM-SOURCE-0000000005
Processor: KSTREAM-FILTER-0000000016 (stores: [])
--> KSTREAM-MAP-0000000017
<-- KSTREAM-SOURCE-0000000005
Processor: KSTREAM-FILTER-0000000026 (stores: [])
--> KSTREAM-MAP-0000000027
<-- KSTREAM-SOURCE-0000000005
Processor: KSTREAM-MAP-0000000007 (stores: [])
--> KSTREAM-KEY-SELECT-0000000008
<-- KSTREAM-FILTER-0000000006
Processor: KSTREAM-MAP-0000000017 (stores: [])
--> KSTREAM-KEY-SELECT-0000000018
<-- KSTREAM-FILTER-0000000016
Processor: KSTREAM-MAP-0000000027 (stores: [])
--> KSTREAM-KEY-SELECT-0000000028
<-- KSTREAM-FILTER-0000000026
Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
--> KSTREAM-FILTER-0000000011
<-- KSTREAM-MAP-0000000007
Processor: KSTREAM-KEY-SELECT-0000000018 (stores: [])
--> KSTREAM-FILTER-0000000021
<-- KSTREAM-MAP-0000000017
Processor: KSTREAM-KEY-SELECT-0000000028 (stores: [])
--> KSTREAM-FILTER-0000000031
<-- KSTREAM-MAP-0000000027
Processor: KSTREAM-FILTER-0000000011 (stores: [])
--> KSTREAM-SINK-0000000010
<-- KSTREAM-KEY-SELECT-0000000008
Processor: KSTREAM-FILTER-0000000021 (stores: [])
--> KSTREAM-SINK-0000000020
<-- KSTREAM-KEY-SELECT-0000000018
Processor: KSTREAM-FILTER-0000000031 (stores: [])
--> KSTREAM-SINK-0000000030
<-- KSTREAM-KEY-SELECT-0000000028
Sink: KSTREAM-SINK-0000000010 (topic: user-share-store-min5-repartition)
<-- KSTREAM-FILTER-0000000011
Sink: KSTREAM-SINK-0000000020 (topic: user-share-store-min15-repartition)
<-- KSTREAM-FILTER-0000000021
Sink: KSTREAM-SINK-0000000030 (topic: user-share-store-hour-repartition)
<-- KSTREAM-FILTER-0000000031
还是和分析思路一样,Source Processor:btc-flatten-share, Sink Processor分别为:user-share-store-min5-repartition,user-share-store-min15-repartition,user-share-store-hour-repartition
什么?发生了什么?在这里,大家可以思考下如下问题:
- 为什么sink processor 是3个 ?
- sink processor 名称为什么都以
repartition
结尾 - 此topology分别对应
哪些代码
如果你能回答这些问题,那么恭喜你,过关了,可以阅读我其他文章,嘻嘻。
我们先分析topology,然后下面来慢慢回答这些问题
// UserShareStream中,三个时间窗口UserSubTopology分别订阅了上游的FlattenTopology
Source: KSTREAM-SOURCE-0000000005 (topics: [btc-flatten-share])
--> KSTREAM-FILTER-0000000006, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000026
由于三个下游topo都是同一套代码,所以我们挑选其中一个来分析其他topo。
以KSTREAM-FILTER-0000000006
为例:
//KStream<Windowed<String>, MiningData> summaryKStream = source.filter((k, v) -> v != null)
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-MAP-0000000007
<-- KSTREAM-SOURCE-0000000005
//.map((k, v) -> new KeyValue<>(v.getKey(), v))
Processor: KSTREAM-MAP-0000000007 (stores: [])
--> KSTREAM-KEY-SELECT-0000000008
<-- KSTREAM-FILTER-0000000006
//.groupBy((k, v) -> k, Serialized.with(Serdes.String(), StreamUtil.jsonSerde(MiningData.class)))
Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
--> KSTREAM-FILTER-0000000011
<-- KSTREAM-MAP-0000000007
Processor: KSTREAM-FILTER-0000000011 (stores: [])
--> KSTREAM-SINK-0000000010
<-- KSTREAM-KEY-SELECT-0000000008
Sink: KSTREAM-SINK-0000000010 (topic: user-share-store-min5-repartition)
<-- KSTREAM-FILTER-0000000011
前面几个都比较好理解,最后groupBy不太好理解
Groups the records by a new key, which may be of a different key type. When grouping a table, you may also specify a new value and value type.
groupBy
is a shorthand forselectKey(...).groupByKey()
. (KStream details, KTable details)
Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations.
When to set explicit SerDes: Variants ofgroupBy
exist to override the configured default SerDes of your application, which you must do if the key and/or value types of the resultingKGroupedStream
orKGroupedTable
do not match the configured default SerDes.
Note
Grouping vs. Windowing: A related operation is windowing, which lets you control how to “sub-group” the grouped records of the same key into so-called windows for stateful operations such as windowed aggregations or windowed joins.
Always causes data re-partitioning:groupBy
always causes data re-partitioning. If possible usegroupByKey
instead, which will re-partition data only if required.
一般groupBy
后,紧接着是aggregate 操作。
经groupBy
操作后,将topic btc-flatten-share
多个partition的数据,根据key分组后,将数据相同key的数据写回同一partition,这也是上面问题3
的答案。
分析到现在, Sub-topology: 1
与代码对应关系已经分析完了,大家有没有疑问?
为什么此topology 就这么结束了? 为什么aggregate
操作没执行?
不要着急,我们下面继续分析!
题外话
笔者,之前生产遇到问题,怀疑是add
方法未加锁,从而引起线程安全问题
,上面的描述就消除此猜想,事实证明是另外消息时间超过窗口保留时间,从而引起stream 停止工作,我们以后再分析此问题。
//...
.aggregate(() -> new MiningData(),
(aggKey, newValue, aggValue) -> aggValue.add(newValue, toMillis(meta.getTimeUnit(), meta.getTimeValue())),
Materialized.<String, MiningData, WindowStore<Bytes, byte[]>>as(storeName).withValueSerde(StreamUtil.jsonSerde(MiningData.class)))```
```JAVA
public MiningData add(MiningData data, Long timeSpace) {
if (data == null) {
return this;
}
if (userId == null) {
userId = data.getUserId();
}
if (createTime == null) {
createTime = data.getCreateTime() / timeSpace * timeSpace + timeSpace;
}
this.share1Count = this.share1Count.add(data.getShare1Count());
return this;
}
Sub-topology: 2
Sub-topology: 2
Source: KSTREAM-SOURCE-0000000012 (topics: [user-share-store-min5-repartition])
--> KSTREAM-AGGREGATE-0000000009
// .aggregate(() -> new MiningData(),
// (aggKey, newValue, aggValue) -> aggValue.add(newValue, //toMillis(meta.getTimeUnit(), meta.getTimeValue())),
// Materialized.<String, MiningData, WindowStore<Bytes, byte[]>>as(storeName).withValueSerde(StreamUtil.jsonSerde(MiningData.class)))
Processor: KSTREAM-AGGREGATE-0000000009 (stores: [user-share-store-min5])
--> KTABLE-TOSTREAM-0000000013
<-- KSTREAM-SOURCE-0000000012
// .toStream();
Processor: KTABLE-TOSTREAM-0000000013 (stores: [])
--> KSTREAM-FOREACH-0000000015, KSTREAM-SINK-0000000014
<-- KSTREAM-AGGREGATE-0000000009
//postAggregate(...)
Processor: KSTREAM-FOREACH-0000000015 (stores: [])
--> none
<-- KTABLE-TOSTREAM-0000000013
Sink: KSTREAM-SINK-0000000014 (topic: btc-user-share-min5)
<-- KTABLE-TOSTREAM-0000000013
Sub-topology 3 ,Sub-topology 4与Sub-topology 2类似
上面抛出的三个问题,在分析过程中已经解释过了,各位
总结
- 搞清楚Processor 上下游关系
- 在执行groupBy操作,重新repartition,生成xxx-repartition topic
groupBy,aggregate 在两个不同的topology
Reference
http://kafka.apache.org/21/documentation/streams/core-concepts
http://kafka.apache.org/21/documentation/streams/architecture.html#streams_architecture_state
http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#aggregating
http://kafka.apache.org/21/documentation/#streamsconfigs
http://kafka.apache.org/21/documentation/streams/developer-guide/running-app#state-restoration-during-workload-rebalance
网友评论