Direct模式
引入依赖:
<!-- Maven dependency on the Spring Boot AMQP starter (groupId org.springframework.boot). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置:
# RabbitMQ connection settings: broker address (host and port) followed by the login credentials.
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
然后写个配置类:
/**
 * Spring configuration that registers the queue used by the Direct-mode example.
 */
@Configuration
public class MqConfig {

    /**
     * Exposes a {@link Queue} bean whose name is {@code "queue"}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue queue() {
        final String queueName = "queue";
        return new Queue(queueName);
    }
}
然后生产者类:
/**
 * Producer component that publishes a fixed greeting through the injected
 * {@link AmqpTemplate}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate template;

    /**
     * Sends the text {@code "Hello,MQ"} using the routing key {@code "queue"}.
     * The two-argument overload is used, so no exchange name is given.
     */
    public void send() {
        final String routingKey = "queue";
        final String payload = "Hello,MQ";
        template.convertAndSend(routingKey, payload);
    }
}
然后在另一个应用程序中编写消费者类:
/**
 * Consumer component that prints every message delivered from the queue
 * named {@code "queue"}.
 */
@Component
public class HelloReceive {

    /**
     * Handles one incoming message by writing it to standard output with a
     * {@code "Receive:"} prefix.
     *
     * @param str the message payload
     */
    @RabbitListener(queues = "queue") // the listener subscribes to the specified queue
    public void receive(String str) {
        final String line = "Receive:" + str;
        System.out.println(line);
    }
}
跑生产者程序的测试,消费者程序就可以收到了:
/** Producer under test, injected via {@code @Autowired}. */
@Autowired
private HelloSender helloSender;

/**
 * Triggers a single send through the producer. The test itself makes no
 * assertions; the result is observed on the consumer side.
 */
@Test
public void testRabbit() {
    this.helloSender.send();
}
默认情况下,RabbitMQ 的虚拟主机(virtual host)是 "/",它自带默认的交换机和路由键规则,默认交换机的类型就是 Direct。
而 template.convertAndSend("queue","Hello,MQ") 默认就是通过默认的交换机处理消息:默认的交换机会自动绑定所有的队列,并以队列名作为路由键(routing key),可以看到:
/**
 * Convert a Java object to an Amqp {@link Message} and send it to a default exchange
 * with a specific routing key.
 * <p>
 * This overload takes no exchange name; the caller supplies only the routing key
 * and the object to be converted.
 *
 * @param routingKey the routing key
 * @param message a message to send
 * @throws AmqpException if there is a problem
 */
void convertAndSend(String routingKey, Object message) throws AmqpException;
自定义交换机
配置
/**
 * Exposes a {@link Queue} bean whose name is {@code "myQueue"}.
 *
 * @return the queue definition
 */
@Bean
public Queue myQueue() {
    final String queueName = "myQueue";
    return new Queue(queueName);
}
/**
 * Exposes an exchange bean named {@code "CustomExchange"} whose type is
 * {@link ExchangeTypes#DIRECT}.
 *
 * @return the exchange definition
 */
@Bean
public CustomExchange customExchange() {
    final String exchangeName = "CustomExchange";
    final String exchangeType = ExchangeTypes.DIRECT;
    return new CustomExchange(exchangeName, exchangeType);
}
/**
 * Binds {@code myQueue} to {@code customExchange} under the routing key
 * {@code "Apple"}, with no extra binding arguments.
 *
 * @param myQueue        the queue to bind
 * @param customExchange the exchange the queue is bound to
 * @return the resulting binding
 */
@Bean
Binding customExchangeBinding(Queue myQueue, CustomExchange customExchange) {
    final String routingKey = "Apple";
    Binding binding = BindingBuilder
            .bind(myQueue)
            .to(customExchange)
            .with(routingKey)
            .noargs();
    return binding;
}
发送者
/**
 * Producer component that publishes a fixed greeting to the exchange named
 * {@code "CustomExchange"}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate template;

    /**
     * Sends the text {@code "Hello,MQ"} to {@code "CustomExchange"} with the
     * routing key {@code "Apple"}.
     */
    public void send() {
        // Earlier variant, kept for reference: template.convertAndSend("queue","Hello,MQ");
        final String exchange = "CustomExchange";
        final String routingKey = "Apple";
        final String payload = "Hello,MQ";
        template.convertAndSend(exchange, routingKey, payload);
    }
}
接收者
/**
 * Consumer component that prints every message delivered from the queue
 * named {@code "myQueue"}.
 */
@Component
public class HelloReceive {

