RabbitMQ消息队列
消息是指在应用间传送的数据,消息可以非常简单
消息队列是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息队列是一种异步协作机制。应用场景:订单系统,用户点击按钮后,扣减库存,生成响应数据,发送红包,发送短息通知。主要的应用场景是:最终的一致性,广播,削锋流控等。
RabbitMQ特点
RabbitMQ起源于金融系统,用于分布式系统中的存储转发消息,在易用性,扩展性,高可用性等方面表现不俗。
可靠性
RabbitMQ使用一些机制保证可靠性:如持久化,传输确认,发布确认。
灵活路由:通过交换机来路由消息。
消息集群:多个服务器可以组成一个集群
高可用:队列可以在集群的机器上进行镜像,使得在部分节点出现问题的情况下队列任然可以使用
多种协议:支持STOMP,MQTT协议等
多语言客户端,管理界面,跟踪机制
Kafka虽然效率高,但是数据不安全
AMQP协议机制
消费者订阅某个队列,生产者创建消息,然后发送到队列,最后发送监听者。
[图片上传失败...(image-4977eb-1605963403617)]
Message:消息,消息不是具体的,它由消息头,消息体,消息体是不透明的,消息头是一系列可选属性组成。
消息的生产者:是一个向交换器发布消息的客户端应用
交换器:用来接收生产者发送消息并将消息路由给队列
绑定:用于消息队列和交换器之间的关联
消息队列:用来保存消息直到发送给消费者
信道:多路复用连接中的一条独立的双向数据流通道。
消费者:表示消费队列取得消息客户端
交换机类型:
Direct交换机 一对一的数据绑定
fanout交换:是广播模式:消息是一对多的:用户接不接受到消息无所谓应用场景 手机APP推送
topic交换器通过模式匹配消息路由键属性,将路由和某个模式进行匹配
消息简单的接收和发送
加入依赖架包
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.1.1</version>
</dependency>
</dependencies>
消息发送数据
package com.rabbit.test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
// 声明通道
channel.queueDeclare("myQueue",true,false,false,null);
String message="我的RabbitMQ消息";
// 发送消息到MQ
// 参数一交换机名称 空字符串表示不使用交换机
// 参数二为队列名
// 参数三 消息属性信息
// 参数四 为具体消息数据字节数组
channel.basicPublish("","myQueue",null,message.getBytes("utf-8"));
System.out.println("发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if (channel!=null){
channel.close();
}
if (connection!=null){
connection.close();
}
}
}
}
消息接收
package com.rabbit.test;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive {
public static void main(String[] args)throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
/**
* 接收消息
* 参数 1 为当前消费者需要监听的队列名 ,队列名必须要与发送时的队列名完全一致否则接收不到消息
* 参数 2 为消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列中移除
* 参数 3 为消息接收者的标签,用于当多个消费者同时监听一个队列时用于确认不通消费者,通常为空字符串即可
* 参数 4 为消息接收的回调方法这个方法中具体完成对消息的处理代码
* 注意:使用了basicConsume方法以后,会启动一个线程在持续的监听队列,如果队列中有信息的数据进入则会自动接收消息
* 因此不能关闭连接和通道对象
*/
connection=factory.newConnection();
channel=connection.createChannel();
channel.basicConsume("myQueue",true,"",new DefaultConsumer(channel){
//消息的具体接收和处理方法
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String (body,"utf-8");
System.out.println("消费者-- "+message);
}
});
//不能关闭通道和链接,如果一旦关闭可能会造成接收时抛出异常,或无法接收到消息
// channel.close();
// connection.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
}
RabbitMQ各种交换机的发送与接收
消息发送
package com.rabbit.exchage.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectSend {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("myDirectQueue",true,false,false,null);
/**
* 声明一个交换机
* 参数 1 为交换机的名称取值任意
* 参数 2 为交换机的类型 取值为 direct、fanout、topic、headers
* 参数 3 为是否为持久化交换机
* 注意:
* 1、声明交换机时如果这个交换机应存在则会放弃声明,如果交换机不存在则声明交换机
* 2、这个代码是可有可无的但是在使用前必须要确保这个交换机被声明
*/
channel.exchangeDeclare("directExchange","direct",true);
channel.queueBind("myDirectQueue","directExchange","directRoutingKey");
String message="direct的测试消息!";
/**
* 发送消息到指定的队列
* 参数 1 为交换机名称
* 参数 2 为消息的RoutingKey 如果这个消息的RoutingKey和某个队列与交换机绑定的RoutingKey一致那么
* 这个消息就会发送的指定的队列中
* 注意:
* 1、发送消息时必须确保交换机已经创建并且确保已经正确的绑定到了某个队列
*/
channel.basicPublish("directExchange","directRoutingKey",null,message.getBytes("utf-8"));
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消息接收
package com.rabbit.exchage.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectRevice {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("myDirectQueue",true,false,false,null);
channel.exchangeDeclare("directExchage","direct",true);
channel.queueBind("myDirectQueue","directExchage","directRoutingKey");
channel.basicConsume("myDirectQueue",true,"",new DefaultConsumer(channel){
//获取某个队列的数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body);
System.out.println("消费者===>"+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
fanout交换机发送和接收的消息(一对多的关系)
发送消息
package com.rabbit.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class FanoutSend {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
/**
* 由于使用Fanout类型的交换机,因此消息的接收方肯能会有多个因此不建议在消息发送时来创建队列
* 以及绑定交换机,建议在消费者中创建队列并绑定交换机
* 但是发送消息时至少应该确保交换机时存在
*/
channel.exchangeDeclare("fanoutExchange","fanout",true);
// channel.queueDeclare("fanoutQueue",true,false,false,null);
// channel.queueBind("fanoutQueue","fanoutExchange","");
String message="fanout的测试消息!";
/**
* 发送消息到指定的队列
* 参数 1 为交换机名称
* 参数 2 为消息的RoutingKey 如果这个消息的RoutingKey和某个队列与交换机绑定的RoutingKey一致那么
* 这个消息就会发送的指定的队列中
* 注意:
* 1、发送消息时必须确保交换机已经创建并且确保已经正确的绑定到了某个队列
*/
channel.basicPublish("fanoutExchange","",null,message.getBytes("utf-8"));
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
接收消息
package com.rabbit.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive01 {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
/**
* 由于Fanout类型的交换机的消息时类似于广播的模式,它不需要绑定RoutingKey
* 而又可能会有很多个消费来接收这个交换机中的数据,因此我们创建队列时要创建
* 一个随机的队列名称
*
* 没有参数的queueDeclare方法会创建一个名字为随机的一个队列
* 这个队列的数据时非持久化
* 是排外的(同时最多只允许有一个消费者监听当前队列)
* 自动删除的 当没有任何消费者监听队列时这个队列会自动删除
*
* getQueue() 方法用于获取这个随机的队列名
*/
String queueName= channel.queueDeclare().getQueue();
channel.exchangeDeclare("fanoutExchange","fanout",true);
//将这个随机的队列绑定到交换机中, 由于是fanout类型的交换机因此不需指定RoutingKey进行绑定
channel.queueBind(queueName,"fanoutExchange","");
/**
* 监听某个队列并获取队列中的数据
* 注意:
* 当前被讲定的队列必须已经存在并正确的绑定到了某个交换机中
*/
channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body);
System.out.println("Receive01消费者 ---"+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Topic消息类型的交换机的发送和接收
消息的发送
package com.rabbit.exchage.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicSend {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
/**
* 由于使用Fanout类型的交换机,因此消息的接收方肯能会有多个因此不建议在消息发送时来创建队列
* 以及绑定交换机,建议在消费者中创建队列并绑定交换机
* 但是发送消息时至少应该确保交换机时存在
*/
channel.exchangeDeclare("topicExchange","topic",true);
//
String message="topic的测试消息!";
/**
* 发送消息到指定的队列
* 参数 1 为交换机名称
* 参数 2 为消息的RoutingKey 如果这个消息的RoutingKey和某个队列与交换机绑定的RoutingKey一致那么
* 这个消息就会发送的指定的队列中
* 注意:
* 1、发送消息时必须确保交换机已经创建并且确保已经正确的绑定到了某个队列
* # 多个队列
* * 一个
*/
channel.basicPublish("topicExchange","aa.bb",null,message.getBytes("utf-8"));
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消息的接收
package com.rabbit.exchage.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive01 {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
/**
* Topic 类型的交换机也是消息一对多的一种交换机类型,它和fanout都能实现一个消息同时发送给多个队列
* fanout更适合于使用在一个功能不同的进程来获取数据,例如手机App中的消息推送,一个App可能会还有很
* 多个用户来进行安装然后他们都会启动一个随机的队列来接收着自己的数据
* Topic更适合不同的功能模块来接收同一个消息,例如商城下单成功以后需要发送消息到队列中。例如RoutingKey
* 为 的order.success,物流系统监听订单order.* 发票系统监听order.*
*
* Topic可以使用随机的队列名也可以使用一个明确的队列名,但是如果应用在和订单有关的功能中,建议是有个
* 名取的队列名并且要求为持久化的队列
*/
channel.queueDeclare("topicQueue01",true,false,false,null);
channel.exchangeDeclare("topicExchange","topic",true);
channel.queueBind("topicQueue01","topicExchange","aa");
channel.basicConsume("topicQueue01",true,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body);
System.out.println("Receive01消费者aa ---"+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
*符号的接收
package com.rabbit.exchage.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive02 {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("topicQueue02",true,false,false,null);
channel.exchangeDeclare("topicExchange","topic",true);
channel.queueBind("topicQueue02","topicExchange","aa.*");
channel.basicConsume("topicQueue02",true,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body);
System.out.println("Receive02消费者aa.* ---"+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
#符号的接收
package com.rabbit.exchage.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive03 {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("topicQueue03",true,false,false,null);
channel.exchangeDeclare("topicExchange","topic",true);
channel.queueBind("topicQueue03","topicExchange","aa.#");
channel.basicConsume("topicQueue03",true,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body);
System.out.println("Receive03消费者aa.# ---"+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
fanout交换机和topic交换机使用场景对比
Topic 类型的交换机也是消息一对多的一种交换机类型,它和fonout都能实现一个消息发送多个队列
fonout更适合使用在一个功能不同的进程来获取数据,例如手机APP中的消息推送,一个APP可能有很多个用户进行安装然后他们启动一个随机的队列来接收自己的数据
Topic更适合不同的功能模块来接收同一个消息,例如商城下单成功后需要发送消息到队列中
Topic可以使用随机的队列名,也可以使用一个明确的队列名,但是如果应用在和订单有关的功能中
事务消息
事务消息与数据库事务类似,只有MQ中的消息是要保证消息是否会全部发送成功,防止丢失消息的方案
RabbitMQ有两种解决方案:
通过AMQP提供的事务机制来实现
使用发送者确认模式实现。
发送确认模式
普通确认模式
等待服务返回响应 ,用于是否消费发送成功,如果服务确认消费已经发送完成则返回true 否则返回false,可以为这个方法指定一个毫秒用于确定我们的需要等待服务确认的超时时间,
,果超过了指定的时间以后则会抛出异常InterruptedException 表示服务器出现问题了需要补发消息或将消息缓存到Redis中稍后利用定时任务补发,论是返回false还是抛出异常消息都有可能发送成功有可能没有发送成功,果我们要求这个消息一定要发送到队列例如订单数据,那怎么我们可以采用消息补发,谓补发就是重新发送一次消息,可以使用递归或利用Redis+定时任务来完成补发
package com.rabbit.exchage.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 普通确认模式
*/
public class CommonConfirmSend {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
channel.exchangeDeclare("directConfirmExchange","direct",true);
channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
String message="普通发送者确认模式测试消息!";
//启动发送者确认模式
channel.confirmSelect();
channel.basicPublish("directConfirmExchange","confirmRoutingKey",null,message.getBytes("utf-8"));
boolean flag= channel.waitForConfirms();
System.out.println("消息发送成功"+flag);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
批量确认模式发送
waitForConfirmsOrDie 批量消息确认,它会同时向服务中确认之前当前通道中发送的所有的消息是否已经全部成功写入 这个方法没有任何的返回值,如果服务器中有一条消息没有能够成功或向服务器发送确认时服务不可访问都被认定为消息确认失败,可能有有消息没有发送成功,我们需要进行消费的补发。 如果无法向服务器获取确认信息那么方法就会抛出InterruptedException异常,这时就需要补发消息到队列waitForConfirmsOrDie方法可以指定一个参数timeout 用于等待服务器的确认时间,如果超过这个时间也会 抛出异常,表示确认失败需要补发消息,注意:批量消息确认的速度比普通的消息确认要快,但是如果一旦出现了消息补发的情况,我们不能确定具体是哪条消息没有完成发送,需要将本次的发送的所有消息全部进行补发
package com.rabbit.exchage.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 批量确认模式发送
*/
public class MoreConfirmSend {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
channel.exchangeDeclare("directConfirmExchange","direct",true);
channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
String message="普通发送者确认模式测试消息!";
//启动发送者确认模式
channel.confirmSelect();
channel.basicPublish("directConfirmExchange","confirmRoutingKey",null,message.getBytes("utf-8"));
channel.waitForConfirmsOrDie();
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
异步消息确认发送
package com.rabbit.exchage.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SynchroniseConfirmSend {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
channel.exchangeDeclare("directConfirmExchange","direct",true);
channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
String message="普通发送者确认模式测试消息!";
//启动发送者确认模式
channel.confirmSelect();
/**
*异步消息确认监听器,需要在发送消息前启动
*/
channel.addConfirmListener(new ConfirmListener() {
//消息确认以后的回调方法
//参数 1 为被确认的消息的编号 从 1 开始自动递增用于标记当前是第几个消息
//参数 2 为当前消息是否同时确认了多个
//注意:如果参数 2 为true 则表示本次确认同时确认了多条消息,消息等于当前参数1 (消息编号)的所有消息
// 全部被确认 如果为false 则表示只确认多了当前编号的消息
public void handleAck(long l, boolean b) throws IOException {
System.out.println("消息被确认了 --- 消息编号:"+l+" 是否确认了多条:"+b);
}
//消息没有确认的回调方法
//如果这个方法被执行表示当前的消息没有被确认 需要进行消息补发
//参数 1 为没有被确认的消息的编号 从 1 开始自动递增用于标记当前是第几个消息
//参数 2 为当前消息是否同时没有确认多个
//注意: 如果参数2 为true 则表示小于当前编号的所有的消息可能都没有发送成功需要进行消息的补发
// 如果参数2 为false则表示当前编号的消息没法发送成功需要进行补发
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息没有被确认-----消息编号:"+l+" 是否没有确认多条:"+b);
}
});
for(int i=0;i<100000;i++){
channel.basicPublish("directConfirmExchange","confirmRoutingKey",null,message.getBytes("utf-8"));
}
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
}
}
}
确认模式接收消息
package com.rabbit.exchage.confirm;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive1 {
public static void main(String[] args) {
ConnectionFactory factory=
new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
channel.exchangeDeclare("directConfirmExchange","direct",true);
channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
//启动事务
channel.txSelect();
/**
* 接收消息
* 参数 2 为消息的确认机制,true表示自动消息确认,确认以后消息会从队列中被移除 ,当读取完消息以后就会自动确认
* 如果为false 表示手动确认消息
* 注意:
* 1、如果我们只是接收的消息但是还没有来得处理,当前应用就崩溃或在进行处理的时候例如像数据库中
* 写数据但是数据库这时不可用,那么由于消息是自动确认的那么这个消息就会在接收完成以后自动从队列中
* 被删除,这就会丢失消息
*/
channel.basicConsume("confirmQueue",false,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取当前当前消息是否被接收过一次如果返回值为false表示消息之前没有被接收过,如果返回值为true
//则表示之前这个消息被接收过,可能也处理完成,因此我们要进行消息的防重复处理
Boolean isRedeliver= envelope.isRedeliver();
//获取当前内部类中的通道
Channel c= this.getChannel();
if(!isRedeliver){
String message=new String(body);
System.out.println("消费者 处理了消息---"+message);
//获取消息的编号,我们需要根据消息的编号来确认消息
long tag= envelope.getDeliveryTag();
}else{
}
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
SpringBoot整合RabbitMQ
加入依赖的架包
<!--添加AMQP的起步依赖,添加成功后就会自动引入RabbitMQ的依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置连接信息
#配置RabbitMQ的相关连接信息(单机版)
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
##配置RabbitMQ的相关链接信息(集群版)
#spring.rabbitmq.addresses=192.168.115.132:5672,192.168.115.136:5672
#spring.rabbitmq.username=root
#spring.rabbitmq.password=root
配置类
package com.yang.springboot_send.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//配置一个Direct类型的交换
@Bean
public DirectExchange directExchange(){
return new DirectExchange("bootDirectExchange");
}
//配置一个队列
@Bean
public Queue directQueue(){
return new Queue("bootDirectQueue");
}
/**
* 配置一个队列和交换机的绑定
* @param directQueue 需要绑定的队列的对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动进行注入
* @param directExchange 需要绑定的交换机的对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动进行注入
* @return
*/
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange){
//完成绑定
// 参数 1 为需要绑定的队列
// 参数 2 为需要绑定的交换机
// 参数 3绑定时的RoutingKey
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
}
// //配置一个 Fanout类型的交换
// @Bean
// public FanoutExchange fanoutExchange(){
// return new FanoutExchange("fanoutExchange");
// }
//
// //配置一个 Topic 类型的交换
// @Bean
// public TopicExchange topicExchange(){
// return new TopicExchange("topicExchange");
// }
}
实现Service消息发送
package com.bjpowernode.rabbitmq.service.impl;
import com.bjpowernode.rabbitmq.service.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendService")
public class SendServiceImpl implements SendService {
//注入Amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
public void sendMessage(String message) {
/**
* 发送消息
* 参数 1 为交换机名
* 参数 2 为RoutingKey
* 参数 3 为我们的具体发送的消息数据
*/
amqpTemplate.convertAndSend("bootDirectExchange","bootDirectRoutingKey",message);
}
public void sendFanoutMessage(String message) {
amqpTemplate.convertAndSend("fanoutExchange","",message);
}
public void sendTopicMessage(String message) {
amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
}
}
实现发送者启动类
package com.bjpowernode.rabbitmq;
import com.bjpowernode.rabbitmq.service.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext ac=SpringApplication.run(Application.class, args);
SendService service= (SendService) ac.getBean("sendService");
service.sendMessage("Boot的测试数据");
// service.sendFanoutMessage("Boot的Fanout测试数据");
// service.sendTopicMessage("Boot的Topic测数据数据key 为 aa.bb.cc");
}
}
实现消费者接受消息
配置类
package com.bjpowernode.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//配置一个Direct类型的交换
@Bean
public DirectExchange directExchange(){
return new DirectExchange("bootDirectExchange");
}
//配置一个队列
@Bean
public Queue directQueue(){
return new Queue("bootDirectQueue");
}
/**
* 配置一个队列和交换机的绑定
* @param directQueue 需要绑定的队列的对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动进行注入
* @param directExchange 需要绑定的交换机的对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动进行注入
* @return
*/
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange){
//完成绑定
// 参数 1 为需要绑定的队列
// 参数 2 为需要绑定的交换机
// 参数 3绑定时的RoutingKey
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
}
}
Service实现类
package com.bjpowernode.rabbitmq.service.impl;
import com.bjpowernode.rabbitmq.service.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {
//注入Amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
/**
* 这里个接收不是不间断接收消息,每执行一次这个方法只能接收一次消息,如果有新消息进入则不会自动接收消息
* 不建议使用
*/
public void receive() {
// String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
// System.out.println(message);
}
/**
* @RabbitListener 注解用于标记当前方法是一个RabbitMQ的消息监听方法,作用是持续性的自动接收消息
* 这个方法不需要手动调用Spring会自动运行这个监听
* 属性
* queues 用于指定一个已经存在的队列名,用于进行队列的监听
* @param message 接收到的具体的消息数据
*
* 注意:如果当前监听方法正常结束Spring就会自动确认消息,如果出现异常则不会确认消息
* 因此在消息处理时我们需要做好消息的防止重复处理工作
*/
@RabbitListener(queues = {"bootDirectQueue"})
public void directReceive(String message){
System.out.println("监听器接收的消息----"+message);
}
// @RabbitListener(bindings={
// @QueueBinding(//@QueueBinding注解要完成队列和交换机的
// value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
// exchange=@Exchange(name="fanoutExchange",type="fanout")//创建一个交换机
// )
// }
// )
// public void fanoutReceive01(String message){
// System.out.println("fanoutReceive01监听器接收的消息----"+message);
// }
//
//
//
// @RabbitListener(bindings={
// @QueueBinding(//@QueueBinding注解要完成队列和交换机的
// value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
// exchange=@Exchange(name="fanoutExchange",type="fanout")//创建一个交换机
// )
// }
// )
// public void fanoutReceive02(String message){
// System.out.println("fanoutReceive02监听器接收的消息----"+message);
// }
//
//
//
// @RabbitListener(bindings = {@QueueBinding(value=@Queue("topic01"),key = {"aa"},exchange =@Exchange(name = "topicExchange",type = "topic"))})
// public void topicReceive01(String message){
// System.out.println("topic01消费者 ---aa---"+message );
// }
// @RabbitListener(bindings = {@QueueBinding(value=@Queue("topic02"),key = {"aa.*"},exchange =@Exchange(name = "topicExchange",type = "topic"))})
// public void topicReceive02(String message){
// System.out.println("topic02消费者 ---aa.*---"+message );
// }
// @RabbitListener(bindings = {@QueueBinding(value=@Queue("topic03"),key = {"aa.#"},exchange =@Exchange(name = "topicExchange",type = "topic"))})
// public void topicReceive03(String message){
// System.out.println("topic03消费者 ---aa。#---"+message );
// }
}
启动类
package com.bjpowernode.rabbitmq;
import com.bjpowernode.rabbitmq.service.ReceiveService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext ac=SpringApplication.run(Application.class, args);
ReceiveService service= (ReceiveService) ac.getBean("receiveService");
//使用了消息监听器接收消息那么就不需要调用接收方法来接收消息
// service.receive();
}
}
网友评论