1、maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
2、配置文件
spring.kafka.bootstrap-servers=xxx.xx.xx.xx:9092
#设置一个默认组
spring:
  kafka:
    producer:
      batch-size: 65536
      buffer-memory: 524288
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 119.29.192.127:9092
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3、生产者
package com.hsshy.beam.kafka.service;
import com.alibaba.fastjson.JSON;
import com.hsshy.beam.sys.entity.SysConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
 * Spring component that publishes messages to Kafka through the injected
 * {@link KafkaTemplate}.
 */
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends a plain text message to Kafka.
     *
     * @param channel topic name passed to {@code KafkaTemplate.send}
     * @param message text used as the record value
     */
    public void sendChannelMess(String channel, String message) {
        this.kafkaTemplate.send(channel, message);
    }

    /**
     * Serializes a {@link SysConfig} to a JSON string and sends that string to Kafka.
     *
     * @param channel topic name passed to {@code KafkaTemplate.send}
     * @param message configuration object to serialize with fastjson
     */
    public void sendChannelMap(String channel, SysConfig message) {
        final String payload = JSON.toJSONString(message);
        this.kafkaTemplate.send(channel, payload);
    }
}
4、消费者
package com.hsshy.beam.kafka.service;
import com.alibaba.fastjson.JSON;
import com.hsshy.beam.common.utils.RedisUtil;
import com.hsshy.beam.sys.entity.SysConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Spring component holding the Kafka listeners for the {@code testTopic} and
 * {@code test2} topics. Each listener prints what it receives to standard output.
 */
@Component
public class KafkaConsumer {

    // Injected, but not referenced by any method visible in this class.
    @Autowired
    private RedisUtil redisUtil;

    /**
     * Listens on the {@code testTopic} topic and prints each received message.
     *
     * @param message record value delivered by the listener container
     */
    @KafkaListener(topics = {"testTopic"})
    public void receiveMessage(String message) {
        // React to the incoming message.
        System.out.println("消息1:" + message);
    }

    /**
     * Listens on the {@code test2} topic, prints the raw message, parses it as a
     * JSON-encoded {@link SysConfig} and prints its param key.
     *
     * <p>If the payload does not deserialize to an object (the parser returned
     * {@code null}), the message is reported and skipped instead of failing with
     * a {@link NullPointerException}. Malformed JSON still propagates the parser's
     * exception to the listener container.
     *
     * @param message record value delivered by the listener container
     */
    @KafkaListener(topics = {"test2"})
    public void receiveMap(String message) {
        // React to the incoming message.
        System.out.println("消息2:" + message);
        SysConfig sysConfig = JSON.parseObject(message, SysConfig.class);
        if (sysConfig == null) {
            // parseObject can yield null (e.g. for an empty or literal "null" payload).
            System.out.println("消息2: payload did not deserialize to a SysConfig, ignored");
            return;
        }
        System.out.println(sysConfig.getParamKey());
    }
}
5、Controller
package com.hsshy.beam.kafka.controller;
import com.hsshy.beam.common.utils.R;
import com.hsshy.beam.kafka.service.KafkaSender;
import com.hsshy.beam.sys.entity.SysConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka demo controller: exposes a single endpoint that publishes sample
 * messages through {@link KafkaSender}.
 *
 * @author hs
 * @create 2019-04-03 10:13:12
 */
@RequestMapping(value = "/kafka")
@RestController
@Api(value = "KafkaController",tags = "KafkaController")
public class KafkaController {

    /** Topic that receives the plain text sample message. */
    private static final String TEXT_TOPIC = "testTopic";

    /** Topic that receives the JSON-serialized {@link SysConfig} sample. */
    private static final String CONFIG_TOPIC = "test2";

    @Autowired
    private KafkaSender kafkaSender;

    /**
     * Publishes one plain text message to {@code testTopic} and one
     * {@link SysConfig} (param key {@code "11111"}) to {@code test2}.
     *
     * @return the result of {@code R.ok()}
     */
    @ApiOperation(value = "测试")
    @GetMapping(value = "/test")
    public R test() {
        // Neutral sample payload; the content is only used for demonstration.
        kafkaSender.sendChannelMess(TEXT_TOPIC, "hello kafka");

        SysConfig sysConfig = new SysConfig();
        sysConfig.setParamKey("11111");
        kafkaSender.sendChannelMap(CONFIG_TOPIC, sysConfig);

        return R.ok();
    }
}
网友评论