一.简单队列
1.配置pom文件,主要是添加spring-boot-starter-amqp的支持
<!-- Adds the spring-boot-starter-amqp starter used by the examples below.
     No <version> is given here; presumably it is managed by the Spring Boot parent POM (confirm). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置application.yml文件(以下内容为YAML格式)
# Application name and RabbitMQ broker connection settings for the examples.
# The keys are nested so that they resolve to spring.application.* and spring.rabbitmq.*.
# NOTE(review): "spirng" in the application name looks like a typo for "spring" — confirm before renaming.
# NOTE(review): host and credentials are hard-coded here; consider externalizing them.
spring:
  application:
    name: spirng-boot-rabbitmq
  rabbitmq:
    host: 49.235.110.134
    port: 5672
    username: root
    password: root
3.配置队列
package cn.lovingliu.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Queue configuration for the simple-queue example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue produced by {@link #queue()}. */
    private static final String QUEUE_NAME = "q_rabbit";

    /**
     * Registers the {@code q_rabbit} queue as a Spring bean.
     *
     * @return a new {@link Queue} named {@code q_rabbit}
     */
    @Bean
    public Queue queue() {
        final Queue simpleQueue = new Queue(QUEUE_NAME);
        return simpleQueue;
    }
}
4.生产者
package cn.lovingliu.rabbitmq.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Message producer for the simple-queue example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
public class Producer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a greeting carrying the current timestamp, echoes it to standard output
     * and sends it with routing key {@code q_rabbit}.
     */
    public void send() {
        // "HH" is the 24-hour clock field.
        final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        final String timestamp = timestampFormat.format(new Date());
        final String payload = "hello ".concat(timestamp);

        System.out.println("Sender : ".concat(payload));

        // With a simple queue the routing key is just the queue name.
        rabbitTemplate.convertAndSend("q_rabbit", payload);
    }
}
5.接收者
package cn.lovingliu.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Message consumer for the simple-queue example; listens on the {@code q_rabbit} queue.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = "q_rabbit")
public class Receiver {

    /**
     * Prints a received text message to standard output.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver : " + hello;
        System.out.println(line);
    }
}
6.测试
package cn.lovingliu.rabbitmq.test;
import cn.lovingliu.rabbitmq.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that drives the simple-queue producer.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Autowired
    private Producer producer;

    /**
     * Sends a single message through the injected {@link Producer}.
     *
     * @throws Exception declared for parity with the test signature; nothing is caught here
     */
    @org.junit.Test
    public void oneToOne() throws Exception {
        this.producer.send();
    }
}
简单队列
二.工作队列(work)
1.创建两个消费者(注意:本节未给出队列配置,需要参照简单队列中的 RabbitConfig 先声明队列 q_rabbit_work,否则监听器可能因队列不存在而启动失败)
cn.lovingliu.rabbitmq_work.consumer.ReceiverWork1
package cn.lovingliu.rabbitmq_work.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * First consumer of the work-queue example; listens on the {@code q_rabbit_work} queue.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = "q_rabbit_work")
public class ReceiverWork1 {

