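I wrote a small demo for practice. Takeaways: you have to think in streams, and not treat a streaming aggregation the way you would a database query. A streaming aggregation emits many update events within a single time window, not one final result per window. One last point: Kafka Streams provides KTable, which gives you database-like storage.
In other words, you can send a request to an application instance and read the KTable's aggregated data from it,
instead of writing your own consumer service that re-implements the KTable aggregation. This part feels a bit unintuitive at first.
I recommend reading through https://github.com/confluentinc/kafka-streams-examples.git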
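To illustrate the "many events per window" point, here is a toy, JDK-only model of a 10 s windowed count. It is plain Java, not the Kafka Streams API. Every input record immediately produces an updated (windowStart, count) result, the way a windowed KTable's changelog does. In real Kafka Streams, record caches flushed on each commit interval may coalesce some of these updates, so the number you actually see downstream can be smaller.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, JDK-only model of a windowed count; this is NOT the Kafka Streams API.
// Every input record produces an updated (windowStart, count) result immediately,
// the way a windowed KTable emits one update per record rather than one row per window.
public class WindowUpdateDemo {
    static final long WINDOW_MS = 10_000; // 10 s tumbling windows, as in the demo

    /** Returns one "windowStart=count" update per input timestamp, in arrival order. */
    public static List<String> countUpdates(long[] timestampsMs) {
        Map<Long, Long> counts = new LinkedHashMap<>(); // window start -> running count
        List<String> updates = new ArrayList<>();
        for (long ts : timestampsMs) {
            long windowStart = ts - (ts % WINDOW_MS);   // align to the 10 s boundary
            long count = counts.merge(windowStart, 1L, Long::sum);
            updates.add(windowStart + "=" + count);     // emitted right away, not at window close
        }
        return updates;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 2_000, 3_000, 12_000, 13_000};
        countUpdates(ts).forEach(System.out::println);
    }
}
```

Running main prints 0=1, 0=2, 0=3, 10000=1, 10000=2. Five input records produce five updates, and the first window is reported three times.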
Code: more than 5 comments within 10 s
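import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

SpecificAvroSerde<Content> contentSpecificAvroSerde = new SpecificAvroSerde<>();
SpecificAvroSerde<ContentbyUserId> userContentSpecificAvroSerde = new SpecificAvroSerde<>();
contentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
userContentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
Properties properties = config("contentId1", "localhost:9092", "/tmp/filter1");
final StreamsBuilder builder = new StreamsBuilder();
// Input topic content2: key = contentId, value = Content; re-key by userId to count per user
KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
        .selectKey((k, v) -> v.getUserId());
KGroupedStream<String, Content> stringContentKGroupedStream = kStream.groupByKey();
// k1Result is not defined in the original snippet; this definition is an assumption:
// 10 s tumbling windows, and ContentbyUserId is assumed to have a long "count" field.
KTable<Windowed<String>, ContentbyUserId> k1Result = stringContentKGroupedStream
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .aggregate(
                () -> { ContentbyUserId agg = new ContentbyUserId(); agg.setCount(0L); return agg; },
                (userId, content, agg) -> { agg.setCount(agg.getCount() + 1); return agg; },
                Materialized.with(Serdes.String(), userContentSpecificAvroSerde));
// Keep only window updates in which the user has more than 5 comments
KStream<Windowed<String>, ContentbyUserId> k1ResultStream = k1Result
        .toStream()
        .filter((k, v) -> v != null && v.getCount() > 5);
k1ResultStream.print(Printed.toSysOut());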
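The filter above runs on every window update, not once per window. The following JDK-only simulation models the rule (plain Java, not Kafka code). It counts comments per userId in 10 s tumbling windows and keeps each update whose count exceeds 5. A user who keeps commenting therefore triggers output on the 6th, the 7th, and every later comment in the same window. The Comment class and the user IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// JDK-only simulation of the first rule (not Kafka code): count comments per userId
// in 10 s tumbling windows and keep every update whose count exceeds 5.
public class CommentBurstDemo {
    static final long WINDOW_MS = 10_000;
    static final long THRESHOLD = 5;

    /** Hypothetical comment event: who posted it and when (epoch millis). */
    public static final class Comment {
        final String userId;
        final long timestampMs;

        public Comment(String userId, long timestampMs) {
            this.userId = userId;
            this.timestampMs = timestampMs;
        }
    }

    /** Returns "userId@windowStart count=n" for every update that passes the filter. */
    public static List<String> alerts(List<Comment> comments) {
        Map<String, Long> counts = new HashMap<>(); // "userId@windowStart" -> running count
        List<String> out = new ArrayList<>();
        for (Comment c : comments) {
            long windowStart = c.timestampMs - (c.timestampMs % WINDOW_MS);
            String key = c.userId + "@" + windowStart;
            long count = counts.merge(key, 1L, Long::sum);
            if (count > THRESHOLD) { // same predicate as v.getCount() > 5
                out.add(key + " count=" + count);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Comment> comments = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            comments.add(new Comment("u1", i * 1_000L)); // u1 comments once per second
        }
        comments.add(new Comment("u2", 500L));          // u2 comments once
        alerts(comments).forEach(System.out::println);
    }
}
```

Running main prints u1@0 count=6 and then u1@0 count=7, so the same window yields two results. If you want only one result per window, Kafka Streams 2.1+ offers KTable#suppress with Suppressed.untilWindowCloses(...). It holds back windowed results until the window, plus its grace period, has closed.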
Code: within 10 s, more than 5 events that each arrive less than 1 s after the previous one
// Continues the snippet above: reuses builder, properties and stringContentKGroupedStream
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.SessionStore;

// Session windows with a 1 s inactivity gap: events less than 1 s apart fall into the same session
KStream<Windowed<String>, Long> kStreamResult2 = stringContentKGroupedStream
        .windowedBy(SessionWindows.with(Duration.ofSeconds(1)))
        .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("result2")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()))
        .toStream()
        // Keep sessions that span more than 10 s and contain more than 5 events;
        // v is null (a tombstone) when a session has been replaced by a merged or extended one
        .filter((k, v) -> v != null
                && k.window().end() - k.window().start() > 10_000
                && v > 5);
kStreamResult2.print(Printed.toSysOut());

// Build and start the topology once both rules have been added to builder
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
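The next example is a JDK-only sketch of session windowing (plain Java, not the Kafka Streams implementation) that makes the session rule concrete. For each key, an event extends the current session if it arrives within 1 s of that session's last event; otherwise it starts a new session. After every event the updated session goes through the same filter as the code above: span greater than 10 s and more than 5 events. The sketch rests on two assumptions. First, events arrive in timestamp order per key; Kafka Streams also merges late, out-of-order records into existing sessions, and this toy ignores that. Second, the gap check is inclusive (<=).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// JDK-only session-window sketch (not Kafka code). Assumes in-order events per key.
public class SessionBurstDemo {
    static final long GAP_MS = 1_000;       // inactivity gap, like SessionWindows.with(1 s)
    static final long MIN_SPAN_MS = 10_000; // session must span more than 10 s
    static final long MIN_COUNT = 5;        // and contain more than 5 events

    /** Mutable session state: first/last event time and number of events. */
    static final class Session {
        long start, end, count;
        Session(long ts) { start = ts; end = ts; count = 1; }
    }

    /** Returns "key [start,end] count=n" for every session update that passes the filter. */
    public static List<String> matches(String[] keys, long[] timestampsMs) {
        Map<String, Session> open = new HashMap<>(); // key -> its most recent session
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            long ts = timestampsMs[i];
            Session s = open.get(keys[i]);
            if (s != null && ts - s.end <= GAP_MS) {
                s.end = ts;          // within the gap: extend the current session
                s.count++;
            } else {
                s = new Session(ts); // first event or gap exceeded: start a new session
                open.put(keys[i], s);
            }
            if (s.end - s.start > MIN_SPAN_MS && s.count > MIN_COUNT) {
                out.add(keys[i] + " [" + s.start + "," + s.end + "] count=" + s.count);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = new String[16];
        long[] ts = new long[16];
        for (int i = 0; i < 13; i++) { keys[i] = "u1"; ts[i] = i * 900L; }            // 900 ms apart
        for (int i = 0; i < 3; i++) { keys[13 + i] = "u2"; ts[13 + i] = i * 2_000L; } // 2 s apart
        matches(keys, ts).forEach(System.out::println);
    }
}
```

Running main prints u1 [0,10800] count=13. The 13 u1 events, 900 ms apart, form a single session, and that session first spans more than 10 s at the 13th event. The u2 events are 2 s apart, so each one starts a new session.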
Basic configuration
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public static Properties config(String applicationId, String bootstrapServers, String stateDir) {
    final Properties streamsConfiguration = new Properties();
    // Give the Streams application a unique name. The name must be unique in the Kafka cluster
    // against which the application is run.
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    // Where to find Kafka broker(s).
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Default serdes: String keys, Avro values resolved through the Schema Registry
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    streamsConfiguration.put("schema.registry.url", "http://localhost:8081");
    // Details of the embedded HTTP service that other instances use to connect to this one and
    // discover where state stores live (needed for interactive queries). host and
    // applicationServerPort are not parameters of this method, so the line stays commented out.
    // streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + applicationServerPort);
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
    // Start from earliest so we don't miss data that arrived in the topics before the process started.
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Commit every 500 ms so that changes are flushed frequently and results appear with low latency.
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
    // Allow fine-tuning `metadata.max.age.ms` via a Java system property from the CLI.
    // Lowering it from its default of 5 minutes to a few seconds helps when the input topic
    // was not created before the application started, because the application then discovers
    // the new topic faster. In production you would typically keep the default.
    String metadataMaxAgeMs = System.getProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG);
    if (metadataMaxAgeMs != null) {
        try {
            int value = Integer.parseInt(metadataMaxAgeMs);
            streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, value);
            System.out.println("Set consumer configuration " + ConsumerConfig.METADATA_MAX_AGE_CONFIG +
                    " to " + value);
        } catch (NumberFormatException ignored) {
            // Not a number: keep the default.
        }
    }
    return streamsConfiguration;
}
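The commented-out APPLICATION_SERVER_CONFIG line is what makes "ask an instance for its KTable data" possible. Each instance advertises a host:port, and Kafka Streams shares that through its metadata so you can find which instance holds a given key's state. The requests themselves are served by the application; Kafka Streams does not ship an HTTP server, which is why the comment mentions an embedded http service. Below is a hypothetical, JDK-only helper (not part of the original code) that builds the core settings with the literal key strings behind the StreamsConfig / ConsumerConfig constants. It also fills in "application.server". The host and port are placeholder parameters.

```java
import java.util.Properties;

// Hypothetical JDK-only helper: the same core settings as config(...), keyed by the literal
// strings behind the StreamsConfig / ConsumerConfig constants, plus "application.server"
// so this instance advertises where its state stores can be queried.
public class QueryableConfigSketch {
    public static Properties config(String applicationId, String bootstrapServers,
                                    String stateDir, String host, int port) {
        Properties p = new Properties();
        p.put("application.id", applicationId);         // StreamsConfig.APPLICATION_ID_CONFIG
        p.put("bootstrap.servers", bootstrapServers);   // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        p.put("state.dir", stateDir);                   // StreamsConfig.STATE_DIR_CONFIG
        p.put("auto.offset.reset", "earliest");         // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        p.put("commit.interval.ms", 500);               // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
        p.put("application.server", host + ":" + port); // StreamsConfig.APPLICATION_SERVER_CONFIG
        return p;
    }

    public static void main(String[] args) {
        // "localhost" and 7070 are placeholder values for this instance's own endpoint
        Properties p = config("contentId1", "localhost:9092", "/tmp/filter1", "localhost", 7070);
        System.out.println(p.getProperty("application.server"));
    }
}
```

In a real application you would keep the StreamsConfig constants as in config(...) above. The literal strings are used here only so the sketch compiles without Kafka on the classpath.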