
创建接口
public interface GroupTopic {
String INPUT = "group-consumer";
String OUTPUT = "group-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
消息监听器
@Slf4j
@EnableBinding(
value = {
GroupTopic.class
}
)
public class StreamConsumer {
@StreamListener(GroupTopic.INPUT)
public void consumeGroupMessage(Object payload){
log.info("Group message consumed successfully,payload={}",payload);
}
}
控制器修改
@RestController
@RequestMapping
@Configuration
@Slf4j
public class Controller {
private final StreamBridge streamBridge;
@Autowired
private GroupTopic groupTopicProducer;
@Autowired
public Controller(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@PostMapping("sendToGroup")
public void sendMessage(@RequestParam(value="body")String body){
groupTopicProducer.output().send(MessageBuilder.withPayload(body).build());
}
}
配置文件修改
#消息分组示例
spring.cloud.stream.bindings.group-consumer.destination=group-topic
spring.cloud.stream.bindings.group-producer.destination=group-topic
spring.cloud.stream.bindings.group-consumer.group=Group-A
然后以不同的端口号启动项目两次,访问接口发送数据:

之后我们再到控制台就会发现,只有一个窗口出现了消息,这就证明消费组配置生效了,也就是说在一个分组内,只有一个消费者会接收到消息。
两个或以上的消费组测试
这个测试很简单,就是我们改下端口号和组名:
#消息分组示例
spring.cloud.stream.bindings.group-consumer.destination=group-topic
spring.cloud.stream.bindings.group-producer.destination=group-topic
spring.cloud.stream.bindings.group-consumer.group=Group-B
然后启动项目,发送接口请求,这个时候就会发现第三个服务收到消息了,然后前两个服务依然只有一台收到消息了,也就是说每个消费组都会收到消息,但是组中只有一台服务器可以消费消息。
消费分组
首先需要打开消费分区的功能,然后配置好消费分区的数量,然后分区key,消费者的实例总数,当前实例的索引值
#消费分区配置
#打开消费者的消费分区功能
spring.cloud.stream.bindings.group-consumer.consumer.partitioned=true
#两个消费分区
spring.cloud.stream.bindings.group-producer.producer.partition-count=2
#SpEl(key resolver)
#只有索引参数为1的节点(消费者),才能消费消息
spring.cloud.stream.bindings.group-producer.producer.partition-key-expression=1
#当前消费者实例总数
spring.cloud.stream.instance-count=2
#最大值instanceCount-1,当前实例的索引号
spring.cloud.stream.instance-index=1
配置好之后,我们把最后一个参数改为0启动一个实例,然后参数改为1端口号改下再启动实例;
然后发送接口就会发现,只有index=1的实例接收到消息了;
消费分区,多个实例测试
在上面的基础上,我们把index=1 改下端口再启动一个实例,发送接口观察,会发现index=1的两个实例会轮询着接受消息:
我这边总共发送了三次接口,然后消息就轮询着被后两个服务接受消费。


网友评论