美文网首页springboot
springboot整合kafka

springboot整合kafka

作者: 穿越沙漠的马兰花 | 来源:发表于2021-11-15 09:12 被阅读0次

  之前项目用了Kafka,当时多少有点仓促,只看了一下简单的API就直接使用了。一直想着自己搭建一个简单的Demo,这个周末终于有时间搞了一下。话不多说,直接上代码吧。

整体框架图

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Maven build descriptor for the demo application. It inherits from the Spring Boot
  parent and declares the web, Thymeleaf, MyBatis-Plus, MySQL, Kafka, Gson and test
  dependencies listed below.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<!--
  Spring Boot parent POM. Dependencies below that omit a version presumably rely on
  its dependency management (TODO confirm for each artifact).
-->
<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.4.2</version>

<!-- Empty relativePath: resolve the parent from the repository, not from a local path. -->
<relativePath/>

    </parent>

<!-- Coordinates of this project. -->
<groupId>com.atyangjun</groupId>

<artifactId>sharding-demo</artifactId>

<version>0.0.1-SNAPSHOT</version>

<name>sharding-demo</name>

<description>Demo project for Spring Boot</description>

<properties>

<!-- Java language level used for the build. -->
<java.version>1.8</java.version>

</properties>

<dependencies>

<!-- Core Spring Boot starter. -->
<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter</artifactId>

</dependency>

<!-- Lombok: supplies the @Data and @Slf4j annotations used by the Java classes; marked optional. -->
<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

<optional>true</optional>

</dependency>

<!-- Test support, limited to the test scope. -->
<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

<!--
  NOTE(review): the MySQL driver and MyBatis-Plus are not referenced by the Kafka code
  shown alongside this file; presumably left over from the base project. Confirm
  before removing.
-->
<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

</dependency>

<dependency>

<groupId>com.baomidou</groupId>

<artifactId>mybatis-plus-boot-starter</artifactId>

<version>3.0.5</version>

</dependency>

<!--
  NOTE(review): explicitly pinned to an old release. Check it against published
  security advisories and whether this project needs it at all.
-->
        <dependency>

<groupId>commons-fileupload</groupId>

<artifactId>commons-fileupload</artifactId>

<version>1.3.1</version>

</dependency>

<!-- NOTE(review): Thymeleaf is not used by the controller shown, which returns response bodies directly; confirm it is needed. -->
<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-thymeleaf</artifactId>

</dependency>

<!-- Spring MVC: provides the @Controller / @RequestMapping annotations used by the HTTP endpoints. -->
<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<!-- Spring for Apache Kafka: provides KafkaTemplate and @KafkaListener used by the producer service and the consumer. -->
<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

<!-- Gson: used by the controller to serialize Message objects to JSON. -->
<dependency>

<groupId>com.google.code.gson</groupId>

<artifactId>gson</artifactId>

<version>2.8.2</version>

</dependency>

<!-- NOTE(review): no transactional code appears in the classes shown; confirm this explicit dependency is required. -->
<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-tx</artifactId>

</dependency>

</dependencies>

<build>

<plugins>

<!-- Spring Boot Maven plugin, configured to exclude Lombok from the artifact it builds. -->
<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

<configuration>

<excludes>

<exclude>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

</exclude>

</excludes>

</configuration>

</plugin>

</plugins>

</build>

</project>

1、新建bean:

import lombok.Data;

import java.util.Date;

@Data

public class Message {

private Longid;

private Stringmsg;

private DatesendTime;

}

2、生产者service :KafkaProService

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Service;

import org.springframework.util.concurrent.ListenableFuture;

@Service

public class KafkaProService {

private static final LoggerLOG = LoggerFactory.getLogger(KafkaProService.class);

@Autowired

    private KafkaTemplatekafkaTemplate;

@Value("${kafka.app.topic.foo}")

private Stringtopic;

public void send(String message) {

LOG.info("topic=" +topic +",message=" + message);

ListenableFuture> future =kafkaTemplate.send(topic, message);

future.addCallback(success ->LOG.info("KafkaMessageProducer 发送消息成功!"),

fail ->LOG.error("KafkaMessageProducer 发送消息失败!"));

}

}

3、消费者KafkaReceiver

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka consumer for the demo: logs the value of every record received on the
 * application topic.
 */
@Component
@Slf4j
public class KafkaReceiver {

    /**
     * Logs the value of an incoming record; records with a {@code null} value are ignored.
     *
     * <p>The topic is resolved from the {@code kafka.app.topic.foo} property, the same
     * property the producer service publishes to, so the two cannot drift apart.
     *
     * @param record record delivered by the listener container
     */
    @KafkaListener(topics = "${kafka.app.topic.foo}")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional.ofNullable(record.value()).ifPresent(message -> {
            log.info("===消费者收到消息============");
            log.info("消息=======:{}", message);
        });
    }
}

4、KafkaProController

package com.atyangjun.shardingdemo.controller;

import com.atyangjun.shardingdemo.entity.Message;

import com.atyangjun.shardingdemo.service.KafkaProService;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.ResponseBody;

import java.util.Date;

import java.util.UUID;

@Controller

public class KafkaProController {

@Autowired

    private KafkaProServicekafkaProService;

private Gsongson =new GsonBuilder().create();

private static final LoggerLOGGER = LoggerFactory.getLogger(KafkaProController.class);

@RequestMapping("sendMessage")

@ResponseBody

    public String send(@RequestParam(required =true) String message) {

try {

kafkaProService.send(message);

}catch (Exception e) {

return "send failed";

}

return message;

}

@RequestMapping("sendBatch")

@ResponseBody

    public String send2() {

for (int i =0; i <10; i++) {

try {

Message message =new Message();

message.setId(System.currentTimeMillis());

message.setMsg(UUID.randomUUID().toString());

message.setSendTime(new Date());

LOGGER.info("消费者发送消息  message = {}",gson.toJson(message));

String s =gson.toJson(message);

kafkaProService.send(s);

}catch (Exception e) {

return "send failed";

}

}

return "success";

}

}

5、application.properties 配置中心

# == Kafka configuration

# Alternative broker address, currently disabled:
#spring.kafka.bootstrap-servers=127.0.0.1:9092

# NOTE(review): 0.0.0.0 is normally a listen/bind address rather than a destination;
# confirm clients on every target platform can connect with it, or use the address above.
spring.kafka.bootstrap-servers=0.0.0.0:9092

# ========= producer

# retries=0: the producer is configured not to retry failed sends.
spring.kafka.producer.retries=0

# Producer batch size. Per the Kafka producer configuration this is a size in bytes,
# not a number of messages.
spring.kafka.producer.batch-size=16384

# Producer buffer memory, in bytes.
spring.kafka.producer.buffer-memory=33554432

# Serializers for the message key and the message value
# (both plain strings, matching the String payloads the producer service sends).

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# ======== consumer

# Default consumer group id

spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=100

# Deserializers for the message key and the message value

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Application topic; read by the producer service through @Value("${kafka.app.topic.foo}").
kafka.app.topic.foo=test

运行之后 在页面发送请求,一种单个message,一种批量任务。

如果小伙伴在运行期间遇到什么问题,可以留言一起解决哈。虽然只是一个简单的Demo,可是稍不注意还是会踩到几个小坑呢🌈。

相关文章

网友评论

    本文标题:springboot整合kafka

    本文链接:https://www.haomeiwen.com/subject/ieozzltx.html