1 简介
Direct模式是rabbitmq的默认模式,机制大致可以理解为下图:
image可以这样理解,交换机是一个车站(exChange),每个人是一个消息体(message),携带一张车票(key),在站台乘车(queue);人要乘车,于是根据票的不同,被车站分发到不同的站台,乘车去不同的地方。
即生产者发送一个指定key的消息,交换机将之转发给所有绑定此key的队列,消费者提取。
知道了流程,就考虑实现,我们订一个目标,写一个支持单发,群发的简单通讯。
show code.
2 生产者
首先创建factory
if (factory == null) {
factory = new ConnectionFactory();
factory.setHost(HOST); //主机地址
factory.setPort(PART); // 端口号
factory.setUsername(USERNAME); //用户名
factory.setPassword(PASSWORD); //密码
factory.setVirtualHost(VIRTUALHOST);
}
创建connection ,channel,声明一个交换机
try {
//创建连接
if (connection == null || !connection.isOpen())
connection = factory.newConnection();
//创建通道
if (channel== null || !sendChannel.isOpen())
channel= connection.createChannel();
//声明交换机
//EXCHANGE_NAME = "exchange-name"
//EXCHANGE_TYPE = "direct"
channel.exchangeDeclare(EXCHANGE_NAME,
EXCHANGE_TYPE,
true);
Log.i(TAG, "连接成功");
} catch (Exception e) {
e.printStackTrace();
Log.i(TAG, "连接失败" + e);
}
然后发送消息(发送消息只需要建立连接,将指定key的消息发给交换机即可)
try {
//EXCHANGE_NAME = "exchange-name"
//KEY = "key-group"
//MESSAGE = "messageBody"
sendChannel.basicPublish(EXCHANGE_NAME,
KEY,
null,
MESSAGE.getBytes());
Log.i(TAG, "发送成功");
} catch (Exception e) {
e.printStackTrace();
Log.e(TAG, "发送失败 " + e);
}
3 消费者
同样的,创建factory
if (factory == null) {
factory = new ConnectionFactory();
factory.setHost(HOST); //主机地址
factory.setPort(PART); // 端口号
factory.setUsername(USERNAME); //用户名
factory.setPassword(PASSWORD); //密码
factory.setVirtualHost(VIRTUALHOST);
}
创建connection ,channel,声明交换机
这里更重要的是声明队列
try {
//创建连接
if (connection == null || !connection.isOpen())
connection = factory.newConnection();
//创建通道
if (receiveChannel == null || !receiveChannel.isOpen())
receiveChannel = connection.createChannel();
//声明交换机
//EXCHANGE_NAME = "exchange-name"
//EXCHANGE_TYPE = "direct"
channel.exchangeDeclare(EXCHANGE_NAME,
EXCHANGE_TYPE,
true);
//声明队列
//QUEUE_NAME = "queue-name"
receiveChannel.queueDeclare(QUEUE_NAME,
true, false, false, null);
//队列绑定交换机(单个key)
//SINGLE_KEY = "key-single"
receiveChannel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
SINGLE_KEY);
//队列绑定交换机(集群key)
//SINGLE_KEY = "key-group"
receiveChannel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
GROUP_KEY);
//接收消息
receiveChannel.basicConsume(QUEUE_NAME,
true,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body);
Log.i(TAG, "消息内容 " + message);
}
});
Log.i(TAG, "连接成功");
} catch (Exception e) {
e.printStackTrace();
Log.i(TAG, "连接失败" + e);
}
网友评论