美文网首页Apache Kafkakafka Stream
kafka stream 内容过滤 demo

kafka stream 内容过滤 demo

作者: pcgreat | 来源:发表于2018-08-30 14:14 被阅读2次

写个demo 练练手 , 结论: 思维必须成流式 ,不要以数据库的方式去看待流式聚合 , 流式的聚合,在time window 中 也会产生很多事件 . 最后一点 ,kafka 提供着数据库存储能力的ktable。
也就是说 ,你可以 发请求给instance ,获取ktable 的聚合数据 。
而不是 自己写个服务作为消费者然后去实现ktable的聚合 ,这点有点别扭
建议大家看看 https://github.com/confluentinc/kafka-streams-examples.git

网状网络拓扑结构 (2).png

10s 超过5次 评论 代码

        SpecificAvroSerde<Content> contentSpecificAvroSerde = new SpecificAvroSerde<>();
        SpecificAvroSerde<ContentbyUserId> userContentSpecificAvroSerde = new SpecificAvroSerde<>();
        contentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
        userContentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
        Properties properties = config("contentId1", "localhost:9092", "/tmp/filter1");
        // key 为 contentId  value content
        final StreamsBuilder builder = new StreamsBuilder();
        // key userId , value content
        KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
                .selectKey((k, v) -> v.getUserId());
        KGroupedStream<String, Content> stringContentKGroupedStream = kStream.groupByKey();
        KStream<Windowed<String>, ContentbyUserId> k1ResultStrem = k1Result
                .toStream()
                .filter((k, v) -> {
            return null != v && v.getCount() > 5;
        });
        k1ResultStrem.print(Printed.toSysOut());

10s 内 输入事件间隔小于1s的事件数 >5

  // key 为 contentId  value content
        final StreamsBuilder builder = new StreamsBuilder();
        // key userId , value content
        KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
                .selectKey((k, v) -> v.getUserId());
        KStream<Windowed<String>,Long> kStreamResult2 = stringContentKGroupedStream.windowedBy(SessionWindows.with(1 * 1000))
                .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("result2")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .filter((e, v) -> {
                            // session 窗口时间 大于 10 s 且 数量大于5
                            if (v != null && e.window().end() - e.window().start() > 10 * 1000 && v.longValue() > 5) {
                                return true;
                            }
                            return false;

                        }
                );

        kStreamResult2.print(Printed.toSysOut());

基础配置

    public static Properties config(String appliactionId, String bootstrapServers, String stateDir) {
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name.  The name must be unique in the Kafka cluster
        // against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appliactionId);
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        streamsConfiguration.put("schema.registry.url", "http://localhost:8081");


        // Provide the details of our embedded http service that we'll use to connect to this streams
        // instance and discover locations of stores.
//        streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + applicationServerPort);
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        // Set to earliest so we don't miss any data that arrived in the topics before the process
        // started
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Set the commit interval to 500ms so that any changes are flushed frequently and the top five
        // charts are updated with low latency.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        // Allow the user to fine-tune the `metadata.max.age.ms` via Java system properties from the CLI.
        // Lowering this parameter from its default of 5 minutes to a few seconds is helpful in
        // situations where the input topic was not pre-created before running the application because
        // the application will discover a newly created topic faster.  In production, you would
        // typically not change this parameter from its default.
        String metadataMaxAgeMs = System.getProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG);
        if (metadataMaxAgeMs != null) {
            try {
                int value = Integer.parseInt(metadataMaxAgeMs);
                streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, value);
                System.out.println("Set consumer configuration " + ConsumerConfig.METADATA_MAX_AGE_CONFIG +
                        " to " + value);
            } catch (NumberFormatException ignored) {
            }
        }
        return streamsConfiguration;
    }

相关文章

网友评论

    本文标题:kafka stream 内容过滤 demo

    本文链接:https://www.haomeiwen.com/subject/tmkhwftx.html