美文网首页
RabbitMQ/RPC/TTL/死信队列

RabbitMQ/RPC/TTL/死信队列

作者: 米刀灵 | 来源:发表于2016-08-30 00:18 被阅读2195次

    当需要调用远端计算机的函数并等待结果,这模式通常被称为远程过程调用或RPC。

    BasicProperties:
    消息属性 这AMQP协议预先确定了消息中的14个属性。常用的有:

    • deliveryMode
      将一个消息标记为持久化(值为2)或者瞬态的(其他值)。之前发送队列消息时的用法:

        channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
      
    • contentType
      用来描述媒体类型的编码。例如常常使用的JSON编码:application/json。

    • replyTo
      通常来命名回收队列的名字。

    • correlationId
      对RPC加速响应请求是很有用的。

    相关性ID(Correlation Id):
    为每一个RPC请求创建一个回收队列,这个效率十分低下。每一个客户端创建一个单一的回收队列。 用Correlation Id判断队列中的响应是属于哪个请求的。
    当在回收队列中接收消息时,依据这个属性值,刻意将每个响应匹配到对应的请求上。如果是未知的Correlation Id值,就丢弃这个消息,因为它不属于任何一个我们的请求。

    请求RPC服务:

    当客户端启动,它会创建一个匿名的独占的回收队列。 对于一个RPC请求,客户端会发送一个消息到rpc_queue队列中,其中有两个属性:replyTo(回复队列)和correlationId(每一个请求都是唯一值)。
    服务端:
    package testrabbitmq;
    import com.rabbitmq.client.*;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    /**
     * Created by zzhblh on 2016/8/29.
     */
    public class RPCServer {
    
        private static final String RPC_QUEUE_NAME = "rpc_queue";
    
        private static int fib(int n) throws Exception {
            if (n == 0) return 0;
            if (n == 1) return 1;
            return fib(n-1) + fib(n-2);
        }
    
        public static void main(String[] argv) throws Exception
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);//公平分发机制
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties
                        .Builder()
                        .correlationId(props.getCorrelationId())
                        .build();
    
                String message = new String(delivery.getBody());
                int n = Integer.parseInt(message);
                System.out.println(" [.] fib(" + message + ")");
                String response = "" + fib(n);
                channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    客户端:

    package testrabbitmq;
    import com.rabbitmq.client.*;
    /**
     * Created by zzhblh on 2016/8/29.
     */
    public class RPCClient {
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "rpc_queue";
        private String replyQueueName;
        private QueueingConsumer consumer;
    
        public RPCClient() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
    
            replyQueueName = channel.queueDeclare().getQueue();
            consumer = new QueueingConsumer(channel);
            channel.basicConsume(replyQueueName, true, consumer);
        }
    
        public String call(String message) throws Exception {
            String response = null;
            String corrId = java.util.UUID.randomUUID().toString();
    
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
    
            channel.basicPublish("", requestQueueName, props, message.getBytes());
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                    response = new String(delivery.getBody());
                    break;
                }
            }
    
            return response;
        }
    
        public void close() throws Exception {
            connection.close();
        }
    }
    
    • contentType
      用来描述媒体类型的编码。例如常常使用的JSON编码,这是一个好的惯例,设置这个属性为:application/json。
    • replyTo
      通常来命名回收队列的名字。
    • correlationId
      对RPC加速响应请求是很有用的。

    相关性ID(Correlation Id):
    为每一个RPC请求创建一个回收队列,这个效率十分低下。每一个客户端创建一个单一的回收队列。 用Correlation Id判断队列中的响应是属于哪个请求的。
    当在回收队列中接收消息时,依据这个属性值,刻意将每个响应匹配到对应的请求上。如果是未知的Correlation Id值,就丢弃这个消息,因为它不属于任何一个我们的请求。

    请求RPC服务:

    当客户端启动,它会创建一个匿名的独占的回收队列。 对于一个RPC请求,客户端会发送一个消息到rpc_queue队列中,其中有两个属性:replyTo(回复队列)和correlationId(每一个请求都是唯一值)。
    服务端:
    package testrabbitmq;
    import com.rabbitmq.client.*;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    /**
     * Created by zzhblh on 2016/8/29.
     */
    public class RPCServer {
    
        private static final String RPC_QUEUE_NAME = "rpc_queue";
    
        private static int fib(int n) throws Exception {
            if (n == 0) return 0;
            if (n == 1) return 1;
            return fib(n-1) + fib(n-2);
        }
    
        public static void main(String[] argv) throws Exception
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);//公平分发机制
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties
                        .Builder()
                        .correlationId(props.getCorrelationId())
                        .build();
    
                String message = new String(delivery.getBody());
                int n = Integer.parseInt(message);
                System.out.println(" [.] fib(" + message + ")");
                String response = "" + fib(n);
                channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    客户端:

    package testrabbitmq;
    import com.rabbitmq.client.*;
    /**
     * Created by zzhblh on 2016/8/29.
     */
    public class RPCClient {
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "rpc_queue";
        private String replyQueueName;
        private QueueingConsumer consumer;
    
        public RPCClient() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
    
            replyQueueName = channel.queueDeclare().getQueue();
            consumer = new QueueingConsumer(channel);
            channel.basicConsume(replyQueueName, true, consumer);
        }
    
        public String call(String message) throws Exception {
            String response = null;
            String corrId = java.util.UUID.randomUUID().toString();
    
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
    
            channel.basicPublish("", requestQueueName, props, message.getBytes());
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                    response = new String(delivery.getBody());
                    break;
                }
            }
    
            return response;
        }
    
        public void close() throws Exception {
            connection.close();
        }
    }
    

    TTL

    • Queue Message TTL
      设置某队列中所有消息的TTL,通过在 queue.declare 中设置 x-message-ttl 参数,可以控制被 publish 到 queue 中的 message 被丢弃前能够存活的时间。值得注意的是,当一个 message 被路由到多个 queue 中时,他们之间不会互相影响。
    Map<String, Object> args = new HashMap<String, Object>();  
    args.put("x-message-ttl", 60000);//存活时间最大为 60 秒
    channel.queueDeclare("myqueue", false, false, false, args);
    
    • Per-Message TTL
      TTL 设置可以具体到每一条 message 本身,只要在通过 basic.publish 命令发送 message 时设置 expiration 字段。expiration 字段必须为字符串类型。
    AMQP.BasicProperties properties = new AMQP.BasicProperties();  
    properties.setExpiration("60000");  
    channel.basicPublish("myexchange", "routingkey", properties, message.getBytes());
    

    对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去。而第二种方法里,即使消息过期,也不会马上从队列中抹去,在过期 message 到达 queue 的头部时被真正的丢弃。

    • Queue TTL
      queue 被自动删除前可以处于未使用状态的时间。未使用的意思是 queue 上没有任何 consumer ,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。在服务器重启后,持久化的 queue(本来还未过期的) 的超时时间将重新计算。
    Map<String, Object> args = new HashMap<String, Object>();  
    args.put("x-expires", 1800000);  
    channel.queueDeclare("myqueue", false, false, false, args);  
    

    DLX(死信)
    利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有以下几种情况:

    • 消息被拒绝(basic.reject or basic.nack)并且requeue=false
    • 消息TTL过期
    • 队列达到最大长度

    DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

    channel.exchangeDeclare("some.exchange.name", "direct");  
    Map<String, Object> args = new HashMap<String, Object>();  
    args.put("x-dead-letter-exchange", "some.exchange.name");  
    channel.queueDeclare("myqueue", false, false, false, args); 
    

    你也可以为这个DLX指定routing key,如果没有特殊指定,则使用原队列的routing key

    args.put("x-dead-letter-routing-key", "some-routing-key"); 
    

    http://memorynotfound.com/produce-consume-rabbitmq-spring-json-message-queue/
    http://blog.csdn.net/jiao_fuyou/article/details/22923935

    相关文章

      网友评论

          本文标题:RabbitMQ/RPC/TTL/死信队列

          本文链接:https://www.haomeiwen.com/subject/fcdfettx.html