    /**
     * Handles one incoming message by writing it to standard output with a
     * {@code "Receive:"} prefix.
     *
     * @param str the message payload
     */
    @RabbitListener(queues = "myQueue") // the listener subscribes to the specified queue
    public void receive(String str) {
        final String line = "Receive:" + str;
        System.out.println(line);
    }
}
Topic模式
同样的首先配置Topic交换机并绑定队列
/**
 * Exposes a {@link Queue} bean whose name is {@code "myTopicQueue"}.
 *
 * @return the queue definition
 */
@Bean
public Queue myTopicQueue() {
    final String queueName = "myTopicQueue";
    return new Queue(queueName);
}
/**
 * Exposes a {@link TopicExchange} bean named {@code "TopicExchange"}.
 *
 * @return the exchange definition
 */
@Bean
public TopicExchange topicExchange() {
    final String exchangeName = "TopicExchange";
    return new TopicExchange(exchangeName);
}
/**
 * Binds {@code myTopicQueue} to {@code topicExchange} using the routing
 * pattern {@code "Apple.#"}.
 *
 * @param myTopicQueue  the queue to bind
 * @param topicExchange the topic exchange the queue is bound to
 * @return the resulting binding
 */
@Bean
Binding topicExchangeBinding(Queue myTopicQueue, TopicExchange topicExchange) {
    final String routingPattern = "Apple.#";
    Binding binding = BindingBuilder
            .bind(myTopicQueue)
            .to(topicExchange)
            .with(routingPattern);
    return binding;
}
然后发送消息:
/**
 * Producer component that publishes a fixed greeting to the exchange named
 * {@code "TopicExchange"}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate template;

    /**
     * Sends the text {@code "Hello,MQ"} to {@code "TopicExchange"} with the
     * routing key {@code "Apple.666"}.
     */
    public void send() {
        // Earlier variants, kept for reference:
        //   template.convertAndSend("queue","Hello,MQ");
        //   template.convertAndSend("CustomExchange", "Apple", "Hello,MQ");
        final String exchange = "TopicExchange";
        final String routingKey = "Apple.666";
        final String payload = "Hello,MQ";
        template.convertAndSend(exchange, routingKey, payload);
    }
}
接收消息:
/**
 * Consumer component that prints every message delivered from the queue
 * named {@code "myTopicQueue"}.
 */
@Component
public class HelloReceive {

    /**
     * Handles one incoming message by writing it to standard output with a
     * {@code "Receive:"} prefix.
     *
     * @param str the message payload
     */
    @RabbitListener(queues = "myTopicQueue") // the listener subscribes to the specified queue
    public void receive(String str) {
        final String line = "Receive:" + str;
        System.out.println(line);
    }
}
Fanout模式
同样的套路
/**
 * Exposes a {@link Queue} bean whose name is {@code "myFanoutQueue"}.
 *
 * @return the queue definition
 */
@Bean
public Queue myFanoutQueue() {
    final String queueName = "myFanoutQueue";
    return new Queue(queueName);
}
/**
 * Exposes a {@link FanoutExchange} bean named {@code "FanoutExchange"}.
 *
 * @return the exchange definition
 */
@Bean
public FanoutExchange fanoutExchange() {
    final String exchangeName = "FanoutExchange";
    return new FanoutExchange(exchangeName);
}
/**
 * Binds {@code myFanoutQueue} to {@code fanoutExchange}. No routing key is
 * specified for this binding.
 *
 * @param myFanoutQueue  the queue to bind
 * @param fanoutExchange the fanout exchange the queue is bound to
 * @return the resulting binding
 */
@Bean
Binding fanoutExchangeBinding(Queue myFanoutQueue, FanoutExchange fanoutExchange) {
    Binding binding = BindingBuilder
            .bind(myFanoutQueue)
            .to(fanoutExchange);
    return binding;
}
/**
 * Producer component that publishes a fixed greeting to the exchange named
 * {@code "FanoutExchange"}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate template;

    /**
     * Sends the text {@code "Hello,MQ"} to {@code "FanoutExchange"} with an
     * empty routing key.
     */
    public void send() {
        final String exchange = "FanoutExchange";
        final String routingKey = "";
        final String payload = "Hello,MQ";
        template.convertAndSend(exchange, routingKey, payload);
    }
}
/**
 * Consumer component that prints every message delivered from the queue
 * named {@code "myFanoutQueue"}.
 */
@Component
public class HelloReceive {

    /**
     * Handles one incoming message by writing it to standard output with a
     * {@code "Receive:"} prefix.
     *
     * @param str the message payload
     */
    @RabbitListener(queues = "myFanoutQueue") // the listener subscribes to the specified queue
    public void receive(String str) {
        final String line = "Receive:" + str;
        System.out.println(line);
    }
}
网友评论