之前都是一个消息被一个消费者消费。如果想让一个消息被多个消费者消费,可以使用订阅模式。
订阅模式:生产者不会将消息直接发送到队列,而是发送到交换机(exchange)上,每个队列绑定到交换机上。
生产者发送到交换机,再到达队列,就能实现一个消息被多个消费者消费。
生产者
package com.demo.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
 * Publish/subscribe producer.
 *
 * <p>Publishes a single message to a fanout exchange instead of to a queue;
 * every queue bound to that exchange receives its own copy of the message.
 *
 * @author yl
 */
public class Send {
    /** Exchange name; the spelling must match the name the consumers bind to. */
    private static final String EXCHANGE_NAME = "test_exchage";

    /**
     * Declares the fanout exchange and publishes one "hello world" message to it.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening or closing a connection/channel times out
     */
    public void send() throws IOException, TimeoutException {
        // Obtain the connection.
        Connection con = RabbitMQ.getConnection();
        try {
            // Create the channel.
            Channel channel = con.createChannel();
            // Declare the exchange (fanout: routing key is ignored).
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Publish the message; encode explicitly so the bytes do not depend
            // on the platform default charset.
            String msg = "hello world";
            channel.basicPublish(EXCHANGE_NAME, "", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.close();
        } finally {
            // Always release the connection, even when a step above failed;
            // closing the connection also closes any channel still open on it.
            con.close();
        }
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        new Send().send();
    }
}
消费者1
package com.demo.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
 * Publish/subscribe consumer 1: binds its own queue to the fanout exchange and
 * prints every message it receives.
 */
public class Get1 {
    /** Exchange name; the spelling must match the name the producer publishes to. */
    private static final String EXCHANGE_NAME = "test_exchage";
    /** Queue owned by this consumer. */
    private static final String QUEUE_NAME = "ps_queue1";

    /**
     * Consumer 1: declares and binds the queue, then registers a consumer with
     * manual acknowledgement. The connection is intentionally left open so that
     * deliveries keep arriving after this method returns.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public void get() throws IOException, TimeoutException {
        // Create the connection.
        Connection connection = RabbitMQ.getConnection();
        // Obtain the channel.
        final Channel channel = connection.createChannel();
        // Declare the exchange here as well (same name and type as the producer),
        // so binding does not fail when this consumer starts before the producer.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Declare the queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // Prefetch count of 1: at most one unacknowledged message at a time.
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly rather than with the platform default charset.
                String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                try {
                    // Simulate slow processing.
                    Thread.sleep(2000);
                    System.out.println("消费者1" + msg);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the calling thread can see it.
                    Thread.currentThread().interrupt();
                    System.err.println("consumer 1 interrupted while processing: " + e);
                } finally {
                    // Manual acknowledgement (sent even if processing was interrupted).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // Auto-ack is set to false; messages are acknowledged in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        new Get1().get();
    }
}
消费者2
package com.demo.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
 * Publish/subscribe consumer 2: binds its own queue to the fanout exchange and
 * prints every message it receives.
 */
public class Get2 {
    /** Exchange name; the spelling must match the name the producer publishes to. */
    private static final String EXCHANGE_NAME = "test_exchage";
    /** Queue owned by this consumer. */
    private static final String QUEUE_NAME = "ps_queue2";

    /**
     * Consumer 2: declares and binds the queue, then registers a consumer with
     * manual acknowledgement. The connection is intentionally left open so that
     * deliveries keep arriving after this method returns.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public void get() throws IOException, TimeoutException {
        // Create the connection.
        Connection connection = RabbitMQ.getConnection();
        // Obtain the channel.
        final Channel channel = connection.createChannel();
        // Declare the exchange here as well (same name and type as the producer),
        // so binding does not fail when this consumer starts before the producer.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Declare the queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // Prefetch count of 1: at most one unacknowledged message at a time.
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly rather than with the platform default charset.
                String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                try {
                    // Simulate slow processing.
                    Thread.sleep(2000);
                    System.out.println("消费者2" + msg);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the calling thread can see it.
                    Thread.currentThread().interrupt();
                    System.err.println("consumer 2 interrupted while processing: " + e);
                } finally {
                    // Manual acknowledgement (sent even if processing was interrupted).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // Auto-ack is set to false; messages are acknowledged in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        new Get2().get();
    }
}
运行发现,两个消费者都消费了消息!
网友评论