Basic concepts
producer
Produces (publishes) messages.
consumer
Consumes messages.
consumer group
A group of consumers; consumers that share the same group id form one group.
Broker
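Physical concept: each Kafka node in a Kafka cluster.
Topic
Logical concept: a category of Kafka messages, used to distinguish and isolate data.
Partition
Physical concept: the basic unit of data storage in Kafka. The data of one Topic is spread across multiple Partitions, similar to table sharding in MySQL. Messages within a Partition are ordered. Each replica of a Partition lives on exactly one Broker.
Every Topic is split into one or more Partitions.
Within a consumer group, the number of consumers should be less than or equal to the number of Partitions; any extra consumers get no Partition and sit idle.
Each Broker in the cluster can store one or more Partitions of a Topic.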
Replication
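A replica. One Partition may have multiple Replicas, and all Replicas of a Partition hold the same data.
Replication Leader
Among the Replicas of a Partition, exactly one is the Leader, and it handles the interaction with Producers and Consumers for that Partition.
ReplicaManager
Manages the information about all partitions and replicas on the current broker, and handles requests issued by the KafkaController, such as replica state changes and appending/reading messages.
offset
The position of a message within its Partition's message stream, similar to an auto-increment primary key in MySQL.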
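To make the Partition, offset, and consumer group ideas above a bit more concrete, here is a toy in-memory model. It is only a sketch: MiniTopic, partitionFor, append, and assign are made-up names for illustration, and the hashing and round-robin assignment are simplifications rather than what Kafka actually does internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of a topic; an illustration only, not how Kafka is implemented. */
class MiniTopic {
    // One append-only log per partition; a record's offset is its index in that log.
    private final List<List<String>> partitions = new ArrayList<>();

    MiniTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Picks a partition from the key's hash (simplified; Kafka's default partitioner hashes the key bytes with murmur2). */
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    /** Appends to the key's partition and returns the offset assigned within that partition. */
    long append(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(value);
        return log.size() - 1;
    }

    /** Reads the record stored at the given offset of a partition. */
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    /** Hands out partitions round-robin to the consumers of one group; surplus consumers end up with none. */
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(p % consumerCount).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        MiniTopic topic = new MiniTopic(3);
        // Same key, same partition: offsets grow by one within that partition.
        System.out.println("offset " + topic.append("user-411", "order-1"));
        System.out.println("offset " + topic.append("user-411", "order-2"));
        // Four consumers but only three partitions: the last consumer owns nothing.
        System.out.println(assign(3, 4));
    }
}
```

Messages with the same key land in the same partition, so their relative order is preserved, and a fourth consumer in a group reading three partitions has nothing to do.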
Common commands
# Start the console test producer bundled with Kafka
shell:
kafka-console-producer.sh --broker-list 172.16.6.5:9092 --topic test
bat:
.\kafka-console-producer.bat --broker-list 172.16.6.5:9092 --topic test
# Start the console test consumer bundled with Kafka
shell:
kafka-console-consumer.sh --zookeeper 172.16.6.5:2181 --topic test --from-beginning
bat:
.\kafka-console-consumer.bat --zookeeper 172.16.6.5:2181 --topic test --from-beginning
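Integration approaches
There are two ways to integrate. One is through Spring Cloud Stream, which is the recommended approach; the other is plain Spring Kafka, covered further down.
If a message is sent through Spring Cloud Stream, it should also be received through Spring Cloud Stream.
The reason: when a message is sent with send(MessageBuilder.withPayload(payload).build()) and the payload serializes to this JSON
{"userId":"411","userName":"閽变箤","mobile":"13706562363","amount":59157,"productName":"衣服","orderTime":"2019-05-29 22:00:00"}
the bytes actually written to Kafka are prefixed with embedded headers that carry the content type (contentType and originalContentType). Printed as text, the real message looks like this (the odd symbols appear to be non-printable framing bytes):
?♂contentType ♀"text/plain"‼originalContentType "application/json;charset=UTF-8"{"userId":"411","userName":"閽变箤","mobile":"13706562363","amount":59157,"productName":"衣服","orderTime":"2019-05-29 22:00:00"}
So a matching Spring Cloud Stream client is needed on the consuming side to receive and parse this format; otherwise it is hard to extract the payload you actually want.
For details, see the official contentType guide: https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/contenttypemanagement.html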
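The symbols in the sample message above are consistent with a simple binary framing: a 0xff marker byte, a header count, then for each header a one-byte name length, the name, a four-byte value length, and the value, followed by the payload. The sketch below decodes that layout. Note that the layout is an assumption inferred from the sample bytes, and EmbeddedHeaderSketch, Decoded, decode, and encode are hypothetical names, not Spring Cloud Stream API. It is meant to show why a plain consumer cannot just treat the record as JSON, not to replace the real client.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of decoding a record with embedded headers; the byte layout is an assumption. */
class EmbeddedHeaderSketch {

    /** Decoded result: header name to raw header value, plus the remaining payload. */
    static final class Decoded {
        final Map<String, String> headers = new LinkedHashMap<>();
        String payload;
    }

    /** Assumed layout: 0xff, header count, then [nameLen:1][name][valueLen:4][value] per header, then payload. */
    static Decoded decode(byte[] raw) {
        Decoded out = new Decoded();
        if (raw.length == 0 || (raw[0] & 0xff) != 0xff) {
            // No marker byte: treat the whole record as the payload.
            out.payload = new String(raw, StandardCharsets.UTF_8);
            return out;
        }
        ByteBuffer buf = ByteBuffer.wrap(raw); // big-endian by default
        buf.get(); // skip the 0xff marker
        int count = buf.get() & 0xff;
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.get() & 0xff];
            buf.get(name);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            out.headers.put(new String(name, StandardCharsets.UTF_8),
                    new String(value, StandardCharsets.UTF_8));
        }
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        out.payload = new String(payload, StandardCharsets.UTF_8);
        return out;
    }

    /** Builds a sample record in the same assumed layout, for demonstration only. */
    static byte[] encode(Map<String, String> headers, String payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0xff);
        out.write(headers.size());
        for (Map.Entry<String, String> header : headers.entrySet()) {
            byte[] name = header.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] value = header.getValue().getBytes(StandardCharsets.UTF_8);
            out.write(name.length);
            out.write(name, 0, name.length);
            out.write(ByteBuffer.allocate(4).putInt(value.length).array(), 0, 4);
            out.write(value, 0, value.length);
        }
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("contentType", "\"text/plain\"");
        headers.put("originalContentType", "\"application/json;charset=UTF-8\"");
        byte[] raw = encode(headers, "{\"userId\":\"411\",\"amount\":59157}");
        Decoded decoded = decode(raw);
        System.out.println(decoded.headers);
        System.out.println(decoded.payload);
    }
}
```

In practice it is simpler to let a Spring Cloud Stream consumer do this work, which is why the two sides should use the same client.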
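Integrating the Kafka client with Spring Cloud Stream
Add the Maven dependencies.
Note:
The kafka-clients version should match the Kafka server version; otherwise version incompatibilities lead to all sorts of connection problems.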
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
</parent>
<!-- kafka-client -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
    <exclusions>
        <exclusion>
            <artifactId>spring-boot-starter-logging</artifactId>
            <groupId>org.springframework.boot</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- spring-cloud-starter-stream-kafka -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
Add the configuration
# kafka
spring.cloud.stream.binders.kafka-mq.type = kafka
spring.cloud.stream.binders.kafka-mq.environment.spring.cloud.stream.kafka.binder.brokers = 172.16.6.5:9092,172.16.6.6:9092,172.16.6.7:9092
spring.cloud.stream.binders.kafka-mq.environment.spring.cloud.stream.kafka.binder.zkNodes = 172.16.6.5:2181,172.16.6.6:2181,172.16.6.7:2181
# Marketing events, bound to the Kafka message queue
# Producer:
spring.cloud.stream.bindings.output-event-marketing.destination = adapter_event_marketing_dev
spring.cloud.stream.bindings.output-event-marketing.content-type = application/json
spring.cloud.stream.bindings.output-event-marketing.binder = kafka-mq
# Consumer:
spring.cloud.stream.bindings.input-event-marketing.destination = adapter_event_marketing_dev
spring.cloud.stream.bindings.input-event-marketing.content-type = application/json
spring.cloud.stream.bindings.input-event-marketing.binder = kafka-mq
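Note:
If Spring Cloud Stream in the project is also bound to another MQ such as RabbitMQ, a multi-binder configuration is needed.
Blog post on configuring multiple Kafka binders in Spring Cloud Stream:
https://blog.csdn.net/xiao_jun_0820/article/details/78400579
Official guide: Spring Cloud Stream Core - Binders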
https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_binders.html
# Use RabbitMQ as the default message queue
spring.cloud.stream.default-binder = defaultRabbit
# RabbitMQ configuration
# defaultRabbit is a binder name; any other name works
# type = rabbit means this MQ is RabbitMQ
spring.cloud.stream.binders.defaultRabbit.type = rabbit
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.host = 172.16.10.18
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.port = 5672
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.username = dev
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.password = dev
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.virtual-host = /
# Kafka configuration
# kafka-mq is a binder name; any other name works, as with defaultRabbit above
# type = kafka means this MQ is Kafka, as with rabbit above
spring.cloud.stream.binders.kafka-mq.type = kafka
# Broker nodes
spring.cloud.stream.binders.kafka-mq.environment.spring.cloud.stream.kafka.binder.brokers = 172.16.6.5:9092
# ZooKeeper nodes
spring.cloud.stream.binders.kafka-mq.environment.spring.cloud.stream.kafka.binder.zkNodes = 172.16.6.5:2181
# Message configuration for marketing events
spring.cloud.stream.bindings.output-event-marketing.destination = adapter_event_marketing_dev
spring.cloud.stream.bindings.output-event-marketing.content-type = application/json
# Bind to the Kafka message queue. If no binder is set, the binder named by default-binder is used
spring.cloud.stream.bindings.output-event-marketing.binder = kafka-mq
spring.cloud.stream.bindings.input-event-marketing.destination = adapter_event_marketing_dev
spring.cloud.stream.bindings.input-event-marketing.content-type = application/json
spring.cloud.stream.bindings.input-event-marketing.binder = kafka-mq
Example code for producing and consuming messages:
EventMarkingOutputPayload is a plain Java bean.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.Map;

// LoggerManager, GenLogTagTool, StringUtil and EventMarkingOutputPayload are
// project-specific classes; import them from your own packages.

/**
 * @author jiangyaopeng
 */
@Service
@EnableBinding(AdapterKafkaSenderBO.VelarAdapterOutputSource.class)
public class AdapterKafkaSenderBO {

    private static final LoggerManager logger = LoggerManager.getLogger(AdapterKafkaSenderBO.class);

    @Autowired
    VelarAdapterOutputSource velarAdapterOutputSource;

    interface VelarAdapterOutputSource {
        String OUT_EVENT_MARKETING = "output-event-marketing";

        @Output(OUT_EVENT_MARKETING)
        MessageChannel outputEventMarketing();

        String IN_EVENT_MARKETING = "input-event-marketing";

        // Input channels are declared as SubscribableChannel so that handlers can subscribe to them.
        @Input(IN_EVENT_MARKETING)
        SubscribableChannel inputEventMarketing();
    }

    /**
     * Pushes a marketing event message.
     * @param payload the event to send
     * @param tags log tags to attach
     * @return whether the message was sent to the output channel
     */
    public boolean outputEventMarketing(EventMarkingOutputPayload payload, Map<String, String> tags) {
        Map<String, String> logTags = GenLogTagTool.genlogTag(this.getClass().getSimpleName(), "outputEventMarketing", tags);
        logger.info("AdapterKafkaSenderBO.outputEventMarketing start. payload = " + payload, logTags);
        boolean result = velarAdapterOutputSource.outputEventMarketing().send(MessageBuilder.withPayload(payload).build());
        logger.info("AdapterKafkaSenderBO.outputEventMarketing end. result = " + result, logTags);
        return result;
    }

    /**
     * Listens for marketing event messages.
     * @param payload the received event
     */
    @StreamListener(VelarAdapterOutputSource.IN_EVENT_MARKETING)
    public void inputEventMarketing(EventMarkingOutputPayload payload) {
        Map<String, String> logTags = GenLogTagTool.genlogTag(this.getClass().getSimpleName(), "inputEventMarketing");
        logTags.put(GenLogTagTool.LOG_TAG_FLOW_NO, StringUtil.generateFlowNo());
        logger.info("AdapterKafkaSenderBO.inputEventMarketing end. payload = " + payload, logTags);
    }
}
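Integrating the Kafka client with Spring Kafka
Add the Maven dependencies.
Note:
The kafka-clients version should match the Kafka server version; otherwise version incompatibilities lead to all sorts of connection problems.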
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
</parent>
<!-- kafka-client -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
    <exclusions>
        <exclusion>
            <artifactId>spring-boot-starter-logging</artifactId>
            <groupId>org.springframework.boot</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- spring kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
Add the configuration
# Kafka settings
spring.kafka.bootstrap-servers = 172.16.6.5:9092,172.16.6.6:9092,172.16.6.7:9092
# Set a default consumer group
spring.kafka.consumer.group-id = defaultGroup
# Key/value serializers and deserializers
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
# Upper bound, in bytes, of one batch of records sent to a partition (a size, not a message count)
spring.kafka.producer.batch-size = 65536
# Total memory, in bytes, the producer may use to buffer records waiting to be sent
spring.kafka.producer.buffer-memory = 524288
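Since batch-size is easy to misread as a message count, here is a toy accumulator that closes a batch once the next record would push it past a byte limit. It is only a rough model: ByteBatcher and its methods are invented for illustration, and the real producer also accounts for record overhead, compression, and timing settings that are ignored here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of size-based batching; an illustration, not the Kafka producer's implementation. */
class ByteBatcher {
    private final int batchSizeBytes;
    private final List<List<String>> sentBatches = new ArrayList<>();
    private List<String> current = new ArrayList<>();
    private int currentBytes = 0;

    ByteBatcher(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    /** Adds a record; closes the current batch first if the record would not fit within the byte limit. */
    void add(String record) {
        int size = record.getBytes(StandardCharsets.UTF_8).length;
        if (!current.isEmpty() && currentBytes + size > batchSizeBytes) {
            flush();
        }
        current.add(record);
        currentBytes += size;
    }

    /** Closes the current batch (if any) and records it as sent. */
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        sentBatches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
    }

    List<List<String>> sentBatches() {
        return sentBatches;
    }

    public static void main(String[] args) {
        ByteBatcher batcher = new ByteBatcher(10); // 10-byte limit, tiny on purpose
        batcher.add("aaaa");
        batcher.add("bbbb");
        batcher.add("cccc"); // 8 + 4 bytes would exceed 10, so the first batch is closed
        batcher.flush();
        System.out.println(batcher.sentBatches());
    }
}
```

With a 10-byte limit, two 4-byte records share a batch and the third starts a new one, regardless of how many messages that is.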
Example code for producing and consuming messages:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
 * Pushes a marketing event message.
 * @param eventMarkingOutputPayload the event to send
 * @param tags log tags to attach
 * @return always true; send() is asynchronous, so this does not confirm delivery
 */
public boolean outputEventMarketing(EventMarkingOutputPayload eventMarkingOutputPayload, Map<String, String> tags) {
    Map<String, String> logTags = GenLogTagTool.genlogTag(this.getClass().getSimpleName(), "outputEventMarketing", tags);
    logger.info("AdapterKafkaSenderBO.outputEventMarketing start. EventMarkingOutputPayload = " + eventMarkingOutputPayload, logTags);
    // The message body is a comma-separated string: userId,amount,orderTime
    String realPayLoad = eventMarkingOutputPayload.getUserId() + "," + eventMarkingOutputPayload.getAmount() + "," + eventMarkingOutputPayload.getOrderTime();
    logger.info("velarAdapterOutputSource.outputEventMarketing. realPayLoad = " + realPayLoad, logTags);
    // eventMarketingMqTopic is a field holding the topic name; its declaration is not shown in this excerpt
    kafkaTemplate.send(eventMarketingMqTopic, realPayLoad);
    logger.info("AdapterKafkaSenderBO.outputEventMarketing end.", logTags);
    return true;
}

/**
 * Listens on the topic for marketing event messages.
 */
@KafkaListener(topics = {"xxxxxtopic"})
public void receiveMessage(String message) {
    // Handle the received message here.
}
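The listener above receives the same comma-separated string that the sender builds (userId,amount,orderTime), so the receiving side has to split it back apart. A minimal sketch of that step follows. MarketingEventParser and MarketingEvent are hypothetical names, and the field types are assumptions based on the sample values shown earlier (an integer amount and a "yyyy-MM-dd HH:mm:ss" order time); adjust them to whatever EventMarkingOutputPayload really uses.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Sketch of parsing the "userId,amount,orderTime" message body; field types are assumptions. */
class MarketingEventParser {

    /** Simple holder for the three parsed fields. */
    static final class MarketingEvent {
        final String userId;
        final long amount;
        final LocalDateTime orderTime;

        MarketingEvent(String userId, long amount, LocalDateTime orderTime) {
            this.userId = userId;
            this.amount = amount;
            this.orderTime = orderTime;
        }
    }

    // Assumed timestamp format, taken from the sample value "2019-05-29 22:00:00".
    private static final DateTimeFormatter ORDER_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Splits the message into exactly three fields and converts each one. */
    static MarketingEvent parse(String message) {
        String[] parts = message.split(",", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected userId,amount,orderTime but got: " + message);
        }
        return new MarketingEvent(
                parts[0].trim(),
                Long.parseLong(parts[1].trim()),
                LocalDateTime.parse(parts[2].trim(), ORDER_TIME_FORMAT));
    }

    public static void main(String[] args) {
        MarketingEvent event = parse("411,59157,2019-05-29 22:00:00");
        System.out.println(event.userId + " " + event.amount + " " + event.orderTime);
    }
}
```

Inside receiveMessage, a call such as MarketingEventParser.parse(message) would turn the raw string into a typed object. A delimiter-based format breaks if a field ever contains a comma, which is one more argument for sending JSON instead.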