消息中间件概述
消息队列中间件是分布式架构中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性框架目前使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RockerMQ。
RabbitMQ 作用
生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和我们之前学习Nginx有点类似。
交换机的作用
根据具体的路由策略分发到不同的队列中。
交换机的四种类型
1.Direct exchange (直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
2.Fanout exchange (扇型交换机)将消息路由给绑定到它身上的所有队列
3.Topic exchange ( 主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路
由给一个或多个绑定队列
4.Headers exchange (头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。
通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
VirtualHost
像MySQL服务器中可以添加多个数据库一样,可以指定用户对指定库表等操作权限,RabbitMQ中这种管理权限就是VirtualHost,每个VirtualHost相当于一个独立的服务器,每个VirtualHost之前相互隔离message、queue不能互通,目的是为了解耦合
RabbitMQ消息队列的类型
1.点对点模式: 一对一模式一个生产者投递消息给队列,只能允许有一个消费者进行消费。如若消费者集群,会进行均摊消费
2.工作模式:又称公平消费模式,采用能者多劳的原则,哪个消费者应答的快,哪个就能多消费消息,当消费者没有应答之前,队列将不会再发送新的消息给消费者
3.发布订阅模式:一个生产者发送消息,多个消费者获取同样的消息,包括一个生产者,一个交换机,多个队列,多个消费者。
4.路由模式(RoutingKey):
5.通配符模式(topic):生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
"*"表示匹配一个词语,"#"表示匹配多个词语
自动应答
不在乎消费者对这个消息处理是否成功,都会告诉队列删除该消息。如果处理消息失败情况下,实现自动补偿。
手动应答
消费处理完业务逻辑,手动返回ack(通知),告诉队列服务器是否删除消息
pom导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml中配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: suoxingliunian
password: a1234560
virtual-host: /admin_host
消息生产者
创建.class配置类
/*发布订阅模式配置交换机类型为Fanou*/
@Configuration
public class FanoutConfig {
//邮件队列
private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";
//短信队列
private String FANOUT_SMS_QUEUE = " fanout_sms_queue" ;
//交换机名称
private String EXCHANGE_NAME = "fanoutExchange";
// 定义邮件队列
@Bean
public Queue fanoutEamilQueue(){
return new Queue(FANOUT_EMAIL_QUEUE);
}
// 定义短信队列
@Bean
public Queue fanoutSmsQueue(){
return new Queue(FANOUT_SMS_QUEUE);
}
// 定义交换机名称 Fanout类型 其他类型例如TopicExchange
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(EXCHANGE_NAME);
}
// 邮件队列和交换机进行绑定(参数名称一定要和队列方法、交换机名称一致)
@Bean
Binding bindingExchangeEamil(Queue fanoutEamilQueue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutEamilQueue).to(fanoutExchange);
}
// 短息队列和交换机进行绑定(参数名称一定要和队列方法、交换机名称一致)
@Bean
Binding bindingExchangeSms(Queue fanoutSmsQueue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutSmsQueue).to(fanoutExchange);
}
}
创建.class消息发布消息
@Component
public class FanoutProducer {
@Autowired
public AmqpTemplate amqpTemplate;
public void send(String queueName) {
String msg = "sendmsg"+new Date();
// 发送消息
amqpTemplate.convertAndSend(queueName,msg);
}
}
创建Controller模拟消息发布接口
@RestController
public class ProducerController {
@Autowired
private FanoutProducer producer;
@GetMapping("/sendMsg")
public void sendMsg(String queueName) {
producer.send(queueName);
}
}
消息消费者
工厂方法
/*工厂方法*/
public class Producer {
private static final String queueName = "sanshengsanshishilitaohua";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取新连接
Connection connection = RabbitMQUtils.newConntction();
// 创建通道
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
channel.basicPublish("", queueName, null, "suoxingliunian".getBytes());
// 关闭资源
channel.close();
connection.close();
}
}
工具类
public class RabbitMQUtils {
// 创建RabbitMQ连接
public static Connection newConntction() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接地址
connectionFactory.setHost("127.0.0.1");
// 设置连接用户名
connectionFactory.setUsername("admin");
// 设置密码
connectionFactory.setPassword("123456");
// 设置端口号
connectionFactory.setPort(5672);
// 设置VirtualHost地址
connectionFactory.setVirtualHost("/admin_host");
Connection connection = connectionFactory.newConnection();
return connection;
}
}
消费者
/*消息消费者*/
public class Consumer {
private static final String queueName = "sanshengsanshishilitaohua";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取新连接
Connection connection = RabbitMQUtils.newConntction();
// 创建通道
Channel channel = connection.createChannel();
// 消费者关联队列
// channel.queueDeclare(queueName, false, false, false, null);
// 消费者获取消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
// 监听获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String result = new String(body,"utf-8");
System.err.println("消费者获取生产者发送消息"+result);
//手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 设置应答模式
channel.basicConsume(queueName,true,defaultConsumer);
}
}
网友评论