Spring and Kafka Integration Tutorial

Author: terry蒋 | Published 2019-05-31 13:42

Basic Concepts

producer
The message producer.
consumer
The message consumer.
consumer group
A group of consumers; consumers sharing the same group.id form one group.
Broker
A physical concept: each Kafka node in a Kafka cluster.
Topic
A logical concept: a category of Kafka messages, used to separate and isolate data.
Partition
A physical concept: the basic unit of data storage in Kafka. A Topic's data is spread across multiple Partitions, similar to table sharding in MySQL. Each Partition is ordered, and each Partition lives on exactly one Broker.
Each Topic is split into multiple Partitions.
The number of consumers in a group should be less than or equal to the number of Partitions.
Each Broker in the cluster holds one or more Partitions of a Topic.

Replication
A replica. A single Partition may have multiple Replicas, and all Replicas of a Partition hold the same data.
Replication Leader
Among the Replicas of a Partition, exactly one Leader handles all Producer and Consumer interaction for that Partition.
ReplicaManager
Manages the information of all partitions and replicas on the current broker, handles requests issued by the KafkaController, switches replica states, and appends/reads messages.

offset
The position of a message within the message stream, similar to an auto-increment primary key in MySQL.
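The key-to-Partition mapping described above can be sketched in plain Java. This is a simplified stand-in, not Kafka's real algorithm: Kafka's DefaultPartitioner hashes the key bytes with murmur2, while this sketch uses Arrays.hashCode purely for illustration, and choosePartition and the sample keys are invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {
    // Simplified stand-in for Kafka's key-based partitioning: hash the key
    // bytes, mask off the sign bit so the result is non-negative, then take
    // it modulo the partition count.
    static int choosePartition(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition, which is what
        // preserves per-key ordering within a Topic.
        System.out.println("user-411 -> partition " + choosePartition("user-411", 4));
        System.out.println("user-412 -> partition " + choosePartition("user-412", 4));
    }
}
```

Because the mapping is deterministic, all messages with the same key land in the same ordered Partition.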

Common Commands

# Start Kafka's bundled console producer for testing
shell:
kafka-console-producer.sh --broker-list 172.16.6.5:9092 --topic test
bat:
.\kafka-console-producer.bat --broker-list 172.16.6.5:9092 --topic test

# Start Kafka's bundled console consumer for testing
shell:
kafka-console-consumer.sh --zookeeper 172.16.6.5:2181 --topic test --from-beginning
bat:
.\kafka-console-consumer.bat --zookeeper 172.16.6.5:2181 --topic test --from-beginning

Integration Approaches

There are two: integration through Spring-Cloud-Stream, which is the recommended approach, and direct integration through spring-kafka; both are covered below.
If you send messages with Spring-Cloud-Stream, you must also receive them with Spring-Cloud-Stream.
For example, when sending with send(MessageBuilder.withPayload(payload).build()), if the payload's JSON is {"userId":"411","userName":"閽变箤","mobile":"13706562363","amount":59157,"productName":"衣服","orderTime":"2019-05-29 22:00:00"},
the message actually written to Kafka carries an embedded content-type header block in front of the body, i.e. the real message is

?♂contentType   ♀"text/plain"‼originalContentType    "application/json;charset=UTF-8"{"userId":"411","userName":"閽变箤","mobile":"13706562363","amount":59157,"productName":"衣服","orderTime":"2019-05-29 22:00:00"}

(the stray symbols are the binary length bytes of the embedded headers rendered as text).
So you need a matching Spring-Cloud-Stream client to receive and decode messages in this format; otherwise it is hard to extract the payload you need.
For details see the official contentType documentation: https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/contenttypemanagement.html
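For a sense of what a non-Stream consumer is up against, here is a deliberately fragile workaround sketch: skip the embedded header block by jumping to the first '{' of the JSON body. The class name, method and sample string are invented for illustration; the robust fix remains consuming with Spring-Cloud-Stream itself.

```java
public class EmbeddedHeaderStrip {
    // Fragile heuristic, for illustration only: the embedded header block in
    // the example above contains no braces, so the first '{' marks the start
    // of the JSON payload. Do not rely on this in production.
    static String extractJsonBody(String raw) {
        int start = raw.indexOf('{');
        if (start < 0) {
            throw new IllegalArgumentException("no JSON object found in message");
        }
        return raw.substring(start);
    }

    public static void main(String[] args) {
        // Simulated raw message: binary-ish header bytes, then the JSON body.
        String raw = "\u0001\u000bcontentType\"text/plain\"{\"userId\":\"411\"}";
        System.out.println(extractJsonBody(raw)); // prints {"userId":"411"}
    }
}
```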

Integrating the Kafka Client via Spring-Cloud-Stream

Add the Maven dependencies.
Note:
The kafka client version must match the Kafka server version; otherwise version incompatibility causes all kinds of connection problems.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
</parent>

<!-- kafka-client -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
    <exclusions>
        <exclusion>
            <artifactId>spring-boot-starter-logging</artifactId>
            <groupId>org.springframework.boot</groupId>
        </exclusion>
    </exclusions>
</dependency>

<!-- spring-cloud-starter-stream-kafka -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>

Add the configuration

# kafka
spring.cloud.stream.binders.kafka-mq.type = kafka
spring.cloud.stream.binders.kafka-mq.environment.spring.cloud.stream.kafka.binder.brokers = 172.16.6.5:9092,172.16.6.6:9092,172.16.6.7:9092
spring.cloud.stream.binders.kafka-mq.environment.spring.cloud.stream.kafka.binder.zkNodes = 172.16.6.5:2181,172.16.6.6:2181,172.16.6.7:2181


# Marketing events, bound to the Kafka message queue
Producer:
spring.cloud.stream.bindings.output-event-marketing.destination = adapter_event_marketing_dev
spring.cloud.stream.bindings.output-event-marketing.content-type = application/json
spring.cloud.stream.bindings.output-event-marketing.binder = kafka-mq

Consumer:
spring.cloud.stream.bindings.input-event-marketing.destination = adapter_event_marketing_dev
spring.cloud.stream.bindings.input-event-marketing.content-type = application/json
spring.cloud.stream.bindings.input-event-marketing.binder = kafka-mq

Note:
If the project's spring-cloud-stream also binds rabbitMq or another MQ, you need multi-binder configuration.

Configuring multiple kafka binders with spring cloud stream:
https://blog.csdn.net/xiao_jun_0820/article/details/78400579

Official docs: Spring Cloud Stream Core - Binders
https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_binders.html

# Use rabbitmq as the default message queue
spring.cloud.stream.default-binder = defaultRabbit
# rabbit configuration
# defaultRabbit is a binder name; any other name works
# type = rabbit means this MQ is of type rabbit
spring.cloud.stream.binders.defaultRabbit.type = rabbit
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.host = 172.16.10.18
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.port = 5672
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.username = dev
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.password = dev
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.virtual-host = /

# kafka configuration
# kafka-mq is a binder name; any other name works, same as defaultRabbit above
# type = kafka means this MQ is of type kafka, same as rabbit above
spring.cloud.stream.binders.kafka-mq.type = kafka
# broker nodes
spring.cloud.stream.binders.kafka-mq.environment.spring.cloud.stream.kafka.binder.brokers = 172.16.6.5:9092
# zookeeper nodes
spring.cloud.stream.binders.kafka-mq.environment.spring.cloud.stream.kafka.binder.zkNodes = 172.16.6.5:2181

# Marketing event message configuration
spring.cloud.stream.bindings.output-event-marketing.destination = adapter_event_marketing_dev
spring.cloud.stream.bindings.output-event-marketing.content-type = application/json
# Bound to the Kafka message queue. If binder is not set, the binder named by default-binder is used
spring.cloud.stream.bindings.output-event-marketing.binder = kafka-mq
spring.cloud.stream.bindings.input-event-marketing.destination = adapter_event_marketing_dev
spring.cloud.stream.bindings.input-event-marketing.content-type = application/json
spring.cloud.stream.bindings.input-event-marketing.binder = kafka-mq

Example producer and consumer code:
EventMarkingOutputPayload is a plain Bean; LoggerManager, GenLogTagTool and StringUtil are the project's own utility classes.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * @author jiangyaopeng
 */
@Service
@EnableBinding(AdapterKafkaSenderBO.VelarAdapterOutputSource.class)
public class AdapterKafkaSenderBO {
    private final static LoggerManager logger = LoggerManager.getLogger(AdapterKafkaSenderBO.class);

    @Autowired
    VelarAdapterOutputSource velarAdapterOutputSource;

    interface VelarAdapterOutputSource {
        String OUT_EVENT_MARKETING = "output-event-marketing";

        @Output(OUT_EVENT_MARKETING)
        MessageChannel outputEventMarketing();

        String IN_EVENT_MARKETING = "input-event-marketing";

        @Input(IN_EVENT_MARKETING)
        MessageChannel inputEventMarketing();
    }

    /**
     * Publish a marketing event message.
     * @param payload
     * @param tags
     * @return true if the message was sent
     */
    public boolean outputEventMarketing(EventMarkingOutputPayload payload, Map<String, String> tags) {
        Map<String, String> logTags = GenLogTagTool.genlogTag(this.getClass().getSimpleName(), "outputEventMarketing", tags);
        logger.info("AdapterKafkaSenderBO.outputEventMarketing start. payload = " + payload, logTags);
        boolean result = velarAdapterOutputSource.outputEventMarketing().send(MessageBuilder.withPayload(payload).build());
        logger.info("AdapterKafkaSenderBO.outputEventMarketing end. result = " + result, logTags);
        return result;
    }

    /**
     * Listen for marketing event messages.
     * @param payload
     */
    @StreamListener(VelarAdapterOutputSource.IN_EVENT_MARKETING)
    public void inputEventMarketing(EventMarkingOutputPayload payload) {
        Map<String, String> logTags = GenLogTagTool.genlogTag(this.getClass().getSimpleName(), "inputEventMarketing");
        logTags.put(GenLogTagTool.LOG_TAG_FLOW_NO, StringUtil.generateFlowNo());
        logger.info("AdapterKafkaSenderBO.inputEventMarketing end. payload = " + payload, logTags);
    }
}

Integrating the Kafka Client via Spring (spring-kafka)

Add the Maven dependencies.
Note:
The kafka client version must match the Kafka server version; otherwise version incompatibility causes all kinds of connection problems.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
</parent>

<!-- kafka-client -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
    <exclusions>
        <exclusion>
            <artifactId>spring-boot-starter-logging</artifactId>
            <groupId>org.springframework.boot</groupId>
        </exclusion>
    </exclusions>
</dependency>

<!-- spring kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>

Add the configuration

# Kafka settings
spring.kafka.bootstrap-servers = 172.16.6.5:9092,172.16.6.6:9092,172.16.6.7:9092
# Default consumer group
spring.kafka.consumer.group-id = defaultGroup
# Key/value serializers and deserializers
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer

# Maximum size of a producer batch, in bytes
spring.kafka.producer.batch-size = 65536
# Total memory the producer may use to buffer records awaiting send, in bytes
spring.kafka.producer.buffer-memory = 524288

Example producer and consumer code:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
 * Publish a marketing event message.
 * @param eventMarkingOutputPayload
 * @param tags
 * @return true once the send has been handed off
 */
public boolean outputEventMarketing(EventMarkingOutputPayload eventMarkingOutputPayload, Map<String, String> tags) {
    Map<String, String> logTags = GenLogTagTool.genlogTag(this.getClass().getSimpleName(), "outputEventMarketing", tags);
    logger.info("AdapterKafkaSenderBO.outputEventMarketing start. EventMarkingOutputPayload = " + eventMarkingOutputPayload, logTags);
    String realPayLoad = eventMarkingOutputPayload.getUserId() + "," + eventMarkingOutputPayload.getAmount() + "," + eventMarkingOutputPayload.getOrderTime();
    logger.info("velarAdapterOutputSource.outputEventMarketing. realPayLoad = " + realPayLoad, logTags);
    // eventMarketingMqTopic holds the target topic name (e.g. injected from configuration)
    kafkaTemplate.send(eventMarketingMqTopic, realPayLoad);
    logger.info("AdapterKafkaSenderBO.outputEventMarketing end.", logTags);
    return Boolean.TRUE;
}

/**
 * Listen on the topic.
 * Listens for marketing event messages.
 */
@KafkaListener(topics = {"xxxxxtopic"})
public void receiveMessage(String message) {
    // Handle the received message here
}
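Since the producer above joins userId, amount and orderTime with commas, the listener has to split the string back apart. A minimal parsing sketch, with the class and method names invented for this example:

```java
public class EventMarketingPayloadParser {
    // Split the comma-joined payload ("userId,amount,orderTime") back into
    // its three fields. The limit of 3 keeps any commas inside the trailing
    // orderTime field intact; a real consumer would also validate each part.
    static String[] parse(String message) {
        String[] parts = message.split(",", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected userId,amount,orderTime but got: " + message);
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] fields = parse("411,59157,2019-05-29 22:00:00");
        System.out.println("userId=" + fields[0] + ", amount=" + fields[1] + ", orderTime=" + fields[2]);
    }
}
```

A listener body could call parse(message) and hand the fields to the business logic.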

Original article: https://www.haomeiwen.com/subject/pbmwtctx.html