使用queueBind
package com.ghg.mq01.producer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
 * Minimal RabbitMQ producer demo.
 *
 * <p>Declares a durable direct exchange and a durable queue, binds the queue to the
 * exchange with a routing key via {@code queueBind}, and publishes one persistent
 * text message through that exchange.
 */
public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routingkey_demo";
    private static final String QUEUE_NAME = "queue_demo";
    // Address of the RabbitMQ server.
    private static final String IP_ADDRESS = "10.18.200.199";
    // Port the RabbitMQ server listens on (5672 is the broker's default).
    private static final int PORT = 5672;
    // Login user name. NOTE(review): demo credentials are hard-coded; externalize for real use.
    private static final String USER_NAME = "root";
    // Login password.
    private static final String PASSWORD = "root";

    /**
     * Connects to the broker, sets up exchange/queue/binding, and publishes one message.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Configure host, port, user name and password.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(PASSWORD);

        // Open the connection.
        Connection connection = connectionFactory.newConnection();
        try {
            // Open a channel on the connection.
            Channel channel = connection.createChannel();

            // Declare a durable, non-auto-delete exchange of type "direct".
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

            // Declare a durable, non-exclusive, non-auto-delete queue.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // Bind the queue to the exchange using the routing key.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            // Publish one persistent message; encode explicitly as UTF-8 so the payload
            // does not depend on the platform default charset.
            String message = "Hello RabbitMq!";
            channel.basicPublish(
                    EXCHANGE_NAME,
                    ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            channel.close();
        } finally {
            // Always release the connection, even when a step above throws.
            // try/finally is used instead of try-with-resources so this does not depend
            // on the client version making Channel AutoCloseable.
            connection.close();
        }
    }
}
queueBind方法
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queue 队列名称
- exchange 交换器名称
- routingKey 路由key
- arguments 其它的一些参数
queueUnbind
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
将队列与交换器解绑
- queue 队列名称
- exchange 交换器名称
- routingKey 路由key
- arguments 其它的一些参数
exchangeBind
通过信道将交换器与交换器绑定:
channel.exchangeBind(destination, source, routingKey);
channel.exchangeBind(destination, source, routingKey, arguments);
channel.exchangeBindNoWait(destination, source, routingKey, arguments);
- 定义
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
生产者发送消息到source交换器中,source根据路由键找到与其匹配的另一个交换器destination,并把消息转发到destination中,从而存储在destination绑定的队列queue中。
package com.ghg.mq01.producer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Exchange-to-exchange binding demo.
 *
 * <p>Declares a direct exchange ("source") and a fanout exchange ("destination"), binds
 * destination to source with a routing key via {@code exchangeBind}, binds a queue to
 * destination, and publishes one message to source.
 */
public class Test1 {
    private static final String SOURCE_EXCHANGE = "source";
    private static final String DESTINATION_EXCHANGE = "destination";
    private static final String BINDING_KEY = "exKey";
    private static final String QUEUE_NAME = "queue";
    // Address of the RabbitMQ server.
    private static final String IP_ADDRESS = "10.18.200.199";
    // Port the RabbitMQ server listens on (5672 is the broker's default).
    private static final int PORT = 5672;
    // Login user name. NOTE(review): demo credentials are hard-coded; externalize for real use.
    private static final String USER_NAME = "root";
    // Login password.
    private static final String PASSWORD = "root";

    /**
     * Sets up the two exchanges, their binding and the queue, then publishes one message.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(PASSWORD);
        connectionFactory.setPort(PORT);

        // 2. Open the connection.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel.
            Channel channel = connection.createChannel();

            // 4. Declare both exchanges (durable, non-auto-delete).
            channel.exchangeDeclare(SOURCE_EXCHANGE, "direct", true, false, null);
            channel.exchangeDeclare(DESTINATION_EXCHANGE, "fanout", true, false, null);

            // 5. Bind destination to source so messages published to source with
            //    BINDING_KEY are forwarded to destination.
            channel.exchangeBind(DESTINATION_EXCHANGE, SOURCE_EXCHANGE, BINDING_KEY);

            // 6. Declare the queue and bind it to destination. Without this binding the
            //    forwarded message has no queue to land in. The routing key is empty
            //    because destination is declared as a fanout exchange.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, DESTINATION_EXCHANGE, "");

            // 7. Publish the message; encode explicitly as UTF-8 so the payload does not
            //    depend on the platform default charset.
            channel.basicPublish(
                    SOURCE_EXCHANGE,
                    BINDING_KEY,
                    null,
                    "exToExDemo".getBytes(java.nio.charset.StandardCharsets.UTF_8));

            // 8. Close the channel.
            channel.close();
        } finally {
            // Always release the connection, even when a step above throws.
            connection.close();
        }
    }
}
image.png
网友评论