美文网首页菜鸟要飞
Kafka学习(三)kafka_2.12-1.1.0 整合Spr

Kafka学习(三)kafka_2.12-1.1.0 整合Spr

作者: 万总有点菜 | 来源:发表于2018-07-05 22:54 被阅读4次

准备

  • 一个springMVC基础工程
  • 在pom.xml添加依赖,jackson的版本不能太低,要不然会报错误。
java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/deser/std/StdNodeBasedDeserializer
#Kafka
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>
#fasterxml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.6</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.6</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.9.6</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-jaxb-annotations</artifactId>
    <version>2.9.6</version>
</dependency>
  • 创建配置文件spring-context-kafka.xml

配置生产者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-4.1.xsd"
    default-lazy-init="false">

    <description>Kafka Configuration</description>

    <!-- 加载配置属性文件 -->
    <context:property-placeholder ignore-unresolvable="true" location="classpath:zerodbt.properties"/>

    <!--定义producer的参数-->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <entry key="group.id" value="${kafka.group.id}" />
                <entry key="retries" value="${kafka.retries}" />
                <entry key="batch.size" value="${kafka.batch.size}" />
                <entry key="linger.ms" value="${kafka.linger.ms}" />
                <entry key="buffer.memory" value="${kafka.buffer.memory}" />
                <entry key="key.serializer" value="${kafka.key.serializer}" />
                <entry key="value.serializer" value="${kafka.value.serializer}" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="${kafka.autoFlush}" />
        <property name="defaultTopic" value="${kafka.defaultTopic}" />
    </bean>
</beans>

配置文件zerodbt.properties如下

kafka.bootstrap.servers=192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092
kafka.group.id=2
kafka.retries=10
kafka.batch.size=16384
kafka.linger.ms=1
kafka.buffer.memory=33554432
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.autoFlush=true
kafka.defaultTopic=test

使用方式:

//自动注入
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

//使用send方法发送消息
kafkaTemplate.send("qweasd","test","KAFKA分布式消息服务测试");

配置消费者

在spring-context-kafka.xml补充如下配置

<!-- 定义consumer的参数 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <entry key="group.id" value="${kafka.group.id}" />
                <entry key="enable.auto.commit" value="${kafka.enable.auto.commit}" />
                <entry key="auto.commit.interval.ms" value="${kafka.auto.commit.interval.ms}" />
                <entry key="session.timeout.ms" value="${kafka.session.timeout.ms}" />
                <entry key="key.deserializer" value="${kafka.key.deserializer}" />
                <entry key="value.deserializer" value="${kafka.value.deserializer}" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建consumerFactory bean -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- 实际执行消息消费的类 -->
    <bean id="messageListernerConsumerService" class="com.lczyfz.zerobdt.test.service.KafkaConsumerService" />

    <!-- 消费者容器配置信息 -->
    <bean id="containerProperties"
          class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="${kafka.topic}" />
        <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>

    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>

配置文件zerodbt.properties增加如下

kafka.enable.auto.commit=true
kafka.auto.commit.interval.ms=1000
kafka.session.timeout.ms=15000
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic=qweasd

接收消息处理类com.lczyfz.zerobdt.test.service.KafkaConsumerService

package com.lczyfz.zerobdt.test.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;

/**
 * Created by maple on 2018-07-05.
 */
@Service
public class KafkaConsumerService implements MessageListener<String, String> {


    @Override
    public void onMessage(ConsumerRecord<String, String> stringStringConsumerRecord) {
        System.out.println("====================" + stringStringConsumerRecord);
        System.out.println(stringStringConsumerRecord.key());
        System.out.println(stringStringConsumerRecord.value());
    }
}

运行结果

相关文章

网友评论

    本文标题:Kafka学习(三)kafka_2.12-1.1.0 整合Spr

    本文链接:https://www.haomeiwen.com/subject/nmlmuftx.html