Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。Kafka保证一个Partition内的消息的有序性
一、Kafka集群部署:
1、解压安装包
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
mv kafka_2.11-0.11.0.0/ kafka
2、修改配置文件(server.properties)
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop130:2181,hadoop131:2181,hadoop132:2181
3、 配置环境变量
sudo vi /etc/profile
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
4、修改hadoop131和hadoop132上的server.properties
broker.id=1、broker.id=2
注:broker.id不得重复
5、启动集群
bin/kafka-server-start.sh config/server.properties &
6、关闭集群
bin/kafka-server-stop.sh stop
二、Kafka集群常用 命令
--topic 定义主题名
--replication-factor 定义副本数
--partitions 定义分区数
1、查看当前服务器中的所有topic
bin/kafka-topics.sh --zookeeper hadoop130:2181 --list
2、创建topic
bin/kafka-topics.sh --zookeeper hadoop130:2181 --create --replication-factor 3 --partitions 1 --topic first
3、删除topic
bin/kafka-topics.sh --zookeeper hadoop130:2181 --delete --topic first
4、发送消息
bin/kafka-console-producer.sh --broker-list hadoop130:9092 --topic first
5、消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop130:2181 --from-beginning --topic first
--from-beginning:会把该主题中以往所有的数据都读取出来。
6、查看Topic的详情
bin/kafka-topics.sh --zookeeper hadoop130:2181 --describe --topic second
三、Kafka工作流程
1、消息写入
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值
分区的原则
指定了patition,则直接使用;
未指定patition但指定key,通过对key的value进行hash出一个patition;
patition和key都未指定,使用轮询选出一个patition。
2、Broker 保存消息
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件)
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
基于时间:log.retention.hours=168
基于大小:log.retention.bytes=1073741824
注意:Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关。
注意:producer不在zk中注册,消费者在zk中注册。
四、Flume与kafka集成
1、配置flume(flume-kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/flume/datas/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop130:9092,hadoop131:9092,hadoop132:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2、启动消费者
3、启动flume
bin/flume-ng agent -c conf/ -n a1 -f demo/flume-kafka.conf
4、向flume.log追加数据
$ echo hello > /opt/module/flume/datas/flume.log
网友评论