美文网首页
kafka编程应用Stream

kafka编程应用Stream

作者: 北海北_6dc3 | 来源:发表于2020-04-22 22:18 被阅读0次

    Kafka Streams简介

    Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java和scala代码就可以实现流式处理。

    优势:
    • 弹性,高度可扩展,容错
    • 部署到容器,VM,裸机,云
    • 同样适用于小型,中型和大型用例
    • 与Kafka安全性完全集成
    • 编写标准Java和Scala应用程序
    • 在Mac,Linux,Windows上开发
    • Exactly-once 语义

    什么是流式计算

    一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。


    image.png

    为什么要有Kafka Stream

    当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用广泛,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。

    既然Apache Spark与Apache Storm拥用如此多的优势,那为何还需要Kafka Stream呢?笔者认为主要有如下原因。

    第一,Spark和Storm都是流式处理框架,而Kafka Stream提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而Kafka Stream作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。


    image.png

    第二,虽然Cloudera与Hortonworks方便了Storm和Spark的部署,但是这些框架的部署仍然相对复杂。而Kafka Stream作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。更为重要的是,Kafka Stream充分利用了Kafka的分区机制Consumer的Rebalance机制,使得Kafka Stream可以非常方便的水平扩展,并且各个实例可以使用不同的部署方式。具体来说,每个运行Kafka Stream的应用程序实例都包含了Kafka Consumer实例,多个同一应用的实例之间并行处理数据集。而不同实例之间的部署方式并不要求一致,比如部分实例可以运行在Web容器中,部分实例可运行在Docker或Kubernetes中。

    第三,就流式处理系统而言,基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了Kafka,此时使用Kafka Stream的成本非常低。

    第四,使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。

    第五,由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。

    第六,由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并行度。

    简单编程

    public class SpringBootKafkaStreamApplication1 {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            //定义消费组
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
            //定义kafka地址
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "47.105.194.139:9092");
            //key编码方式
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            //value编码方式
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
            //构建输入流
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> textLines = builder.stream("test5");
    
            //构建输出table
            KTable<String, Long> wordCounts = textLines
                    // Split each text line, by whitespace, into words.
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    
                    // Group the text words as message keys
                    .groupBy((key, value) -> value)
    
                    // Count the occurrences of each word (message key).
                    .count();
            //存入输出topic
            wordCounts.toStream().to("test7", Produced.with(Serdes.String(), Serdes.Long()));
    
            //启动
            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
        }
    }
    

    创建名为test5、test7的topic。
    生产代码

    [root@izm5e11cqeaucml4d3vumbz kafka_2.12-2.4.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test5
    >all streams lead to kafka
    >hello kafka streams
    >join kafka summit
    

    消费代码test5

    [root@izm5e11cqeaucml4d3vumbz kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test5 --from-beginning
    
    all streams lead to kafka
    hello kafka streams
    join kafka summit
    

    消费代码test7

     bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test7 --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true  --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    all 1
    lead    1
    to  1
    hello   1
    kafka   2
    streams 2
    
    join    1
    kafka   3
    summit  1
    

    参考资料:
    最简单流处理引擎——Kafka Streams简介
    Kafka设计解析(七)- 流式处理的新贵Kafka Stream
    https://www.cnblogs.com/hklv/p/10692999.html
    Kafka Streams:Kafka原生计算的基石
    教程:编写Kafka Streams应用程序
    https://kafka.apache.org/24/documentation/#producerapi
    https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api#transform-a-stream

    相关文章

      网友评论

          本文标题:kafka编程应用Stream

          本文链接:https://www.haomeiwen.com/subject/lsjrihtx.html