    /**
     * Prints a received text message to standard output, tagged as receiver 1.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver1 : " + hello;
        System.out.println(line);
    }
}
cn.lovingliu.rabbitmq_work.consumer.ReceiverWork2
package cn.lovingliu.rabbitmq_work.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Second consumer of the work-queue example; listens on the {@code q_rabbit_work} queue.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = "q_rabbit_work")
public class ReceiverWork2 {

    /**
     * Prints a received text message to standard output, tagged as receiver 2.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver2 : " + hello;
        System.out.println(line);
    }
}
2.生产者
package cn.lovingliu.rabbitmq_work.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Message producer for the work-queue example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
public class Producer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a numbered greeting carrying the current timestamp, echoes it to standard
     * output and sends it with routing key {@code q_rabbit_work}.
     *
     * @param i sequence number embedded in the message text
     */
    public void send(int i) {
        // "HH" is the 24-hour clock field.
        final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        final String timestamp = timestampFormat.format(new Date());

        final String payload = new StringBuilder("hello ")
                .append(i)
                .append(' ')
                .append(timestamp)
                .toString();

        System.out.println("Sender : ".concat(payload));

        // With a simple queue the routing key is just the queue name.
        rabbitTemplate.convertAndSend("q_rabbit_work", payload);
    }
}
3.测试
package cn.lovingliu.rabbitmq_work.test;
import cn.lovingliu.rabbitmq_work.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that drives the work-queue producer.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Autowired
    private Producer producer;

    /**
     * Sends 100 numbered messages (0 through 99), pausing 300 ms after each one.
     *
     * @throws Exception if the thread is interrupted while sleeping
     */
    @org.junit.Test
    public void oneToMany() throws Exception {
        final int messageCount = 100;
        final long pauseMillis = 300L;

        int sequence = 0;
        while (sequence < messageCount) {
            producer.send(sequence);
            Thread.sleep(pauseMillis);
            sequence++;
        }
    }
}
三.Topic Exchange(主题模式)
topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由地绑定不同的队列。
1.配置队列,绑定交换机
package cn.lovingliu.rabbitmq_topic.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues, the topic exchange and the bindings for the topic-exchange example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the first queue. */
    public static final String QUEUE_NAME_1 = "q_rabbit_topic_1";

    /** Name of the second queue. */
    public static final String QUEUE_NAME_2 = "q_rabbit_topic_2";

    /** Name of the topic exchange both queues are bound to. */
    public static final String EXCHANGE_NAME = "my_exchange";

    /**
     * Registers the first queue.
     *
     * @return a new {@link Queue} named {@link #QUEUE_NAME_1}
     */
    @Bean
    public Queue queue1() {
        final Queue first = new Queue(QUEUE_NAME_1);
        return first;
    }

    /**
     * Registers the second queue.
     *
     * @return a new {@link Queue} named {@link #QUEUE_NAME_2}
     */
    @Bean
    public Queue queue2() {
        final Queue second = new Queue(QUEUE_NAME_2);
        return second;
    }

    /**
     * Declares a topic-type exchange.
     *
     * @return a new {@link TopicExchange} named {@link #EXCHANGE_NAME}
     */
    @Bean
    TopicExchange exchange() {
        final TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME);
        return topicExchange;
    }

    /**
     * Binds the first queue to the exchange with the routing-key pattern {@code topic.1.*}.
     *
     * <p>NOTE(review): two {@link Queue} beans exist, so the parameter name {@code queue1}
     * is presumably what selects the right bean — do not rename it without checking.
     *
     * @param queue1   the queue to bind
     * @param exchange the topic exchange to bind to
     * @return the binding for the first queue
     */
    @Bean
    Binding bindingExchangeMessage(Queue queue1, TopicExchange exchange) {
        final String routingPattern = "topic.1.*";
        return BindingBuilder
                .bind(queue1)
                .to(exchange)
                .with(routingPattern);
    }

    /**
     * Binds the second queue to the exchange with the routing-key pattern {@code topic.2.*}.
     *
     * <p>NOTE(review): two {@link Queue} beans exist, so the parameter name {@code queue2}
     * is presumably what selects the right bean — do not rename it without checking.
     *
     * @param queue2   the queue to bind
     * @param exchange the topic exchange to bind to
     * @return the binding for the second queue
     */
    @Bean
    Binding bindingExchangeMessages(Queue queue2, TopicExchange exchange) {
        final String routingPattern = "topic.2.*";
        return BindingBuilder
                .bind(queue2)
                .to(exchange)
                .with(routingPattern);
    }
}
2.创建2个消费者
package cn.lovingliu.rabbitmq_topic.consumer;
import cn.lovingliu.rabbitmq_topic.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * First consumer of the topic-exchange example; listens on
 * {@link TopicRabbitConfig#QUEUE_NAME_1}.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME_1)
public class ReceiverTopic1 {

    /**
     * Prints a received text message to standard output, tagged as receiver 1.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver1 : " + hello;
        System.out.println(line);
    }
}
package cn.lovingliu.rabbitmq_topic.consumer;
import cn.lovingliu.rabbitmq_topic.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Second consumer of the topic-exchange example; listens on
 * {@link TopicRabbitConfig#QUEUE_NAME_2}.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME_2)
public class ReceiverTopic2 {

    /**
     * Prints a received text message to standard output, tagged as receiver 2.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver2 : " + hello;
        System.out.println(line);
    }
}
3.消息发送者(生产者)
package cn.lovingliu.rabbitmq_topic.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Message producer for the topic-exchange example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
public class Producer {

    /** Exchange the messages are published to; same value as {@code TopicRabbitConfig.EXCHANGE_NAME}. */
    private static final String EXCHANGE_NAME = "my_exchange";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Publishes a fixed message with routing key {@code topic.1.message}.
     */
    public void send1() {
        publish("topic.1.message", "hi, queue 1 message");
    }

    /**
     * Publishes a fixed message with routing key {@code topic.2.messages}.
     */
    public void send2() {
        publish("topic.2.messages", "hi, queue 2 message");
    }

    /** Echoes the payload to standard output, then sends it to the exchange with the given routing key. */
    private void publish(String routingKey, String payload) {
        System.out.println("Sender : " + payload);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, payload);
    }
}
4.测试
package cn.lovingliu.rabbitmq_topic.test;
import cn.lovingliu.rabbitmq_topic.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that drives the topic-exchange producer.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Autowired
    private Producer producer;

    /**
     * Triggers {@link Producer#send1()}.
     *
     * @throws Exception declared for parity with the test signature; nothing is caught here
     */
    @org.junit.Test
    public void send1() throws Exception {
        this.producer.send1();
    }

    /**
     * Triggers {@link Producer#send2()}.
     *
     * @throws Exception declared for parity with the test signature; nothing is caught here
     */
    @org.junit.Test
    public void send2() throws Exception {
        this.producer.send2();
    }
}
运行结果
四.Fanout Exchange(订阅模式)
Fanout 就是我们熟悉的广播模式或者订阅模式:给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都会收到这个消息。
1.配置队列,绑定交换机
package cn.lovingliu.rabbitmq_fanout.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues, the fanout exchange and the bindings for the fanout example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Configuration
public class FanoutRabbitConfig {

