Kafka框架基础

作者: Z尽际 | 来源:发表于2017-05-01 18:21 被阅读456次

    * Kafka框架基础

    官网:kafka.apache.org

    框架简介

    Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

    相关概念

    ** 生产者

    提供数据源生产的地方,对于同一个topic,生产者只能有一个,这样可以确保同一个topic数据来自同一个业务数据,支持多并发

    ** 消费者

    消费数据的客户端,对于同一个topic,可以有多个消费者,比如spark,storm等等

    ** Broker

    消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。

    ** Topic

    同一类消息的统称,Kafka集群能够同时负载多个topic分发。

    ** Partition

    topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列,同一个topic里面的数据是存放在不同的分区中。

    ** Replication

    每个分区或者topic都是有副本的,副本的数量也是可以在创建topic的时候就指定好,保证数据的安全性,以及提供高并发读取效率。

    ** Segment

    partition物理上由多个segment组成

    ** Offset

    每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息

    框架特色

    ** 同时为发布和订阅提供高吞吐量。Kafka每秒可以生产约25万消息(约50 MB),每秒处理55万消息(约110 MB)。

    ** 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失。

    ** 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。

    ** 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。

    架构图

    * 框架部署

    ** 相关下载

    kafka以及scala:链接:http://pan.baidu.com/s/1pLBFJf1 密码:seto

    ** 解压Kafka以及scala

    $ tar -zxf kafka_2.10-0.8.2.1.tgz -C /opt/modules/cdh/

    $ tar -zxf scala-2.10.4.tgz -C /opt/modules/cdh/

    ** 安装JDK并配置环境变量

    不再赘述

    ** 安装并启动zookeeper

    在zookeeper的根目录下:

    $ bin/zkServer.sh start

    ** 配置scala环境变量

    # vi /etc/profile

    $ source /etc/profile

    (注意以上两条语句的执行用户)

    添加如下:

    ##SCALA_HOME

    SCALA_HOME=/opt/modules/cdh/scala-2.10.4

    export PATH=$PATH:$SCALA_HOME/bin

    使用命令检查scala配置是否正确:

    $ scala -version,如图:

    ** 修改Kafka配置文件

    server.properties

    修改为如下:

    producer.properties

    变动内容如下:

    consumer.properties

    变动内容如下:

    ** 启动Kafka

    $ bin/kafka-server-start.sh config/server.properties

    ** 创建Topic

    $ bin/kafka-topics.sh --create --zookeeper z01:2181 --replication-factor 1 --partitions 1 --topic testTopic

    ** 启动生产者

    $ bin/kafka-console-producer.sh --broker-list z01:9092 --topic testTopic

    ** 启动消费者

    $ bin/kafka-console-consumer.sh --zookeeper z01:2181 --topic testTopic --from-beginning

    在生产者窗口输入数据,在消费者窗口查看数据,测试如图:

    消费者:

    生产者:

    * 整合测试

    使用flume+kafka整合测试

    ** 配置flume

    原来我们配置flume,是在tomcat所在机器节点开启了一个flume收集日志,并直接上传到HDFS,如果集群中存在多个机器节点,则势必导致对HDFS集群占用率过高,所以在面临多个flume集群时,一般会采用1~2个单独的flume节点来收集另外flume节点的日志,相当于弄了一个中转站,由中转站收集其他flume,再统一放置到HDFS系统中,此刻我们采用方案2,原理如图:

    背景:在一台机器上开两个flume,分别收集tomcat日志和hive日志,这两者的日志信息分别输入到中间层flume(这个中间层flume也模拟在同一个机器节点上),然后中间层flume在将数据写入到HDFS。

    首先检查一下hive的conf目录下的hive-log4j.properties配置中,是否已经指定了hive的日志目录,如果没有,请指定,如图:

    涉及flume文件:以下文件存在于flume的conf目录下,如果不存在,请自行创建即可。

    flume-apache-log-kafka.conf

    flume-hive-log-kafka.conf

    flume-connector-kafka.conf

    依次启动:

    a4:

    $ bin/flume-ng agent --conf conf/ --name a4 --conf-file conf/flume-connector-kafka.conf

    a3:

    $ bin/flume-ng agent --conf conf/ --name a3 --conf-file conf/flume-hive-log-kafka.conf

    a2:

    $ bin/flume-ng agent --conf conf/ --name a2 --conf-file conf/flume-apache-log-kafka.conf

    测试后如图,即可发现,日志在HDFS和kafka中都已经显示出来:


    IT全栈公众号:

    QQ大数据技术交流群(广告勿入):476966007


    下一节:Hbase框架基础(一)

    相关文章

      网友评论

        本文标题:Kafka框架基础

        本文链接:https://www.haomeiwen.com/subject/xluftxtx.html