-
下载安装
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
# 解压
tar -zxvf kafka_2.13-3.3.1.tgz -C /opt/module/
mv kafka_2.13-3.3.1/ kafka
# 修改配置文件
cd config/
vi server.properties #设置broker.id、log.dirs目录、zookeeper.connect
# 添加到系统变量
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
-
kafka启停脚本
#!/bin/bash
# Start/stop Kafka brokers on every node of the cluster over ssh.
# Usage: kafka.sh {start|stop}
case $1 in
"start")
    for i in hadoop1 hadoop2 hadoop3
    do
        echo "--- 启动 $i kafka ---"
        # -daemon detaches the broker so the ssh session can return immediately
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
;;
"stop")
    for i in hadoop1 hadoop2 hadoop3
    do
        echo "--- 停止 $i kafka ---"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
;;
*)
    # FIX: the original fell through silently on a missing/unknown argument
    echo "Usage: $0 {start|stop}"
    exit 1
;;
esac
-
kafka主题使用
# 创建主题
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --create --partitions 1 --replication-factor 3
# 查看所有主题
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --list
# 查看主题详细描述
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --describe
-
kafka生产者使用
bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic one
生产者有main线程和sender线程,main线程中的RecordAccumulator缓冲区默认32M,其内部双端队列(Deque)的每个批次默认16K
batch.size:16K,数据积累到16K后,sender才会发送或者
linger.ms:如果数据长时间凑不满batch.size,可以设置等待时间,超时后直接发送,默认0ms
应答机制
0:生产者不需要等待数据落盘应答
1:生产者要等Leader收到数据后应答
-1:生产者要等Leader和ISR队列中所有节点收齐数据后应答
异步发送
# 导入依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
KafkaProducer类
//callback换成.get()就是同步
public class KafkaProducer {
public static void main(String[] args) {
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092");
//指定序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
1 properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
for (int i = 1; i <= 600; i++) {
//参数1:topic名, 参数2:消息文本; ProducerRecord多个重载的构造方法
kafkaProducer.send(new ProducerRecord<String, String>("one", "message"+i),new Callback(){
@Override
public void onCompletion(RecordMetadata metadata, Exception exception){
if(exception == null){
System.out.println("主题:"+ metadata.topic() + " 分区: "+metadata.partition());
}
}
});
System.out.println("message"+i);
}
kafkaProducer.close();
}
}
-
kafka消费者使用
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic one
# --from-beginning 从开始(历史数据也接收)接收
-
kafka自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.test.kafka.MyPartition"); //partition类名全路径
// Custom partitioner: records whose value contains "hello" go to partition 0, all others to 1.
// Registered via ProducerConfig.PARTITIONER_CLASS_CONFIG with the fully-qualified class name.
public class MyPartition implements Partitioner {
    // FIX: removed the unused Random field the original declared but never read.
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String msgValues = value.toString();
        // FIX: the original computed the partition but had no return statement (compile error).
        return msgValues.contains("hello") ? 0 : 1;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
-
提高生产者吞吐量
# KafkaProducer类中加入
//缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
//压缩,压缩可配置gzip,snappy,lz4,zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
-
ACK应答级别
1.acks=0,生产者发送过来的数据就不管,可靠性差,效率高
2.acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等
3.acks=-1,生产者发送过来数据Leader和ISR队列里所有Follower应答,可靠性高,效率低。
//具体配置
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);
-
kafka事务
注意:开启事务必须先开启幂等性;每个broker节点上都有一个事务协调器(Transaction Coordinator)。
//幂等性默认是开启的
enable.idempotence=true
生产者事务
# KafkaProducer类中加入
// 0. A stable transactional id is required to enable the transactional producer.
//    FIX: the constant is TRANSACTIONAL_ID_CONFIG — the original spelled it TRANSCATIONAL_ID_CONFIG.
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transcational_id");
// 1. Initialize transactions.
//    FIX: the API method is initTransactions() (plural) — initTransaction() does not exist.
kafkaProducer.initTransactions();
// 2. Begin the transaction
kafkaProducer.beginTransaction();
// 3. Send data
try {
    for (int i = 1; i <= 600; i++) {
        // Arg 1: topic name; arg 2: message payload. ProducerRecord has several overloads.
        kafkaProducer.send(new ProducerRecord<String, String>("one", "message" + i));
    }
    // 4. Commit only if every send succeeded
    kafkaProducer.commitTransaction();
} catch (Exception e) {
    // Abort so none of this batch becomes visible to read_committed consumers
    kafkaProducer.abortTransaction();
} finally {
    // Release resources (close also flushes pending records)
    kafkaProducer.close();
}
-
数据的有序与乱序
1.kafka在1.x版本之前保证单分区内数据有序,条件是max.in.flight.requests.per.connection=1,无需考虑是否开启幂等性
2.1.x及以后的版本:未开启幂等性时,需要设置max.in.flight.requests.per.connection=1;开启幂等性时,需要设置max.in.flight.requests.per.connection小于等于5。
原因:因为1.x后,启用幂等后,kafka服务器会缓存producer发来的最近5个request的元数据,
故无论如何,都可以保证最近5个request的数据都是有序的。
网友评论