美文网首页
RabbitMQ/RPC/TTL/死信队列

RabbitMQ/RPC/TTL/死信队列

作者: 米刀灵 | 来源:发表于2016-08-30 00:18 被阅读2195次

当需要调用远端计算机的函数并等待结果,这模式通常被称为远程过程调用或RPC。

BasicProperties:
消息属性 这AMQP协议预先确定了消息中的14个属性。常用的有:

  • deliveryMode
    将一个消息标记为持久化(值为2)或者瞬态的(其他值)。之前发送队列消息时的用法:

      channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    
  • contentType
    用来描述媒体类型的编码。例如常常使用的JSON编码:application/json。

  • replyTo
    通常来命名回收队列的名字。

  • correlationId
    对RPC加速响应请求是很有用的。

相关性ID(Correlation Id):
为每一个RPC请求创建一个回收队列,这个效率十分低下。每一个客户端创建一个单一的回收队列。 用Correlation Id判断队列中的响应是属于哪个请求的。
当在回收队列中接收消息时,依据这个属性值,刻意将每个响应匹配到对应的请求上。如果是未知的Correlation Id值,就丢弃这个消息,因为它不属于任何一个我们的请求。

请求RPC服务:

当客户端启动,它会创建一个匿名的独占的回收队列。 对于一个RPC请求,客户端会发送一个消息到rpc_queue队列中,其中有两个属性:replyTo(回复队列)和correlationId(每一个请求都是唯一值)。
服务端:
package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] argv) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//公平分发机制
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String message = new String(delivery.getBody());
            int n = Integer.parseInt(message);
            System.out.println(" [.] fib(" + message + ")");
            String response = "" + fib(n);
            channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

客户端:

package testrabbitmq;
import com.rabbitmq.client.*;
/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes());

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }
}
  • contentType
    用来描述媒体类型的编码。例如常常使用的JSON编码,这是一个好的惯例,设置这个属性为:application/json。
  • replyTo
    通常来命名回收队列的名字。
  • correlationId
    对RPC加速响应请求是很有用的。

相关性ID(Correlation Id):
为每一个RPC请求创建一个回收队列,这个效率十分低下。每一个客户端创建一个单一的回收队列。 用Correlation Id判断队列中的响应是属于哪个请求的。
当在回收队列中接收消息时,依据这个属性值,刻意将每个响应匹配到对应的请求上。如果是未知的Correlation Id值,就丢弃这个消息,因为它不属于任何一个我们的请求。

请求RPC服务:

当客户端启动,它会创建一个匿名的独占的回收队列。 对于一个RPC请求,客户端会发送一个消息到rpc_queue队列中,其中有两个属性:replyTo(回复队列)和correlationId(每一个请求都是唯一值)。
服务端:
package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] argv) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//公平分发机制
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String message = new String(delivery.getBody());
            int n = Integer.parseInt(message);
            System.out.println(" [.] fib(" + message + ")");
            String response = "" + fib(n);
            channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

客户端:

package testrabbitmq;
import com.rabbitmq.client.*;
/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes());

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }
}

TTL

  • Queue Message TTL
    设置某队列中所有消息的TTL,通过在 queue.declare 中设置 x-message-ttl 参数,可以控制被 publish 到 queue 中的 message 被丢弃前能够存活的时间。值得注意的是,当一个 message 被路由到多个 queue 中时,他们之间不会互相影响。
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-message-ttl", 60000);//存活时间最大为 60 秒
channel.queueDeclare("myqueue", false, false, false, args);
  • Per-Message TTL
    TTL 设置可以具体到每一条 message 本身,只要在通过 basic.publish 命令发送 message 时设置 expiration 字段。expiration 字段必须为字符串类型。
AMQP.BasicProperties properties = new AMQP.BasicProperties();  
properties.setExpiration("60000");  
channel.basicPublish("myexchange", "routingkey", properties, message.getBytes());

对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去。而第二种方法里,即使消息过期,也不会马上从队列中抹去,在过期 message 到达 queue 的头部时被真正的丢弃。

  • Queue TTL
    queue 被自动删除前可以处于未使用状态的时间。未使用的意思是 queue 上没有任何 consumer ,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。在服务器重启后,持久化的 queue(本来还未过期的) 的超时时间将重新计算。
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 1800000);  
channel.queueDeclare("myqueue", false, false, false, args);  

DLX(死信)
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有以下几种情况:

  • 消息被拒绝(basic.reject or basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

channel.exchangeDeclare("some.exchange.name", "direct");  
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-dead-letter-exchange", "some.exchange.name");  
channel.queueDeclare("myqueue", false, false, false, args); 

你也可以为这个DLX指定routing key,如果没有特殊指定,则使用原队列的routing key

args.put("x-dead-letter-routing-key", "some-routing-key"); 

http://memorynotfound.com/produce-consume-rabbitmq-spring-json-message-queue/
http://blog.csdn.net/jiao_fuyou/article/details/22923935

相关文章

网友评论

      本文标题:RabbitMQ/RPC/TTL/死信队列

      本文链接:https://www.haomeiwen.com/subject/fcdfettx.html