
Kafka Streams content filtering demo

Author: pcgreat | Published 2018-08-30 14:14

    I wrote this demo to get some practice. Conclusion: you have to think in streams and stop looking at streaming aggregation the way you would a database query; a streaming aggregation emits many update events within a single time window. One last point: Kafka Streams provides the KTable, which acts as a database-like store.
    That means you can send a request to a running Streams instance and read the KTable's aggregated data directly,
    rather than writing your own consumer service to re-implement the KTable aggregation yourself, which felt a bit awkward at first.
    I recommend having a look at https://github.com/confluentinc/kafka-streams-examples.git
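
    On the "query the instance" point above, Kafka Streams interactive queries let you read a materialized store directly from the running application instead of consuming an output topic. A minimal sketch, assuming streams is the running KafkaStreams instance and querying the "result2" session store materialized in the second code example below (the key "user-1" is only illustrative):

        // Look up the local "result2" session store by name and store type.
        ReadOnlySessionStore<String, Long> store =
                streams.store("result2", QueryableStoreTypes.sessionStore());

        // Fetch every session window recorded for one user and print its bounds and count.
        try (KeyValueIterator<Windowed<String>, Long> sessions = store.fetch("user-1")) {
            while (sessions.hasNext()) {
                KeyValue<Windowed<String>, Long> entry = sessions.next();
                System.out.println(entry.key.window().start() + " - "
                        + entry.key.window().end() + " : " + entry.value);
            }
        }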

    (Figure: mesh network topology)

    Code: more than 5 comments from the same user within 10 s

            // Avro serdes for the Content input records and the aggregated ContentbyUserId values
            SpecificAvroSerde<Content> contentSpecificAvroSerde = new SpecificAvroSerde<>();
            SpecificAvroSerde<ContentbyUserId> userContentSpecificAvroSerde = new SpecificAvroSerde<>();
            contentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
            userContentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
            Properties properties = config("contentId1", "localhost:9092", "/tmp/filter1");

            // input topic "content2": key = contentId, value = Content; re-key by userId
            final StreamsBuilder builder = new StreamsBuilder();
            KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
                    .selectKey((k, v) -> v.getUserId());
            KGroupedStream<String, Content> stringContentKGroupedStream = kStream.groupByKey();

            // The original snippet omits the definition of k1Result; a hedged reconstruction:
            // count each user's comments in 10-second windows (ContentbyUserId's builder and
            // its count field being a long are assumptions about the Avro class).
            KTable<Windowed<String>, ContentbyUserId> k1Result = stringContentKGroupedStream
                    .windowedBy(TimeWindows.of(10 * 1000))
                    .aggregate(
                            () -> ContentbyUserId.newBuilder().setCount(0L).build(),
                            (userId, content, agg) -> { agg.setCount(agg.getCount() + 1); return agg; },
                            Materialized.with(Serdes.String(), userContentSpecificAvroSerde));

            // keep only windows in which the same user commented more than 5 times
            KStream<Windowed<String>, ContentbyUserId> k1ResultStream = k1Result
                    .toStream()
                    .filter((k, v) -> null != v && v.getCount() > 5);
            k1ResultStream.print(Printed.toSysOut());
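
    Either topology is run the same way: wire the builder and the configuration above into a KafkaStreams instance and start it. A minimal sketch (the cleanUp() call and the shutdown hook are the usual demo conventions, not taken from the original post):

        // Build and start the topology defined above.
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.cleanUp();   // wipe local state so the demo starts from a clean slate (optional)
        streams.start();

        // Close the application cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));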
    
    

    Code: more than 5 events arriving less than 1 s apart, over a span of more than 10 s

        // input topic "content2": key = contentId, value = Content; re-key by userId
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
                .selectKey((k, v) -> v.getUserId());
        KGroupedStream<String, Content> stringContentKGroupedStream = kStream.groupByKey();

        // session windows with a 1-second inactivity gap: events arriving less than 1 s
        // apart are merged into one session, and the per-session count is materialized
        KStream<Windowed<String>, Long> kStreamResult2 = stringContentKGroupedStream
                .windowedBy(SessionWindows.with(1 * 1000))
                .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("result2")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                // keep sessions that span more than 10 s and contain more than 5 events
                .filter((e, v) -> v != null
                        && e.window().end() - e.window().start() > 10 * 1000
                        && v > 5);

        kStreamResult2.print(Printed.toSysOut());
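
    A quick way to drive the session-window example is a small producer that writes Content records to the "content2" input topic. The sketch below is hypothetical: Content's builder and every field other than userId are assumptions about the Avro schema, and the timing is chosen so that 15 events arrive under 1 s apart and together span more than 10 s (run it inside a main that declares throws Exception, since Thread.sleep is used):

        // Hypothetical driver for the demo; adapt the Content builder to your Avro schema.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaProducer<String, Content> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 15; i++) {
                // userId is the only field taken from the original code; the rest is assumed
                Content content = Content.newBuilder().setUserId("user-1").build();
                // the record key is the contentId, matching the input topic described above
                producer.send(new ProducerRecord<>("content2", "content-" + i, content));
                Thread.sleep(800);  // gaps < 1 s merge into one session; 14 gaps * 0.8 s > 10 s
            }
        }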
    

    Basic configuration

        public static Properties config(String applicationId, String bootstrapServers, String stateDir) {
            final Properties streamsConfiguration = new Properties();
            // Give the Streams application a unique name.  The name must be unique in the Kafka cluster
            // against which the application is run.
            streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
            // Where to find Kafka broker(s).
            streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
            streamsConfiguration.put("schema.registry.url", "http://localhost:8081");
    
    
            // Provide the details of our embedded http service that we'll use to connect to this streams
            // instance and discover locations of stores.
    //        streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + applicationServerPort);
            streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
            // Set to earliest so we don't miss any data that arrived in the topics before the process
            // started
            streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Set the commit interval to 500 ms so that any changes are flushed frequently and the
            // filtered results show up with low latency.
            streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
            // Allow the user to fine-tune the `metadata.max.age.ms` via Java system properties from the CLI.
            // Lowering this parameter from its default of 5 minutes to a few seconds is helpful in
            // situations where the input topic was not pre-created before running the application because
            // the application will discover a newly created topic faster.  In production, you would
            // typically not change this parameter from its default.
            String metadataMaxAgeMs = System.getProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG);
            if (metadataMaxAgeMs != null) {
                try {
                    int value = Integer.parseInt(metadataMaxAgeMs);
                    streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, value);
                    System.out.println("Set consumer configuration " + ConsumerConfig.METADATA_MAX_AGE_CONFIG +
                            " to " + value);
                } catch (NumberFormatException ignored) {
                }
            }
            return streamsConfiguration;
        }
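
    Because config() reads metadata.max.age.ms from a JVM system property, the override can be supplied at launch time without touching the code; a small illustration (the 3000 ms value is only an example):

        // Equivalent to launching the JVM with -Dmetadata.max.age.ms=3000
        System.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "3000");
        Properties streamsConfiguration = config("contentId1", "localhost:9092", "/tmp/filter1");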
    
