image.pngRabbitMQ入门与使用篇
http://www.cnblogs.com/SFLYQ/p/7358283.html
Rabbitmq 使用小记
https://www.jianshu.com/p/b63196b596be
RabbitMQ使用详解
https://www.cnblogs.com/enjoyall/p/7767462.html?utm_source=debugrun&utm_medium=referral
绿色的 X 就是 Exchange
红色的是 Queue ,这两者都在 Server 端,又称作 Broker。
蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型。
-
Exchange通常分为四种:
fanout:该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能
direct:该类型路由规则会将消息路由到binding key与routing key完全匹配的Queue中
topic:与direct类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配
headers:该类型不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配
消息队列的使用过程
-
生产者发送消息
客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
factory.setPassword("pwd");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明一个dirent模式的交换机
channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
//声明一个非持久化自动删除的队列
channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
//将绑定到改交换机
channel.queueBind("queue_name","exchange_name","route_key");
//声明一个消息头部
Map<String,Object> header=new HashMap<>();
AMQP.BasicProperties.Builder b= new AMQP.BasicProperties.Builder();
header.put("charset","utf-8");
b.headers(header);
AMQP.BasicProperties bp=b.build();
//将消息发出去
channel.basicPublish("exchange_name","route_key",false,bp,"test3".getBytes());
- 消费者
从队列中取消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
factory.setPassword("pwd");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明一个dirent模式的交换机
channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
//声明一个非持久化自动删除的队列
channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
//将绑定到改交换机
channel.queueBind("queue_name","exchange_name","route_key");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume("queue_name", true, consumer);
RabbitMQ消息可靠性
- 发送者
发送这端利用confirm保证消息可以顺利达到rabbitmq,消息开启持久化(Delivery Mode = 2).
发送者发送一个消息,到达rabbitmq,然后rabbitmq认为此消息需要持久化,经过内存到磁盘的过程,然后把消息返回给发送者端.
1)队列持久化
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
2)消息持久化
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
- 消息队列(消息持久化)
申明交换机持久化和队列持久化
rabbitmq接收到消息,会把消息从内存刷到磁盘的存储文件中. - 消费者
设置消息的ack,当消费者消费一个消息的时候,会返回给rabbitmq对应queue一个ack消息,这样就保证了消息消费完成.
网友评论