准备
- 一个springMVC基础工程
- 在pom.xml中添加依赖。注意jackson的版本不能太低,否则会报如下错误:
java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/deser/std/StdNodeBasedDeserializer
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<!-- fasterxml (Jackson) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>2.9.6</version>
</dependency>
- 创建配置文件spring-context-kafka.xml
配置生产者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"
default-lazy-init="false">
<description>Kafka Configuration</description>
<!-- 加载配置属性文件 -->
<context:property-placeholder ignore-unresolvable="true" location="classpath:zerodbt.properties"/>
<!-- Producer settings passed to the producerFactory bean; every value is resolved from the
     properties file loaded by the property placeholder.
     "group.id" is deliberately not listed here: it is a consumer setting, and a Kafka
     producer only logs an "isn't a known config" warning when it receives it. -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
            <entry key="retries" value="${kafka.retries}" />
            <entry key="batch.size" value="${kafka.batch.size}" />
            <entry key="linger.ms" value="${kafka.linger.ms}" />
            <entry key="buffer.memory" value="${kafka.buffer.memory}" />
            <entry key="key.serializer" value="${kafka.key.serializer}" />
            <entry key="value.serializer" value="${kafka.value.serializer}" />
        </map>
    </constructor-arg>
</bean>
<!-- Producer factory required by the KafkaTemplate bean; built from the producerProperties map. -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg ref="producerProperties" />
</bean>
<!-- KafkaTemplate bean: inject this bean wherever messages must be sent and call its send
     methods. It is built on producerFactory; the autoFlush flag and the default topic are
     read from the kafka.autoFlush and kafka.defaultTopic properties. -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="${kafka.autoFlush}" />
<property name="defaultTopic" value="${kafka.defaultTopic}" />
</bean>
</beans>
配置文件zerodbt.properties如下
# Broker list, referenced as "bootstrap.servers" in the Kafka bean configuration.
kafka.bootstrap.servers=192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092
# Referenced as "group.id" in the Kafka bean configuration.
kafka.group.id=2
# Producer tuning values, mapped one-to-one onto the entries of the producerProperties bean.
kafka.retries=10
kafka.batch.size=16384
kafka.linger.ms=1
kafka.buffer.memory=33554432
# Serializer classes for record keys and values (both plain strings here).
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Passed to the KafkaTemplate bean as its autoFlush constructor argument.
kafka.autoFlush=true
# Set as the defaultTopic property of the KafkaTemplate bean.
kafka.defaultTopic=test
使用方式:
//自动注入
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//使用send方法发送消息
kafkaTemplate.send("qweasd","test","KAFKA分布式消息服务测试");
配置消费者
在spring-context-kafka.xml补充如下配置
<!-- Consumer settings passed to the consumerFactory bean; every value is resolved from the
     properties file loaded by the property placeholder. -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
<entry key="group.id" value="${kafka.group.id}" />
<entry key="enable.auto.commit" value="${kafka.enable.auto.commit}" />
<entry key="auto.commit.interval.ms" value="${kafka.auto.commit.interval.ms}" />
<entry key="session.timeout.ms" value="${kafka.session.timeout.ms}" />
<entry key="key.deserializer" value="${kafka.key.deserializer}" />
<entry key="value.deserializer" value="${kafka.value.deserializer}" />
</map>
</constructor-arg>
</bean>
<!-- Consumer factory bean, built from the consumerProperties map. -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg ref="consumerProperties" />
</bean>
<!-- The class that actually consumes the messages.
     NOTE(review): KafkaConsumerService is also annotated with @Service; if component scanning
     covers its package, a second instance would be registered alongside this one. Confirm. -->
<bean id="messageListernerConsumerService" class="com.lczyfz.zerobdt.test.service.KafkaConsumerService" />
<!-- Listener container settings: the constructor argument is the value of the kafka.topic
     property, and messageListener points at the bean that handles each received record. -->
<bean id="containerProperties"
class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="${kafka.topic}" />
<property name="messageListener" ref="messageListernerConsumerService" />
</bean>
<!-- Message listener container: consumes from Kafka through consumerFactory and hands the
     records to the listener configured in containerProperties.
     No init-method is declared. The container is a SmartLifecycle bean with auto-startup
     enabled, so the application context starts it once the refresh has completed; calling
     the protected doStart() hook during bean initialization is unnecessary and would start
     consuming before the rest of the context is ready. -->
<bean id="messageListenerContainer"
      class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
</bean>
配置文件zerodbt.properties增加如下
kafka.enable.auto.commit=true
kafka.auto.commit.interval.ms=1000
kafka.session.timeout.ms=15000
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic=qweasd
接收消息处理类com.lczyfz.zerobdt.test.service.KafkaConsumerService
package com.lczyfz.zerobdt.test.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;
/**
 * Kafka message listener that dumps every received record to standard output.
 *
 * <p>Created by maple on 2018-07-05.
 */
@Service
public class KafkaConsumerService implements MessageListener<String, String> {

    /**
     * Prints the record itself (prefixed with a separator line), then its key, then its value.
     *
     * @param stringStringConsumerRecord the consumed Kafka record
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> stringStringConsumerRecord) {
        final ConsumerRecord<String, String> received = stringStringConsumerRecord;

        // Separator followed by the record's own string form.
        System.out.println("====================".concat(String.valueOf(received)));

        final String key = received.key();
        System.out.println(key);

        final String payload = received.value();
        System.out.println(payload);
    }
}
运行结果
网友评论