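RabbitMQ is a message queue (MQ) broker.
Uses: decoupling modules, smoothing traffic peaks, asynchronous messaging, and communication between applications; it supports building high-performance, highly available systems with eventual consistency.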
Use cases: https://www.cnblogs.com/HigginCui/p/6478613.html
Basic tutorial: https://blog.csdn.net/hellozpc/article/details/81436980
API: http://rabbitmq.mr-ping.com/
https://docs.spring.io/spring-amqp/api/overview-summary.html
Advanced usage: https://blog.csdn.net/weixin_42763504/article/details/81608237
https://blog.csdn.net/zhihuirensheng123/article/details/82805349
https://blog.csdn.net/qq315737546/article/details/54176560
https://www.cnblogs.com/toov5/p/10288260.html
https://blog.csdn.net/qq_38455201/article/details/80308771
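Application decoupling: order -> inventory; after decoupling: order -> message channel -> inventory
Traffic peak smoothing: flash-sale events
Asynchronous messaging: user registration has to send an SMS and an email; both tasks are put on the message queue instead of being run inline
Message communication: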
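The decoupling case above (order -> message channel -> inventory) can be sketched without a broker. This is only an in-memory illustration, assuming a `java.util.Queue` as a stand-in for the message channel; the names `DecouplingSketch`, `placeOrder` and `processInventory` are hypothetical and are not part of RabbitMQ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory model: a Queue stands in for the message channel.
final class DecouplingSketch {

    // Order side: enqueue the order id and return; inventory is never called directly.
    static void placeOrder(Queue<String> channel, String orderId) {
        channel.add(orderId);
    }

    // Inventory side: drain whatever is waiting on the channel, in arrival order.
    static List<String> processInventory(Queue<String> channel) {
        List<String> handled = new ArrayList<>();
        String orderId;
        while ((orderId = channel.poll()) != null) {
            handled.add("stock reserved for " + orderId);
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        placeOrder(channel, "A1");
        placeOrder(channel, "A2");
        System.out.println(processInventory(channel));
    }
}
```

The order side only knows the channel, so the inventory side can be slow, restarted, or replaced without the order code changing.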
RabbitMQ modes
Shared connection helper
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RaMqConnCommon {
    public static final String QUEUE_SIMPLE = "test_queue";               // simple mode queue
    public static final String QUEUE_PUBLIC = "test_public_queue";        // publish/subscribe mode queue
    public static final String QUEUE_PUBLIC_2 = "test_public_queue2";     // publish/subscribe mode queue
    public static final String QUEUE_ROUTE = "test_route_queue";          // routing mode queue
    public static final String QUEUE_ROUTE_2 = "test_route_queue2";       // routing mode queue
    public static final String EXCHANGE_PUBLIC = "test_public_exchange";  // publish/subscribe mode exchange
    public static final String EXCHANGE_ROUTE = "test_route_exchange";    // routing mode exchange
    public static final String ROUTE_KEY = "routekey";                    // routing key
    public static final String ROUTE_KEY_2 = "routekey2";                 // routing key

    /**
     * @Description: creates a RabbitMQ connection
     * @Param: []
     * @return: com.rabbitmq.client.Connection, or null if the connection could not be opened
     * @Author: LJW
     * @Date: 16:33
     */
    public static Connection getConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("testmq");
        factory.setUsername("guest");
        factory.setPassword("guest");
        try {
            return factory.newConnection();
        } catch (IOException | TimeoutException e) {
            // Callers must check for null before using the connection
            e.printStackTrace();
            return null;
        }
    }
}
Simple mode:
Producer
1. Create a connection
2. Create a channel
3. Declare a queue
4. Publish the message
// Create the connection
Connection connection = RaMqConnCommon.getConnection();
// Create the channel
Channel channel = connection.createChannel();
// Declare the queue (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
channel.queueDeclare(RaMqConnCommon.QUEUE_SIMPLE, false, false, false, null);
String sendMessage = "Hello World!!";
// Publish: exchange ("" is the default exchange), routing key (here the queue name), message properties, body
channel.basicPublish("", RaMqConnCommon.QUEUE_SIMPLE, null, sendMessage.getBytes());
System.out.println("Sent ==========> " + sendMessage);
channel.close();
connection.close();
Consumer
// Create the connection
Connection connection = RaMqConnCommon.getConnection();
// Create the channel
Channel channel = connection.createChannel();
// Consume (queue name, autoAck = true, consumer)
channel.basicConsume(RaMqConnCommon.QUEUE_SIMPLE, true, new RecevierConsumer(channel));
public class RecevierConsumer extends DefaultConsumer {
    public RecevierConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // The consumed message
        System.out.println(consumerTag + ": received message=" + new String(body));
    }
}
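Work mode:
The producer is the same as above
Consumer
// Create the connection
Connection connection = RaMqConnCommon.getConnection();
// Create the channel
Channel channel = connection.createChannel();
// The broker sends this consumer at most one unacknowledged message at a time
channel.basicQos(1);
// Consume with manual acknowledgement (queue name, autoAck = false, consumer)
channel.basicConsume(RaMqConnCommon.QUEUE_SIMPLE, false, new RecevierConsumer(channel));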
public class RecevierConsumer extends DefaultConsumer {
    private final Channel channel;

    public RecevierConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // The consumed message
        System.out.println(consumerTag + ": received message=" + new String(body));
        // Acknowledge the message. The second argument is "multiple":
        // false acknowledges only this delivery tag, true acknowledges every message up to and including it.
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
Publish/subscribe mode:
1. Create a connection
2. Create a channel
3. Declare an exchange
4. Publish to the exchange
// Create the connection
Connection connection = RaMqConnCommon.getConnection();
// Create the channel
Channel channel = connection.createChannel();
// Declare the exchange
channel.exchangeDeclare(RaMqConnCommon.EXCHANGE_PUBLIC, "fanout");
String sendMessage = "Hello World!!";
int i = 0;
while (i < 100) {
    String str = sendMessage + (++i);
    // Publish to the exchange (exchange, routingKey, properties, body); a fanout exchange ignores the routing key
    channel.basicPublish(RaMqConnCommon.EXCHANGE_PUBLIC, "", null, str.getBytes());
}
// // Publish a single message to the exchange
// channel.basicPublish(RaMqConnCommon.EXCHANGE_PUBLIC, "", null, sendMessage.getBytes());
// System.out.println("Publish/subscribe mode ==========> " + sendMessage);
channel.close();
connection.close();
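The consumer is the same as in work mode, except that it first has to declare its own queue and bind it to the exchange; a fanout exchange with no bound queues discards the messages it receives.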
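The fanout behaviour used by this mode can be illustrated in memory. The sketch below is a simplified model and not the RabbitMQ client API; `FanoutSketch` and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not the RabbitMQ client API.
final class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange (fanout needs no routing key).
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Publish: every bound queue gets its own copy; the routing key is ignored.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Messages currently held by a queue (empty if the queue was never bound).
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("test_public_queue");
        exchange.bind("test_public_queue2");
        exchange.publish("", "Hello World!!1");
        System.out.println(exchange.messagesIn("test_public_queue"));
        System.out.println(exchange.messagesIn("test_public_queue2"));
    }
}
```

Each subscriber reads from its own queue, which is why every subscriber sees every message, unlike work mode where consumers share one queue and split the messages.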
Routing mode:
Producer
1. Create a connection
2. Create a channel
3. Declare an exchange
4. Publish to the exchange with a routing key
Consumer
// Create the connection
Connection connection = RaMqConnCommon.getConnection();
// Create the channel
Channel channel = connection.createChannel();
// The broker sends this consumer at most one unacknowledged message at a time
channel.basicQos(1);
// Declare the queue (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
channel.queueDeclare(RaMqConnCommon.QUEUE_ROUTE, true, false, false, null);
// Bind the queue to the exchange (queue, exchange, routingKey)
channel.queueBind(RaMqConnCommon.QUEUE_ROUTE, RaMqConnCommon.EXCHANGE_ROUTE, RaMqConnCommon.ROUTE_KEY);
// Consume (queue, autoAck = false, consumer)
channel.basicConsume(RaMqConnCommon.QUEUE_ROUTE, false, new RecevierConsumer(channel));
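How a direct exchange picks queues from the bindings made with `queueBind` can be modelled in a few lines. This is a simplified in-memory sketch, not the RabbitMQ client API; `DirectRoutingSketch` and its methods are hypothetical, and the queue and key names are the constants from `RaMqConnCommon`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of direct-exchange routing; not the RabbitMQ client API.
final class DirectRoutingSketch {
    // routing key -> names of the queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Counterpart of queueBind(queue, exchange, routingKey) for a single exchange.
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Queues that would receive a message published with this routing key (exact match only).
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("test_route_queue", "routekey");
        exchange.bind("test_route_queue2", "routekey2");
        System.out.println(exchange.route("routekey"));
        System.out.println(exchange.route("unknown"));
    }
}
```

A key with no matching binding yields an empty list, which corresponds to an unroutable message on the real broker.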
Wildcard (topic) mode:
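The notes leave this mode empty. As a rough illustration of the matching rule a topic exchange applies, routing keys are dot-separated words, `*` in a binding pattern stands for exactly one word and `#` for zero or more words. The matcher below is a simplified sketch, not the broker's implementation; `TopicMatchSketch` is a hypothetical name, the example keys are made up, and edge cases such as empty words are not handled.

```java
// Hypothetical matcher illustrating topic binding patterns; not the broker's implementation.
final class TopicMatchSketch {

    // True if a routing key such as "order.created" matches a pattern such as "order.*".
    static boolean matches(String bindingPattern, String routingKey) {
        return matches(bindingPattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible number of words.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("order.*", "order.created"));
        System.out.println(matches("order.*", "order.created.eu"));
        System.out.println(matches("order.#", "order.created.eu"));
    }
}
```

With the real client, the exchange would be declared with type "topic" and the pattern passed as the routing key of `queueBind`.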
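Advanced features:
ReturnListener:
Invoked when a message published with mandatory=true reaches the exchange but cannot be routed to any queue (no binding matches the routing key). If the exchange itself does not exist, the broker closes the channel with an error instead of returning the message.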
channel.addReturnListener(new ReturnListener() {
    /**
     * Called for each returned (unroutable) message.
     *
     * @param replyCode  reply code giving the reason for the return (for example 312, NO_ROUTE)
     * @param replyText  reply text
     * @param exchange   exchange the message was published to
     * @param routingKey routing key used when publishing
     * @param properties message properties
     * @param body       the actual message body
     * @throws IOException if handling the return fails
     */
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("handle return");
        System.out.println("replyCode: " + replyCode);
        System.out.println("exchange: " + exchange);
        System.out.println("routingKey: " + routingKey);
        System.out.println("properties: " + properties.toString());
        System.out.println("body: " + new String(body));
    }
});
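ConfirmListener
Tells the producer whether the broker accepted a published message
// Enable publisher confirms on this channel
channel.confirmSelect();
// Add a confirm listener
channel.addConfirmListener(new ConfirmListener() {
    // Success
    // deliveryTag is the sequence number identifying the message;
    // multiple = true means every message up to and including deliveryTag is confirmed
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Ack: success");
    }

    // Failure, for example the disk is full, the broker hit an internal error, or a capacity limit was reached.
    // It is also possible that neither handleAck nor handleNack arrives; such messages need to be detected and resent.
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack: failure");
    }
});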
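The listener above only prints. To actually resend on a nack, the producer has to remember which messages are still unconfirmed and honour the `multiple` flag. The class below is a minimal sketch of that bookkeeping in plain Java; `OutstandingConfirms` and its methods are hypothetical names, and in real code the sequence number would come from `channel.getNextPublishSeqNo()` read just before each `basicPublish`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical bookkeeping for publisher confirms; it does not talk to a broker.
final class OutstandingConfirms {
    // sequence number (deliveryTag) -> message body still awaiting a confirm
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    // Record a message just before it is published.
    void track(long sequenceNumber, String body) {
        pending.put(sequenceNumber, body);
    }

    // Intended to be called from handleAck(deliveryTag, multiple).
    void onAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed.
            pending.headMap(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    // Intended to be called from handleNack: returns the bodies that should be republished.
    List<String> onNack(long deliveryTag, boolean multiple) {
        List<String> toResend = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> nacked = pending.headMap(deliveryTag, true);
            toResend.addAll(nacked.values());
            nacked.clear();
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) {
                toResend.add(body);
            }
        }
        return toResend;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        confirms.track(1, "a");
        confirms.track(2, "b");
        confirms.track(3, "c");
        confirms.onAck(2, true);
        System.out.println(confirms.onNack(3, false));
        System.out.println(confirms.pendingCount());
    }
}
```

Entries that stay in the map for too long correspond to the case where neither an ack nor a nack arrived, and can be treated as candidates for resending.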
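Dead-letter queue
A message becomes a dead letter mainly when it expires or when a consumer rejects it without requeueing; such messages are routed to the dead-letter queue
This works by configuring the receiving queue with a dead-letter exchange, to which the dead-letter queue is bound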
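With the Java client, the dead-letter settings are passed as the `arguments` map, the last parameter of `channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)`. The helper below is a small sketch that only builds that map; `DeadLetterArgsSketch` is a hypothetical name, and the values in `main` mirror the Spring XML configuration later in these notes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds queue arguments for dead-lettering; it does not contact a broker.
final class DeadLetterArgsSketch {

    static Map<String, Object> deadLetterArguments(String deadLetterExchange,
                                                   String deadLetterRoutingKey,
                                                   long ttlMillis) {
        Map<String, Object> arguments = new HashMap<>();
        // Messages older than this many milliseconds expire and are dead-lettered.
        arguments.put("x-message-ttl", ttlMillis);
        // Exchange that receives expired or rejected messages.
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        // Routing key used when the message is republished to the dead-letter exchange.
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = deadLetterArguments("dead-letter-exchange", "deadLetterKey", 2000L);
        // The map would be passed as the last argument of channel.queueDeclare(...).
        System.out.println(arguments);
    }
}
```

The dead-letter queue itself is an ordinary queue; it only receives messages once it is bound to the dead-letter exchange with the matching routing key.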
Transactions
try {
    channel.txSelect(); // put the channel in transactional mode
    // Publish the message
    channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    channel.txCommit(); // commit the transaction
} catch (Exception e) {
    channel.txRollback(); // roll back: the message is not delivered
} finally {
    channel.close();
    conn.close();
}
Integrating RabbitMQ with Spring
<!-- mq======================-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
Producer
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- Load the RabbitMQ properties file -->
<context:property-placeholder location="classpath:/local/rabbitmq.properties" />
<!-- <context:component-scan base-package="com.jwl.test.mqConfig"/>-->
<!-- Connection factory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}"
username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" publisher-confirms="true"
/>
<!-- RabbitAdmin: lets Spring declare exchanges, queues and bindings on the broker -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- Direct (routing) exchange -->
<rabbit:direct-exchange name="test-routekey-exchange" id="test-routekey-exchange" durable="true" auto-declare="true"/>
<!-- Template definition --><!-- mandatory must be true for the return callback to take effect -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
mandatory="true"
return-callback="messageReturn"
confirm-callback="messageConfirm"
/>
<!-- <bean id="messageConverter" class="com.jwl.common.util.MqMessageConvert"/>-->
<!-- Called when a message reaches the exchange but no matching route is found -->
<bean id="messageReturn" class="com.jwl.test.mqConfig.MqReturnHandler"/>
<!-- Publisher confirm callback -->
<bean id="messageConfirm" class="com.jwl.test.mqConfig.MqConfirmListener"/>
<!-- Dead-letter exchange -->
<rabbit:direct-exchange name="dead-letter-exchange" durable="true" auto-declare="true"/>
</beans>
Consumer
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- Load the RabbitMQ properties file -->
<context:property-placeholder location="classpath:/local/rabbitmq.properties" />
<!-- Connection factory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}"
username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/>
<!-- RabbitAdmin: lets Spring declare exchanges, queues and bindings on the broker -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- Declare the queue with a dead-letter exchange and a message TTL of 2 seconds (2000 ms) -->
<rabbit:queue id="test_direct_queue" durable="true" name="test_direct_queue" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="2000" value-type="java.lang.Long"/>
<entry key="x-dead-letter-exchange" value="dead-letter-exchange" />
<entry key="x-dead-letter-routing-key" value="deadLetterKey" />
</rabbit:queue-arguments>
</rabbit:queue>
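<!-- Declare the dead-letter queue -->
<rabbit:queue id="test_dead_queue" durable="true" name="test_dead_queue" auto-declare="true"/>
<!-- Listener bindings: manual acknowledgement, and the broker sends one message at a time (prefetch=1) -->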
<rabbit:listener-container prefetch="1" connection-factory="connectionFactory" message-converter="messageConverter" acknowledge="manual">
<rabbit:listener ref="listener" method="listener" queue-names="test_direct_queue" />
<rabbit:listener ref="deadListener" method="listener" queue-names="test_dead_queue" />
</rabbit:listener-container>
<bean id="messageConverter" class="com.jwl.common.util.MqMessageConvert"/>
<!-- Listener beans -->
<bean id="listener" class="com.jwl.test.MqListener"/>
<bean id="deadListener" class="com.jwl.test.MqDeadListener"/>
</beans>
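Exceptions
When integrating RabbitMQ with Spring, a mismatched namespace (XSD) version can cause the corresponding classes not to be found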