RabbitMQ
作用:1.解耦 2.异步请求 3.保证数据一致性
角色:
消息生产者
连接:connection channel
消息平台broker: exchange ---(1对多)--->queue
连接
消息消费者
特点:适用于处理高并发请求
安装RabbitMQ:注意:以管理员身份运行
Rabbit原始开发流程(了解):
1.创建连接工厂
2.创建连接
3.创建通道
4.声明交互机&队列(交换机绑定队列)|声明队列
5.发送消息 |接收消息
6.关闭通道、连接
工作模式(重点):
一对一:
工作队列:一个生产者发送的消息同一时间只能被一个消费者接收,如果有多个消费者,默认轮循接收消息
一对多(根据交换机类型定义的工作模式):特点:自定声明交换机,把队列绑定到交换机上,把消息发送到交换机
发布订阅:fanout 特点:不指定routingkey,把消息发送到交互机绑定的所有队列
路由:direct 特点:指定routingkey,根据routingkey,可以把消息发送到交互机绑定的所有队列,也可把消息发送到交换机绑定的某一个或某几个队列
通配符:topic 特点:使用通配符设置routingkey,根据routingkey,可以把消息发送到交互机绑定的所有队列,也可把消息发送到交换机绑定的某一个队列
header:headers
RPC
消息队列:可以保证数据一致性,多个请求访问时,按照排队的方式进入,redis也是排队处理。
异步方面,一些比较耗时的动作,比如发送验证码,对页面进行静态化。。。,这些操作会阻塞主线程
RabbitMQ的工作模式呈递进
发布订阅完全具备工作队列的特性,一个交换机只连接一个queue时,也可以多个消费者同时监听一个queue,平均分配
完全具备发布订阅的特性,根据routingkey把消息发送 到指定队列,routingkey一样就实现了发布订阅模式
通配符(topic) 特点和路由工作模式一样,#匹配0个或多个词,*匹配0个,词之间用 . 分割
通配符模式
inform.#.sms.* 可以匹配inform.sms
inform.#.email.# 可以匹配inform.email
inform.sms.email就可以匹配上面两种了
Demo
发送端:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huang</groupId>
<artifactId>rabbitMQDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<artifactId>xc-framework-parent</artifactId>
<groupId>com.xuecheng</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../xc-framework-parent/pom.xml</relativePath>
</parent>
<dependencies>
<!-- <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version><!–此版本与spring boot 1.5.9版本匹配–>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>
applicaiton.yml
server:
port: 44000
spring:
application:
name: test-rabbitmq-producer
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
Application.java
package com.huang;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author HuangSir
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
RabbitConfig.java
package com.huang.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author HuangSir
*/
@Configuration
public class RabbitConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inf orm_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPIC_INFORM = "exchange_topic_inform";
@Bean("exchangeTopic")
public Exchange exchangeTopicInform() {
return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_INFORM).durable(true).build();
}
@Bean("queueInformEmail")
public Queue queueInformEmail() {
return new Queue(QUEUE_INFORM_EMAIL);
}
@Bean("queueInformSms")
public Queue queueInformSms() {
return new Queue(QUEUE_INFORM_SMS);
}
@Bean
public Binding bindingInformSms(@Qualifier("exchangeTopic")Exchange exchange, @Qualifier("queueInformSms")Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
@Bean
public Binding bindingInformEmail(Exchange exchange, @Qualifier("queueInformEmail") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
}
Test.java
import com.huang.Application;
import com.huang.config.RabbitConfig;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = Application.class)
public class Test {
@Autowired
private RabbitTemplate rabbitTemplate;
@org.junit.Test
public void test() {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms","send msg to Exchange");
}
}
运行后在http://localhost:15672查看已生成queue
消费端
pom.xml application.yml 同上
ReceiveHandler.java
package com.huang.listener;
import com.huang.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author HuangSir
*/
@Component
public class ReceiveHandler {
@RabbitListener(queues = {"queue_inform_sms"})
public void receiveSms(String msg, Message message, Channel channel) {
System.out.println("msg" + msg);
System.out.println("message:" + message);
}
}
如果不能确定生产者和消费者哪个应用先启动,在消费端也应该将RabbitMQ.java加入其中,这样消费端程序启动时不会出现问题
网友评论