当需要调用远端计算机的函数并等待结果,这模式通常被称为远程过程调用或RPC。
BasicProperties:
消息属性 这AMQP协议预先确定了消息中的14个属性。常用的有:
-
deliveryMode
将一个消息标记为持久化(值为2)或者瞬态的(其他值)。之前发送队列消息时的用法:channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
-
contentType
用来描述媒体类型的编码。例如常常使用的JSON编码:application/json。 -
replyTo
通常来命名回收队列的名字。 -
correlationId
对RPC加速响应请求是很有用的。
相关性ID(Correlation Id):
为每一个RPC请求创建一个回收队列,这个效率十分低下。每一个客户端创建一个单一的回收队列。 用Correlation Id判断队列中的响应是属于哪个请求的。
当在回收队列中接收消息时,依据这个属性值,刻意将每个响应匹配到对应的请求上。如果是未知的Correlation Id值,就丢弃这个消息,因为它不属于任何一个我们的请求。
请求RPC服务:
服务端:
package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Created by zzhblh on 2016/8/29.
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
public static void main(String[] argv) throws Exception
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);//公平分发机制
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
客户端:
package testrabbitmq;
import com.rabbitmq.client.*;
/**
* Created by zzhblh on 2016/8/29.
*/
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
}
- contentType
用来描述媒体类型的编码。例如常常使用的JSON编码,这是一个好的惯例,设置这个属性为:application/json。 - replyTo
通常来命名回收队列的名字。 - correlationId
对RPC加速响应请求是很有用的。
相关性ID(Correlation Id):
为每一个RPC请求创建一个回收队列,这个效率十分低下。每一个客户端创建一个单一的回收队列。 用Correlation Id判断队列中的响应是属于哪个请求的。
当在回收队列中接收消息时,依据这个属性值,刻意将每个响应匹配到对应的请求上。如果是未知的Correlation Id值,就丢弃这个消息,因为它不属于任何一个我们的请求。
请求RPC服务:
服务端:
package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Created by zzhblh on 2016/8/29.
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
public static void main(String[] argv) throws Exception
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);//公平分发机制
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
客户端:
package testrabbitmq;
import com.rabbitmq.client.*;
/**
* Created by zzhblh on 2016/8/29.
*/
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
}
TTL
- Queue Message TTL
设置某队列中所有消息的TTL,通过在 queue.declare 中设置 x-message-ttl 参数,可以控制被 publish 到 queue 中的 message 被丢弃前能够存活的时间。值得注意的是,当一个 message 被路由到多个 queue 中时,他们之间不会互相影响。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);//存活时间最大为 60 秒
channel.queueDeclare("myqueue", false, false, false, args);
- Per-Message TTL
TTL 设置可以具体到每一条 message 本身,只要在通过 basic.publish 命令发送 message 时设置 expiration 字段。expiration 字段必须为字符串类型。
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("myexchange", "routingkey", properties, message.getBytes());
对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去。而第二种方法里,即使消息过期,也不会马上从队列中抹去,在过期 message 到达 queue 的头部时被真正的丢弃。
- Queue TTL
queue 被自动删除前可以处于未使用状态的时间。未使用的意思是 queue 上没有任何 consumer ,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。在服务器重启后,持久化的 queue(本来还未过期的) 的超时时间将重新计算。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
DLX(死信)
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有以下几种情况:
- 消息被拒绝(basic.reject or basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度
DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
你也可以为这个DLX指定routing key,如果没有特殊指定,则使用原队列的routing key
args.put("x-dead-letter-routing-key", "some-routing-key");
http://memorynotfound.com/produce-consume-rabbitmq-spring-json-message-queue/
http://blog.csdn.net/jiao_fuyou/article/details/22923935
网友评论