    /** Name of queue A. */
    public static final String QUEUE_NAME_A = "q_rabbit_fanout_a";

    /** Name of queue B. */
    public static final String QUEUE_NAME_B = "q_rabbit_fanout_b";

    /** Name of queue C. */
    public static final String QUEUE_NAME_C = "q_rabbit_fanout_c";

    /** Name of the fanout exchange all three queues are bound to. */
    public static final String EXCHANGE_NAME = "my_fanout_exchange";

    /**
     * Registers queue A.
     *
     * @return a new {@link Queue} named {@link #QUEUE_NAME_A}
     */
    @Bean
    public Queue aQueue() {
        final Queue queueA = new Queue(QUEUE_NAME_A);
        return queueA;
    }

    /**
     * Registers queue B.
     *
     * @return a new {@link Queue} named {@link #QUEUE_NAME_B}
     */
    @Bean
    public Queue bQueue() {
        final Queue queueB = new Queue(QUEUE_NAME_B);
        return queueB;
    }

    /**
     * Registers queue C.
     *
     * @return a new {@link Queue} named {@link #QUEUE_NAME_C}
     */
    @Bean
    public Queue cQueue() {
        final Queue queueC = new Queue(QUEUE_NAME_C);
        return queueC;
    }

    /**
     * Declares a fanout-type exchange.
     *
     * @return a new {@link FanoutExchange} named {@link #EXCHANGE_NAME}
     */
    @Bean
    FanoutExchange fanoutExchange() {
        final FanoutExchange broadcast = new FanoutExchange(EXCHANGE_NAME);
        return broadcast;
    }

