背景:由于一些原因,为了实现ApiGateway的动态更新,当前微服务架构并没有总线(Spring cloud bus),为了实现的业务场景是在管理平台更新某个路由规则,需要通知到所有别的实例。
RabbitMQ
为了实现业务第一时间想到MQ。但是RabbitMQ只有基于Queue的。
Exchange有三类:fanout、direct、topic
exchange与queue是绑定管理。
fanout 会把同一条消息发送给所有绑定的queue
direct 会把消息路由到那些binding key与routing key完全匹配的Queue
topic 也是将消息路由到binding key与routing key相匹配的Queue中
很明显,我们选择的是fanout的Exchange . 在代码里,动态创建临时的queue,动态绑定到fanout。由于临时的queue在实例关闭的时候就会删除。所以非常满足当前的业务场景。
上代码:
///创建mqFactory
@Bean(name = "mqConnectionFactory")
@Primary
public ConnectionFactory firstConnectionFactory(@Value("${mq.server.host}") String host,
@Value("${mq.server.port}") int port,
@Value("${mq.server.username}") String username,
@Value("${mq.server.password}") String password,
@Value("${mq.server.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
///创建临时的queue ,通过KeyGenerator保证queue名唯一(实现可以很多)
@Bean(name = "refreshRoutesQueue")
@Primary
public Queue queue() {
String queueName = "etc.gk." + KeyGenerator.generatorOrderKey();
Queue queue = new Queue(queueName, false, false, true);
return queue;
}
///创建fanout的实例
@Bean(name = "changeFanout")
@Primary
public FanoutExchange fanoutExchange(@Value("${mq.service.routes.change.fanout}") String fanout) {
return new FanoutExchange(fanout);
}
///绑定queue和exchange
@Bean(name = "binding-exchangeMessage")
public Binding bindingExchangeMessage(@Qualifier("refreshRoutesQueue") Queue queueMessage, @Qualifier("changeFanout") FanoutExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange);
}
接下来的操作就是监听这个Queue,创建Listener,Container 就不赘述了
网友评论