1. JMS && AMQP对比

2 RabbitMQ

direct 点对点
fanout、topic、headers 发布订阅


2.1 Exchange类型

消息包含的路由键指向哪个队列就给到哪个队列

广播给所有队列

根据路由键配合匹配规则
3 rabbit MQ的Java代码
官网:http://www.rabbitmq.com/tutorials/tutorial-one-java.html
导入maven
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
3.1 hello模式
就是直接发送给队列,直接从队列接收消息,不经过Exchange
3.1.1 发送消息
package com.zl.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;
public class Sender {
//队列的名字
private final static String QUEUE = "testhello";
public static void main(String[] args) throws Exception{
//获取连接
Connection connect = ConnectUtil.getConnect();
//创建通道
Channel channel = connect.createChannel();
//申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
//参数1:队列的名字
//参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
//参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
//参数4:是否自动删除
//参数5:一些其他的参数
channel.queueDeclare(QUEUE,false,false,false,null);
//发送数据
//参数1:交换机名,这里hello模式直接发到队列上,不需要交换机
//参数2:队列名
//参数3:属性(待查)
//参数4:消息内容
channel.basicPublish("",QUEUE,null,"发送的消息".getBytes());
//关闭连接
channel.close();
connect.close();
}
}
3.1.2 接收消息
package com.zl.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeliverCallback;
import com.zl.ConnectUtil;
public class Receiver {
//队列的名字
private final static String QUEUE = "testhello";
public static void main(String[] args) throws Exception {
//获取连接
Connection connect = ConnectUtil.getConnect();
//创建通道
Channel channel = connect.createChannel();
//申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
//参数1:队列的名字
//参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
//参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
//参数4:是否自动删除
//参数5:一些其他的参数
channel.queueDeclare(QUEUE,false,false,false,null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//接收消息
//true:自动应答,应答了,表示消息被消费了
channel.basicConsume(QUEUE,true,deliverCallback, consumerTag -> { });
}
}
3.1.3 运行结果
3.2 work模式
还是直接发送到队列里面,当一个队列有多个消费者订阅时,消息会发送给多个消费者,但是一个消息只能给一个消费者,所以结果就是一个消费者收到一部分消息,另一个消费者收到另一部分消息
3.2.1 发送者
发送100条
package com.zl.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;
public class Sender {
//队列的名字
private final static String QUEUE = "testwork";
public static void main(String[] args) throws Exception{
//获取连接
Connection connect = ConnectUtil.getConnect();
//创建通道
Channel channel = connect.createChannel();
//申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
//参数1:队列的名字
//参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
//参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
//参数4:是否自动删除
//参数5:一些其他的参数
channel.queueDeclare(QUEUE,false,false,false,null);
//发送数据
//参数1:交换机名,这里hello模式直接发到队列上,不需要交换机
//参数2:队列名
//参数3:属性(待查)
//参数4:消息内容
for (int i = 0; i < 100; i++) {
channel.basicPublish("",QUEUE,null,("发送的消息"+i).getBytes());
}
//关闭连接
channel.close();
connect.close();
}
}
3.2.2 消费者
消费者1:
package com.zl.work;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver1 {
//队列的名字
private final static String QUEUE = "testwork";
public static void main(String[] args) throws Exception {
//获取连接
Connection connect = ConnectUtil.getConnect();
//创建通道
Channel channel = connect.createChannel();
//申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
//参数1:队列的名字
//参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
//参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
//参数4:是否自动删除
//参数5:一些其他的参数
channel.queueDeclare(QUEUE,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
//收到消息的回调
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1收到的消息:"+new String(body));
//确认应答
//参数2:false:表示确认收到消息,true:拒绝收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//接收消息
//false:手动确认,代表收到消息需要手动告诉服务器收到信息了
channel.basicConsume(QUEUE,false,consumer);
}
}
消费者2
package com.zl.work;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver2 {
//队列的名字
private final static String QUEUE = "testwork";
public static void main(String[] args) throws Exception {
//获取连接
Connection connect = ConnectUtil.getConnect();
//创建通道
Channel channel = connect.createChannel();
//申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
//参数1:队列的名字
//参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
//参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
//参数4:是否自动删除
//参数5:一些其他的参数
channel.queueDeclare(QUEUE,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
//收到消息的回调
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2收到的消息:"+new String(body));
//确认应答
//参数2:false:表示确认收到消息,true:拒绝收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//接收消息
//false:手动确认,代表收到消息需要手动告诉服务器收到信息了
channel.basicConsume(QUEUE,false,consumer);
}
}
3.2.3运行结果


这里分配给两个消费者的消息是一样多的,但是如果这两个消费者消费的速度不一样呢?一个消费的快,一个消费的慢,他两个分配到的消息数量还是一样的吗?
是的,不管你有没有处理完消息,它都会把消息分配给你,你自己在那里慢慢处理
如果想要实现消费快的处理多一点数据,消费慢的处理少一点数据,怎么实现呢?
只需要在消费者代码中添加一句:
channel.basicQos(1);
告诉服务器,在没有确认当前消息完成之前,不要发送过来新的消息
这样就实现了按照消费能力分配数据量
3.3 publish模式/发布订阅模式/广播模式
发送消息到交换机,多个队列绑定到这个交换机,绑定的队列都会收到消息,绑定对应队列的消费者就收到了消息
3.3.1 消费者1
package com.zl.publish;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver1 {
//交换机的名字
private static final String EXCHANGE_NAME = "testexchange";
private static final String QUEUE = "testqueue1";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
channel.queueDeclare(QUEUE,false,false,false,null);
//绑定队列到交换机
//参数:队列名,交换机名,路由键
channel.queueBind(QUEUE,EXCHANGE_NAME,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1收到的消息:"+new String(body));
//确认应答
//参数2:false:表示确认收到消息,true:拒绝收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE,false,consumer);
}
}
3.3.2 消费者2
package com.zl.publish;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver2 {
//交换机的名字
private static final String EXCHANGE_NAME = "testexchange";
private static final String QUEUE = "testqueue2";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
channel.queueDeclare(QUEUE,false,false,false,null);
//绑定队列到交换机
//参数:队列名,交换机名,路由键
channel.queueBind(QUEUE,EXCHANGE_NAME,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2收到的消息:"+new String(body));
//确认应答
//参数2:false:表示确认收到消息,true:拒绝收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE,false,consumer);
}
}
3.3.3 发送者
package com.zl.publish;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;
public class Sender {
//交换机的名字
private static final String EXCHANGE_NAME = "testexchange";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
//申明交换机,类型是fanout(fanout是发布订阅模式,也就是广播)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//如果发布消息时,没有消费者,数据会丢失
//参数1:交换机的名字
//参数2:路由键,不需要
//参数3:附加的属性
//参数4:消息
channel.basicPublish(EXCHANGE_NAME,"",null,"发布订阅模式的消息".getBytes());
channel.close();
connect.close();
}
}
3.3.4 运行结果
都收到了
image.png
image.png
3.4 routing模式
发送消息时,指定路由键,队列绑定到交换机时也指定路由键,只有路由键匹配了才会收到消息(一个消费者可以指定多个路由键)
3.4.1 发送者
package com.zl.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;
public class Sender {
//交换机的名字
private static final String EXCHANGE_NAME = "testrouting";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
//路由格式的交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.basicPublish(EXCHANGE_NAME,"key1",null,"routing模式的消息".getBytes());
channel.close();
connect.close();
}
}
3.4.2 消费者1
package com.zl.routing;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver1 {
//交换机的名字
private static final String EXCHANGE_NAME = "testrouting";
private static final String QUEUE = "testroutqueue1";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
channel.queueDeclare(QUEUE,false,false,false,null);
//绑定队列到交换机
//参数:队列名,交换机名,路由键
//只有对应了key1,才会收到消息
channel.queueBind(QUEUE,EXCHANGE_NAME,"key1");
//可以绑定多个路由键
channel.queueBind(QUEUE,EXCHANGE_NAME,"key2");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1收到的消息:"+new String(body));
//确认应答
//参数2:false:表示确认收到消息,true:拒绝收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE,false,consumer);
}
}
3.4.3 消费者2
package com.zl.routing;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver2 {
//交换机的名字
private static final String EXCHANGE_NAME = "testrouting";
private static final String QUEUE = "testroutqueue1";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
channel.queueDeclare(QUEUE,false,false,false,null);
//绑定队列到交换机
//参数:队列名,交换机名,路由键
//只有对应了key1,才会收到消息
channel.queueBind(QUEUE,EXCHANGE_NAME,"key1");
//可以绑定多个路由键
channel.queueBind(QUEUE,EXCHANGE_NAME,"key3");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1收到的消息:"+new String(body));
//确认应答
//参数2:false:表示确认收到消息,true:拒绝收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE,false,consumer);
}
}
运行结果不看了
3.5 topic模式
和routing模式差不多,只是路由键不是精确匹配,而是用通配符
# 匹配一个或多个单词,* 匹配一个单词
3.5.1 生产者
package com.zl.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;
public class Sender {
//交换机的名字
private static final String EXCHANGE_NAME = "testtopic";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
channel.basicPublish(EXCHANGE_NAME,"key.aaa.bbb",null,"topic模式的key.aaa.bbb消息".getBytes());
channel.basicPublish(EXCHANGE_NAME,"key.ccc",null,"topic模式的key.ccc消息".getBytes());
channel.close();
connect.close();
}
}
3.5.2 消费者1
package com.zl.topic;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver1 {
//交换机的名字
private static final String EXCHANGE_NAME = "testtopic";
private static final String QUEUE = "testtopicqueue1";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
channel.queueDeclare(QUEUE,false,false,false,null);
channel.queueBind(QUEUE,EXCHANGE_NAME,"key.#");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("当前消费者的路由键是key.#,收到的消息:"+new String(body));
//确认应答
//参数2:false:表示确认收到消息,true:拒绝收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE,false,consumer);
}
}
3.5.3 消费者2
package com.zl.topic;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver2 {
//交换机的名字
private static final String EXCHANGE_NAME = "testtopic";
private static final String QUEUE = "testtopicqueue2";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
channel.queueDeclare(QUEUE,false,false,false,null);
channel.queueBind(QUEUE,EXCHANGE_NAME,"key.*");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("当前消费者的路由键是key.*,收到的消息:"+new String(body));
//确认应答
//参数2:false:表示确认收到消息,true:拒绝收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE,false,consumer);
}
}
3.5.4 运行结果
# 和 * 的区别就是匹配一个还是多个单词
image.png
image.png
3.6 持久化消息
首先搞清楚几个问题,消息是存在队列里面的,如果一个交换机没有绑定的队列,那么往这个交换机里面发送消息会丢失,当绑定了队列,那么消息在队列里面,这样即使没有消费者,这个队列里面的数据还在,但是如果服务重启了,这个队列里面的数据也就丢失了,那么想要服务重启,队列里面的数据还在的话,可以做如下操作
3.6.1 生产者
//看这里的第二个参数
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
//看这里的第三个参数
channel.basicPublish(EXCHANGE_NAME,"abc",MessageProperties.PERSISTENT_TEXT_PLAIN,"持久化的消息".getBytes());
3.6.2 消费者
package com.zl.persisit;
import com.rabbitmq.client.*;
import com.zl.ConnectUtil;
import java.io.IOException;
public class Receiver {
private static final String EXCHANGE_NAME = "testpersisit";
private static final String QUEUE = "testpersisitqueue";
public static void main(String[] args) throws Exception {
Connection connect = ConnectUtil.getConnect();
Channel channel = connect.createChannel();
//这一这里的第二个参数true
channel.queueDeclare(QUEUE,true,false,false,null);
channel.queueBind(QUEUE,EXCHANGE_NAME,"abc");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE,false,consumer);
}
}
3.6.3 结果
生产者生产一个消息以后,重启rabbitmq,然后再启动消费者,以后可以获取到之前生产者生产的消息
4 Spring boot使用rabbitmq
4.1 导包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2 配置
spring.rabbitmq.host=118.24.44.169
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
4.3 使用
发送时,直接发给交换机Exchange,具体交换机将数据给绑定的哪个队列,由交换机自己判断
/**
* 1、单播(点对点)
*/
@Test
public void contextLoads() {
//Message需要自己构造一个;定义消息体内容和消息头
//rabbitTemplate.send(exchage,routeKey,message);
//object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
//rabbitTemplate.convertAndSend(exchage,routeKey,object);
Map<String,Object> map = new HashMap<>();
map.put("msg","这是第一个消息");
map.put("data", Arrays.asList("helloworld",123,true));
rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",map);
//对象被默认序列化以后发送出去
rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new Book("西游记","吴承恩"));
}
/**
* 2.广播,广播不需要路由键
*/
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("exchange.fanout","",new Book("红楼梦","曹雪芹"));
}
接收,订阅的是队列
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
System.out.println(o.getClass());
System.out.println(o);
}
3.4 存json数据
默认存储数据是序列化后的数据,如果想将存储的内容改成json内容,需要更改MessageConverter
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
3.5 @RabbitListener和@EnableRabbit
- 在运行主方法上加上
@EnableRabbit //开启基于注解的RabbitMQ模式
@SpringBootApplication
public class Springboot02AmqpApplication {
public static void main(String[] args) {
SpringApplication.run(Springboot02AmqpApplication.class, args);
}
}
- 接收数据
@RabbitListener(queues = "atguigu.news")
public void receive(Book book){
System.out.println("收到消息:"+book);
}
@RabbitListener(queues = "atguigu")
public void receive02(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
发送数据还是像上文那么发
3.6 AmqpAdmin 创建和删除Queue、Exchange、Binding
不管是发送还是接收都需要rabbitmq里面有对应和Queue、Exchange、Binding,可以手动在rabbitmq的管理页面创建,也可以在代码中创建
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange(){
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.routingkey",null));
System.out.println("创建完成");
网友评论