美文网首页
基于消费组实现轮循单播功能

基于消费组实现轮循单播功能

作者: 那钱有着落吗 | 来源:发表于2021-10-28 11:34 被阅读0次
image.png

创建接口

public interface GroupTopic {

    String INPUT = "group-consumer";

    String OUTPUT = "group-producer";

    @Input(INPUT)
    SubscribableChannel input();

    @Output(OUTPUT)
    MessageChannel output();
}

消息监听器


@Slf4j
@EnableBinding(
        value = {
                GroupTopic.class
        }
)
public class StreamConsumer {

    @StreamListener(GroupTopic.INPUT)
    public void consumeGroupMessage(Object payload){
        log.info("Group message consumed successfully,payload={}",payload);
    }


}

控制器修改


@RestController
@RequestMapping
@Configuration
@Slf4j
public class Controller {

    private final StreamBridge streamBridge;

    @Autowired
    private GroupTopic groupTopicProducer;

    @Autowired
    public Controller(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @PostMapping("sendToGroup")
    public void sendMessage(@RequestParam(value="body")String body){
        groupTopicProducer.output().send(MessageBuilder.withPayload(body).build());
    }


}

配置文件修改

#消息分组示例
spring.cloud.stream.bindings.group-consumer.destination=group-topic
spring.cloud.stream.bindings.group-producer.destination=group-topic
spring.cloud.stream.bindings.group-consumer.group=Group-A

然后以不同的端口号启动项目两次,访问接口发送数据:


image.png

之后我们再到控制台就会发现,只有一个窗口出现了消息,这就证明消费组配置生效了,也就是说在一个分组内,只有一个消费者会接收到消息。

两个或以上的消费组测试

这个测试很简单,就是我们改下端口号和组名:

#消息分组示例
spring.cloud.stream.bindings.group-consumer.destination=group-topic
spring.cloud.stream.bindings.group-producer.destination=group-topic
spring.cloud.stream.bindings.group-consumer.group=Group-B

然后启动项目,发送接口请求,这个时候就会发现第三个服务收到消息了,然后前两个服务依然只有一台收到消息了,也就是说每个消费组都会收到消息,但是组中只有一台服务器可以消费消息。

消费分组

首先需要打开消费分区的功能,然后配置好消费分区的数量,然后分区key,消费者的实例总数,当前实例的索引值

#消费分区配置

#打开消费者的消费分区功能
spring.cloud.stream.bindings.group-consumer.consumer.partitioned=true
#两个消费分区
spring.cloud.stream.bindings.group-producer.producer.partition-count=2
#SpEl(key resolver)
#只有索引参数为1的节点(消费者),才能消费消息
spring.cloud.stream.bindings.group-producer.producer.partition-key-expression=1
#当前消费者实例总数
spring.cloud.stream.instance-count=2
#最大值instanceCount-1,当前实例的索引号
spring.cloud.stream.instance-index=1

配置好之后,我们把最后一个参数改为0启动一个实例,然后参数改为1端口号改下再启动实例;

然后发送接口就会发现,只有index=1的实例接收到消息了;

消费分区,多个实例测试

在上面的基础上,我们把index=1 改下端口再启动一个实例,发送接口观察,会发现index=1的两个实例会轮询着接受消息:

我这边总共发送了三次接口,然后消息就轮询着被后两个服务接受消费。


image.png image.png

相关文章

  • 基于消费组实现轮循单播功能

    创建接口 消息监听器 控制器修改 配置文件修改 然后以不同的端口号启动项目两次,访问接口发送数据: 之后我们再到控...

  • TCP/IP网络应用Socket编程实验报告

    摘要 基于Socket编程的基本原理和开发流程,本文设计并实现了基于单播和组播的多人聊天工具,以及基于组播的...

  • Kafka消息单播与多播的概念介绍

    Kafka引入了消费者组概念,每个消费者都属于一个特定的消费者组,通过消费者组就可以实现消息的单播与多播。本文将详...

  • 消息系统设计

    消息推送和聊天功能是移动时代的重要功能,广泛存在于各种业务中 一、特性 消息推送(单播、组播、广播); 聊天(聊天...

  • 消费者客户端(1)-消息拉取总概

    前言 从本章开始,将会以系列的方式,分章节讲解Rocket Mq是如何实现消息的消费模型。 消息的广播或单播 不管...

  • 循循尚购App-技术服务支持

    循循尚购是真正基于云共享经济下的“实体店+消费商+持续分润”的创新商业平台。 对平台而言,循循尚购可以连接线上线下...

  • 轮循

    一滴藏在心里的泪 一片泊于冷空的云 爱的火 燃烧了泪 怨的冰 凝结了云 你我只是 循于其中的水 沾染了爱 就强装勇...

  • swift UICollectionView 的使用:实现相册功

    这里的相册功能是基于 UIcollection 实现的,相比于基于 UIScrollview 来实现,这种实现方式...

  • Android端基于JavaCV实现人脸检测功能

    JavaCV-FaceDetect Android端基于JavaCV实现人脸检测功能 实现功能 人脸检测功能:Fa...

  • 基于订阅模式的系统站内消息系统数据库设计

    网站的站内消息可能有单播,组播,广播。比如余额提醒是单播,vip用户消息是组播,系统通知是广播。一般情况下这些可能...

网友评论

      本文标题:基于消费组实现轮循单播功能

      本文链接:https://www.haomeiwen.com/subject/bwoqnltx.html