RabbitMQ简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。
什么是MQ
MQ 我们可以理解为消息队列,具有先进先出的特点。
RabbitMQ特点
1.解耦,新模块的引入,使其代码改动量最小。
2.削峰,设置流量缓存池,使得后端的服务按照自身的吞吐量进行消费。
3.异步,将非关联引用的链路异步优化并提升系统的吞吐能力。
RabbitMQ模式
1.Simple 简单队列:
单个提供者,单个消费者 。提供者将消息发送到队列中,消费者从队列中获取消息。
缺点:耦合性高,生产者和消费者一一对应,队列名变更,生产者和消费者需要同时变更。
如图1所示:
图1
java 代码
RabbitMQ依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.0</version>
</dependency>
java链接MQ的工具类
public class ConnectionUtils {
/**
* 获取MQ链接
* @return
*/
public static Connection getConnection() throws IOException, TimeoutException {
//创建一个工场
ConnectionFactory factory = new ConnectionFactory();
//服务地址
factory.setHost("127.0.0.1");
//服务端口
factory.setPort(5672);
//vhost
factory.setVirtualHost("/branch_virtual");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
return factory.newConnection();
}
}
服务发送方
public class Provider {
private static final String QUEUE_NAME = "mqName";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
//从链接获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg= "hello resive";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("send succeed !");
channel.close();
connection.close();
}
}
服务接收方
public class Receive {
private static final String QUEUE_NAME = "mqName";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("receive msh :"+msg);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
1.Work queues 工作队列
一个生产者把消息生产到队列中,一个或者多个消费者进行消费。
图2java代码(轮询分发)
消费者往队列发送消息
缺点:
autoAck=true自动确认模式
一旦rabbitmq将消息分发给消费者,就会从内存中删除。如果删除正在执行的消费者,就会出现消息丢失的现象。
public class Provider {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
//从链接获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for(int i=0;i<61;i++){
String msg= "this is +"+i+"+ msg";
System.out.println(msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(100);
}
channel.close();
connection.close();
}
}
消费者1
public class Receive {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 1 :"+msg);
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
消费者2
public class ReceiveTwo {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 2 :"+msg);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
3.fairdipatch 公平分发
一个生产者多个消费者模式下的情况,将消费者改为手动回执的形式。
java代码(公平分发)
首先在提供者者中设置basicQos=1,然后在消费者中设置basicAck方法和
autoAck=false手动模式
如果一个消费者挂了,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息告诉rabbitmq已处理完成,这时rabbitmq会删除内存中的消息。
消息应答默认是打开的 false。
public class Provider {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
//从链接获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 每个消费者 发送确认之前,消息队列不发送下一个消息到消费者.
* 限制发送给通哟个消费者不得超过一条数据。
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);
for(int i=0;i<61;i++){
String msg= "this is +"+i+"+ msg";
System.out.println(msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(100);
}
channel.close();
connection.close();
}
}
public class Receive {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 1 :"+msg);
//手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
public class ReceiveTwo {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 2 :"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
4.订阅模式
1.一个生产者,多个消费者。
2.每个消费者都有自己的队列。
3.生产者都没有直接把消息发送队列,而是发送到交换价
4.每个队列都要绑定到交换机上
5生产者发送消息,进过交换机到达队列,就能实现一个消息被多个消费者消费。
public class Provider {
private static final String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
//从链接获取一个通道
Channel channel = connection.createChannel();
/**
* 声明交换机
* fanout 分发
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String msg = "订阅模式";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("消费者已发送消息,发送内容: "+msg);
channel.close();
connection.close();
}
}
public class ReceiveTwo {
private static final String EXCHANGE_NAME = "exchange_fanout_send_sms";
private static final String FACTOR_NAME = "exchange_fanout";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(EXCHANGE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(EXCHANGE_NAME,FACTOR_NAME,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 2 :"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = false;
channel.basicConsume(EXCHANGE_NAME,autoAck,consumer);
}
}
public class Receive {
private static final String EXCHANGE_NAME = "exchange_fanout_send_email";
private static final String FACTOR_NAME = "exchange_fanout";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(EXCHANGE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(EXCHANGE_NAME,FACTOR_NAME,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 1 :"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = false;
channel.basicConsume(EXCHANGE_NAME,autoAck,consumer);
}
}
5.路由模式
图4public class Provider {
private static final String EXCHANGE_NAME = "echange_direct";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
//从链接获取一个通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//创建队列声明
String msg = "hello diret";
String routingKey="error";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("direct send :"+msg);
channel.close();
connection.close();
}
}
public class Receive {
private static final String EXCHANGE_NAME = "echange_direct";
private static final String QUEUE_NAME = "direct_queue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 1 :"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
public class ReceiveTwo {
private static final String EXCHANGE_NAME = "echange_direct";
private static final String QUEUE_NAME = "direct_queue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
channel.basicAck(envelope.getDeliveryTag(),false);
System.out.println("comsumer 2 :"+msg);
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
6.top exchange
将路由和某个模式匹配。
图5
public class Provider {
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
//从链接获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String msg = "this is msg...";
channel.basicPublish(EXCHANGE_NAME,"data.select",null,msg.getBytes());
System.out.println("msg is send......");
channel.close();
connection.close();
}
}
public class Receive {
private static final String EXCHANGE_NAME = "exchange_topic";
private static final String QUEUE_NAME = "queue_topic";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"data.insert");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 1 :"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
public class ReceiveTwo {
private static final String EXCHANGE_NAME = "exchange_topic";
private static final String QUEUE_NAME = "queue_topic_two";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"data.#");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer 2 :"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
这就是RabbitMQ常见的5种模式,及其代码演示。
RabbitMQ消息确认机制(事务+confirm)
RabbitMQ为我们提供了两种方式:
方式一:通过AMQP事务机制实现,这也是从AMQP协议层面提供的解决方案;
方式二:通过将channel设置成confirm模式来实现;
AMQP模式:
该模式吞吐量较低。使用的是同步的模式。主要是用channel.txSelect()
开启事务,使用channel.txRollback();
回滚事务。代码如下所示:
/**
* 使用事务的模式来确定消息有没有到达Rabbit服务器
*/
public class Provider {
private static final String QUEUE_NAME = "queue_tx";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
//从链接获取一个通道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "tx msg";
try {
//开启事务
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("this is msg send");
channel.txCommit();
}catch (Exception e){
//事务回滚
channel.txRollback();
System.out.println("send msg txRollBack");
}
channel.close();
connection.close();
}
}
public class Receive {
private static final String QUEUE_NAME = "queue_tx";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer :"+msg);
}
});
}
}
Confirm模式:
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
Confirm使用的是异步的模式。使用channel,confirmSelect()
开启事务。
模式主要分为三种
1.普通发送:waitForConfirms()
2.批量发 waitForConfirms()
3.异步,提送回调。
普通发送代码如下所示:
/**
* 普通模式
*/
public class Provider {
private static final String QUEUE_NAME = "queue_confirm_pt";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将channel设置为普通模式
channel.confirmSelect();
String msg = "普通模式";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if(!channel.waitForConfirms()){
System.out.println("send failed");
}else{
System.out.println("send succeed");
}
channel.close();
connection.close();
}
}
public class Receive {
private static final String QUEUE_NAME = "queue_confirm_pt";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer :"+msg);
}
});
}
}
批量发送相对于普通模式的优点是效率高,缺点是一条失败全部失败。 代码如下所示:
public class Provider2 {
private static final String QUEUE_NAME = "queue_confirm_pt_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将channel设置为普通模式
channel.confirmSelect();
String msg = "普通模式";
//批量发送
for(int i=0;i<10;i++){
String data = msg+" "+i;
channel.basicPublish("",QUEUE_NAME,null,data.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("send failed");
}else{
System.out.println("send succeed");
}
channel.close();
connection.close();
}
}
public class Receive {
private static final String QUEUE_NAME = "queue_confirm_pt_2";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer :"+msg);
}
});
}
}
异步监听模式,效率高,单个的失败不会影响其它数据的发送。 代码如下所示:
/**
* 异步回调模式
*/
public class Provider {
private static final String QUEUE_NAME = "queue_confirm_pt_3";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将channel设置为普通模式
channel.confirmSelect();
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
if(b){
System.out.println("handle ACK succeed");
confirmSet.headSet(l+1).clear();
}else{
System.out.println("handle ACK false");
confirmSet.remove(l);
}
}
@Override
public void handleNack(long l, boolean b) throws IOException {
if(b){
System.out.println("handle ACK succeed");
confirmSet.headSet(l+1).clear();
}else{
System.out.println("handle ACK false");
confirmSet.remove(l);
}
}
});
String msg = "回调模式";
while(true){
long data = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
confirmSet.add(data);
}
}
}
public class Receive {
private static final String QUEUE_NAME = "queue_confirm_pt_3";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("comsumer :"+msg);
}
});
}
}
网友评论