Kafka结合SparkStreaming开发

作者: kason_zhang | 来源:发表于2017-04-11 16:10 被阅读483次

    Apache Kafka 是一种分布式流式平台

    Kafka基本搭建 :

    Step1
    kafka下载地址

    wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
    
    tar zxvf kafka_2.11-0.10.2.0.tgz
    
    cd kafka_2.11-0.10.2.0
    
    

    Step2: 启动Server
    Kafka使用ZooKeeper,所以如果你没有一个ZooKeeper Server你需要首先去启动它。你可以通过一个脚本来获取一个快的单节点的ZooKeeper实例。

    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    这时候你就可以启动Kafka server:

    bin/kafka-server-start.sh config/server.properties
    

    Step3: 创建一个话题Topic
    下面我们创建一个名为test的topic,其 只有一个分区和复制

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    我们来看看我们创建的话题topic

    [kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
    test
    
    

    Step4 发送消息,生产者
    Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.

    比如在centos中发送消息:

    [kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    Hello
    
    

    Step5 接收消息,消费者
    Kafka also has a command line consumer that will dump out messages to standard output.

    [kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    Hello
    
    

    以上就是Single Broker cluster,但是我们可以开发multi-broker

    Step6 设置multi-broker cluster

    So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get feel for it, let's expand our cluster to three nodes (still all on our local machine).简单看就是在本地主机上设置三个node构成kafka的broker集群

    首先我们需要给每个broker生成一个配置文件,简单讲就是复制一份啊

    cd /home/kason/kafka/kafka_2.11-0.10.2.0/config/
    su
    cp server.properties server-1.properties
    cp server.properties server-2.properties
    

    现在编辑这些新创建的server-1,server-2文件,设置如下:

    server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dir=/tmp/kafka-logs-1
    
    server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dir=/tmp/kafka-logs-2
    

    broker.id是集群中的每个node的唯一并且持久的标识名字。同时我们为这台机器上的每个node设置了不同的端口监听以及日志路径进而进行区分
    万事俱备,启动另外两个broker

    bin/kafka-server-start.sh config/server-1.properties
    bin/kafka-server-start.sh config/server-2.properties
    

    现在我们创建一个分区三个复制的新的话题topic

    [kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    Created topic "my-replicated-topic".
    
    

    但是如何知道每一个broker暗杀的呢 可以查看describe topics命令

    [kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    [kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
        Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    
    

    我们解释解释上面的东西的含义:第一行给出分区总和,每一个额外的行给出一个分区的信息,由于我们这个topic只有一个分区因此只有一行
    leader 是为指定的分区负责读写的节点node,它是随机选的
    replicas 节点node list
    isr 同步replicas

    在这个broker集群中发送消息 生产者:

    [kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    my test message 1
    ^[^T^[^T
    hello world
    hello kafka
    
    

    接收消息 消费者:

    [kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    my test message 1
    ����
    hello world
    hello kafka
    
    
    

    现在因为这是集群我们来测试测试它的容错性,根据上面我们知道Leader是node broker1,现在查到其进程号并手动杀死.。例子就不举了

    Step7 使用Kafka来导入或者导出数据

    Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data
    Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file.
    首先我们手动创建一个文件并写入几个数据,test.txt要放在kafka目录下

    echo -e "foo\nbar" > test.txt
    

    然后我们启动两个连接器运行在standalone模式下(就是单独本地的进程),提供三个配置文件最为入参,第一个是Kafka Connect 进程的配置文件,包含一些普通的配置文件如Kafka brokers以及序列化数据的格式,剩下的配置文件每一个指定了要创建的connector, 这些文件包含一个独一无二的connector名字

    bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    

    Kafka启动图片:

    这里写图片描述

    Spark Streaming

    Spark Streaming Code

    package com.scala.action.streaming
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Created by kason_zhang on 4/11/2017.
      */
    object MyKafkaSparkStreaming {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("MyKafkaStreamingDemo").setMaster("local[3]")
        val ssc = new StreamingContext(conf,Seconds(5))
    
        val topicLines = KafkaUtils.createStream(ssc,"10.64.24.78:2181"
          ,"StreamKafkaGroupId",Map("spark" -> 1))
        topicLines.map(_._2).flatMap(str => str.split(" ")).print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
    
    

    在这里有几点需要注意的地方
    因为我没有在centos kafka server.properties里面设置
    listeners = PLAINTEXT://your.host.name:9092,它将采用默认的listeners,这样的话host将获取centos的host名,但是我的SparkStreaming程序是在Windows中开发的,他不能识别host,所以需要在C盘的hosts文件里面加入10.64.24.78 kason让其能够识别host

    同时还需要在centos中开放zookeeper的2181端口以及你的kafka的端口。

    IDEA的结果如图:


    这里写图片描述

    相关文章

      网友评论

        本文标题:Kafka结合SparkStreaming开发

        本文链接:https://www.haomeiwen.com/subject/mlluattx.html