    /**
     * Binds queue A to the fanout exchange (no routing key is specified).
     *
     * <p>NOTE(review): three {@link Queue} beans exist, so the parameter name
     * {@code aQueue} is presumably what selects the right bean — do not rename it without checking.
     *
     * @param aQueue         the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the binding for queue A
     */
    @Bean
    Binding bindingExchangeA(Queue aQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(aQueue)
                .to(fanoutExchange);
    }

    /**
     * Binds queue B to the fanout exchange (no routing key is specified).
     *
     * <p>NOTE(review): three {@link Queue} beans exist, so the parameter name
     * {@code bQueue} is presumably what selects the right bean — do not rename it without checking.
     *
     * @param bQueue         the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the binding for queue B
     */
    @Bean
    Binding bindingExchangeB(Queue bQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(bQueue)
                .to(fanoutExchange);
    }

    /**
     * Binds queue C to the fanout exchange (no routing key is specified).
     *
     * <p>NOTE(review): three {@link Queue} beans exist, so the parameter name
     * {@code cQueue} is presumably what selects the right bean — do not rename it without checking.
     *
     * @param cQueue         the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the binding for queue C
     */
    @Bean
    Binding bindingExchangeC(Queue cQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(cQueue)
                .to(fanoutExchange);
    }
}
2.创建3个消费者
cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutA
package cn.lovingliu.rabbitmq_fanout.consumer;
import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer A of the fanout example; listens on {@link FanoutRabbitConfig#QUEUE_NAME_A}.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_A)
public class ReceiverFanoutA {

    /**
     * Prints a received text message to standard output, followed by a blank line.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" (newline escape) replaces the former "/n", which printed the two
        // literal characters '/' and 'n' after every message.
        System.out.println("AReceiver : " + hello + "\n");
    }
}
cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutB
package cn.lovingliu.rabbitmq_fanout.consumer;
import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer B of the fanout example; listens on {@link FanoutRabbitConfig#QUEUE_NAME_B}.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_B)
public class ReceiverFanoutB {

    /**
     * Prints a received text message to standard output, followed by a blank line.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" (newline escape) replaces the former "/n", which printed the two
        // literal characters '/' and 'n' after every message.
        System.out.println("BReceiver : " + hello + "\n");
    }
}
cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutC
package cn.lovingliu.rabbitmq_fanout.consumer;
import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer C of the fanout example; listens on {@link FanoutRabbitConfig#QUEUE_NAME_C}.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_C)
public class ReceiverFanoutC {

    /**
     * Prints a received text message to standard output, followed by a blank line.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" (newline escape) replaces the former "/n", which printed the two
        // literal characters '/' and 'n' after every message.
        System.out.println("CReceiver : " + hello + "\n");
    }
}
3.生产者
cn.lovingliu.rabbitmq_fanout.producer.Producer
package cn.lovingliu.rabbitmq_fanout.producer;
import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Message producer for the fanout example.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@Component
public class Producer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Echoes a fixed message to standard output and publishes it to
     * {@link FanoutRabbitConfig#EXCHANGE_NAME} with an empty routing key.
     */
    public void send() {
        final String payload = "hi, fanout msg ";
        final String noRoutingKey = "";

        System.out.println("Sender : ".concat(payload));
        rabbitTemplate.convertAndSend(FanoutRabbitConfig.EXCHANGE_NAME, noRoutingKey, payload);
    }
}
4.测试
package cn.lovingliu.rabbitmq_fanout.test;
import cn.lovingliu.rabbitmq_fanout.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that drives the fanout producer.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Autowired
    private Producer producer;

    /**
     * Sends a single message through the injected {@link Producer}.
     *
     * @throws Exception declared for parity with the test signature; nothing is caught here
     */
    @org.junit.Test
    public void send1() throws Exception {
        this.producer.send();
    }
}
运行截图
消费者
网友评论