后台运行:
nohup ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
/kafka-server-start.sh -daemon ../config/server.properties
外网访问:
Kafka的broker集群处于内部网络中,而外部网络需要订阅消费kafka中的留数据,就需要访问内网
修改server.properties
#advertised.listeners=PLAINTEXT://your.host.name:9092
消费者/生产者:
创建消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-wiki-result --from-beginning
发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-wiki-result
性能测试:
生产者吞吐量测试
./kafka-producer-perf-test.sh --topic canal-example --num-records 50000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=-1
消费者吞吐量测试
./kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 500 --topic canal-example
参数:
log.dirs:
Kafka 持久化消息的目录,若用户机器上有N 块物理硬盘(并且假设这台机器完全给Kafka 使用),那么设置N 个目
录(须挂载在不同磁盘上的目录)是一个很好的选择。N 个磁头可以同时执行写操作,极大地提升了吞吐量
Producer:
Java 版本producer 用户采用异步发送机制。KafkaProducer.send 方法仅仅把消息放入缓冲区中,由一个专属1/0 线程负责从缓冲区中提取消息井封装进消息batch 中,然后发送出去
最常见的是同步和异步的发送方式
异步发送
send 方法提供了回调类参数来实现异步发送以及对发送结果的响应:producer . send(record, new Callback() {...
同步发送
同步发送和异步发送其实就是通过Java 的Future 来区分的,调用Future .get() 无限等待结果返回,即实现同步发送的效果:producer.send(record) .get();
可重试异常和不可重试异常:
producer.send(record , new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
//消息发送成功}
else {
if (exception instanceof RetriableException) {
//处理可重试瞬时异常}
else {
//处理不可重试异常
...........
consumer
offset管理:
位移offset提交到kafka内部的一个topic上(__consumer_offsets)
对最新版本的Kafka (l.0.0)而言,新版本Java consumer 是一个多线程或者说是一个双线程的Java 进程一一创建KafkaConsumer 的线程被称为用户主线程,同时consumer 在后台会创建一个心跳线程,该线程被称为后台心跳线程。Kafka Consumer 的poll方法在用户主线程中运行。这也同时表明: 消费者组执行rebalance 、消息获取、coordinator 管理、异步任务结果的处理甚至位移提交等操作都是运行在用户主线程中的。
提交方式:自动提交与手动提交
consumer 是自动提交位移的,自动提交间隔是5 秒。
手动位移提交就是用户自行确定消息何时被真正处理完并可以提交位移,在一个典型的consumer 应用场景中,用户需要对poll 方法返回的消息集合中的消息执行业务级的处理。用户想要确保只有消息被真正处理完成后再提交位移。如果使用自动位移提交则无法保证这种时序性,因此在这种情况下必须使用手动提交位移。
kafka消息设计:
Kafka 的实现方式本质上是使用Java NIO 的ByteBuffer 来保存消息,同时依赖文件系统提供的页缓存机制,而非依靠Java 的堆缓存。
副本与ISR 设计
一个Kafka 分区本质上就是一个备份日志,即利用多份相同的备份共同提供冗余机制来保持系统高可用性。这些备份在Kafka 中被称为副本( replica ) 。Kafka 把分区的所有副本均匀地分配到所有broker 上,并从这些副本中挑选一个作为leader 副本对外提供服务,而其他副本被称为follower 副本,只能被动地向leader 副本请求数据,从而保持与leader副本的同步。
所谓ISR ,就是Kafka 集群动态维护的一组同步副本集合( in-sync replicas ) 。每个topic分区都有自己的ISR 列表, ISR 中的所有副本都与leader 保持同步状态。值得注意的是, leader副本总是包含在ISR 中的,只有ISR 中的副本才有资格被选举为leader 。而producer 写入的一条Kafka 消息只有被ISR 中的所有副本都接收到,才被视为“己提交”状态(acks=-1)。
请求处理流程 clients/broker:
Java 版本clients 采用了类似于Linux select 和epoll 的实现机制,在底层把I/0 操作完全托管给Java NIO 的Selector ,故在轮询这一步中clients 会检查有无真正的1/0 事件发生,比如发送请求或获取请求,甚至是连接重建或断开等。
在Kafka 中,每个broker 都有一个acceptor 线程和若干个processor 线程,num.network. threads 就用于控制processor 数量的broker 端参数,默认值是3。broker 端固定使用一个acceptor 线程来唯一地监昕入站连接。processor 线程接收acceptor 线程分配的新Socket 连接通道,然后开始监听该通道上的数据传输。processor 线程把请求放入请求队列的,而KafkaRequestHandler 线程池分配具体的线程从该队列中获取请求并执行真正的请求处理逻辑。
每个broker 启动时都会创建一个请求阻塞队列,专门用于接收从clients 端发送过来的请求。同时,broker 还会创建若干个请求处理线程专门获取并处理该阻塞队列中的请求。
精确一次处理
幕等性 producer 和对事务的支持,完美地解决了这种消息重复发送的问题
幕等性 producer : 发送到 broker 端的每批消息都会被赋予一个序列号( sequence number)用于消息去重。除了序列号, Kafka还会为每个 producer实例分配一个 producer id (下称 PID)。若发送消息的序列号小于或 等于broker端保存的序列号,那么 broker会拒绝这条消息的写入操作 。这种设计确保了即使出现重试操作,每条消息也只会被保存在日志中 一次。不过 ,由于每 个新的 producer 实例都会被分配不同的 PID, 当前设计只能保证单个 producer 实例的 EOS 语义,而无法实现多个 producer实例一起提供 EOS 语义 。
引入事务使得 clients 端程序(无论是producer还是 consumer)能够将一组消息放入一个原子性单元中统一处理 。
网友评论