工作原理:每一个对列都有自己的路由key,当生产者发送消息时,都会携带一个路由key,这时,会将消息发往路由key一致的队列中。路由模式是发布订阅模式的升级版。
路由key起到的是消息的导向作用。
[root@bogon rabbitmq-server-3.6.1]# cd /etc/rabbitmq
[root@bogon rabbitmq]# service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
rabbitmq-server.er.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbit</groupId>
<artifactId>schoolmanage</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
</project>
package schoolmanage;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class TestStudentMsgRouting {
private Connection connection = null;
@Before
public void init() throws IOException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 为工厂对象添加数据
// 远程主机
factory.setHost("192.168.6.130");
// 端口号
factory.setPort(5672);
// 虚拟主机
factory.setVirtualHost("/school");
// 用户名
factory.setUsername("student");
// 密码
factory.setPassword("student");
// 创建连接
connection = factory.newConnection();
}
// 消息生产者
@Test
public void provider() throws IOException {
System.out.println("生产者启动。。。");
// 创建通道
Channel channel = connection.createChannel();
// 创建交换机
String exchangeName = "RountingExchange";
/**
* 定义交换机模式 String exchange String type fanout发布订阅 redirect路由模式 topic主题模式
*/
channel.exchangeDeclare(exchangeName, "direct");
String info = "数学课正常上课!";
channel.basicPublish(exchangeName, "info", null, info.getBytes());
String error= "语文教师出去开会,取消今天的语文课!";
channel.basicPublish(exchangeName, "error", null, error.getBytes());
String warning="多读书,少游戏!";
channel.basicPublish(exchangeName, "warning", null, warning.getBytes());
// 将流关闭
channel.close();
connection.close();
System.out.println("消息发送成功!!!");
}
// 消费者1
@Test
public void consumer1() throws Exception {
// 创建通道
Channel channel = connection.createChannel();
// 定义对列
String queueName = "routingCounsumer1";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅E1交换机的消息
String exchangeName = "RountingExchange";
// 定义交换机模式
channel.exchangeDeclare(exchangeName, "direct");
// 将对列与交换机绑定
channel.queueBind(queueName, exchangeName, "error");
// 只允许一次执行一个消息
channel.basicQos(1);
// 定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 将消费者与队列进行绑定
/**
* 定义回复方式 autoAck为false表示手动返回
*/
channel.basicConsume(queueName, false, consumer);
System.out.println("消费者1,启动。。。");
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = "消费者1收到 :" + new String(delivery.getBody());
System.out.println(msg);
// 告知rabbitMq当前消费的是哪一个消息
/**
* multiple false表示不扩展
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
// 消费者2
@Test
public void consumer2() throws Exception {
// 创建通道
Channel channel = connection.createChannel();
// 定义对列
String queueName = "routingCounsumer2";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅E1交换机的消息
String exchangeName = "RountingExchange";
// 定义交换机模式
channel.exchangeDeclare(exchangeName, "direct");
// 将对列与交换机绑定
channel.queueBind(queueName, exchangeName, "error");
channel.queueBind(queueName, exchangeName, "info");
channel.queueBind(queueName, exchangeName, "warning");
// 只允许一次执行一个消息
channel.basicQos(1);
// 定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 将消费者与队列进行绑定
/**
* 定义回复方式 autoAck为false表示手动返回
*/
channel.basicConsume(queueName, false, consumer);
System.out.println("消费者2,启动。。。");
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = "消费者2收到 :" + new String(delivery.getBody());
System.out.println(msg);
// 告知rabbitMq当前消费的是哪一个消息
/**
* multiple false表示不扩展
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者1,启动。。。
消费者2,启动。。。
生产者启动。。。
消息发送成功!!!
消费者1收到 :语文教师出去开会,取消今天的语文课!
消费者2收到 :数学课正常上课!
消费者2收到 :语文教师出去开会,取消今天的语文课!
消费者2收到 :多读书,少游戏!
网友评论