美文网首页
4.发送和接受消息

4.发送和接受消息

作者: JiangCheng97 | 来源:发表于2019-08-09 16:11 被阅读0次

文章参考:Rabbit实战指南

发送消息

​ 如果要发送一个消息,可以使用Channel类的basicPublish方法,比如发送一条内容为“Hello World”的消息,参考如下:

byte[] messageBodyBytes = "Hello,world!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);

为了更好地可控制发送,可以使用mandatory这个参数,或者可以发送一些特定属性的信息:

channel.basicPublish(exchangeName,routingKey,mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

下面这行代码发送了一条消息,这条消息的投递模式(delivery mode)设置为2,即消息会被持久化(存入磁盘中)在服务器中。同时这条消息的优先级(priority)设置为1,content-type为“text/plain”。可以自己设定消息的属性:

channel.basicPublish(exchangeName,routingKey,
                     new AMQP.BasicProperties.Builder()
                     .contentType("text/plain")
                     .deliveryMode(2)
                     .priority(1)
                     .userId("hidden")
                     .build(),
                     messageBodyBytes
                    );

也可以发送一条带有headers的消息:

Map<String,Object> headers = new HashMap<>();
headers.put("localtion","here");
headers.put("time","today");
channel.basicPublish(exchangeName,routingKey,
                    new AMQP.BasicProperties.Builder()
                    .headers(headers)
                     .build(),
                     messageBodyBytes
                    );

也可以发送一条带有过期时间(expiration)的消息

channel.basicPublish(exchangeName,routingKey,
                     new AMQP.BasicProperties.Builder()
                     .expiration("6000")
                     .build(),
                     messageBodyBytes
                    );

basicPublish的重载方法:

void basicPublish(String exchange,String routingKey,
                  BasicProperties props,
                  byte[] body) throws IOException;

void basicPublish(String exchange,String routingKey,
                  boolean mandatory,
                  BasicProperties props,
                  byte[] body) throws IOException;

void basicPublish(String exchange,String routingKey,
                  boolean mandatory,boolean imediate,
                  BasicProperties props,
                  byte[] body) throws IOException;

具体参数解释如下:

  • exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会发送到RabbitMQ默认的交换器中。
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列。
  • props:消息的基本属性集,其包含14个属性成员,分别有contentType、contentEncoding、header(Map<Stirng,Object>)、deliveryMode、priority。correlationId、replyTo、expiration、messageId、timestamp、type、urserId、appId、clusterId。
  • byte[] body:消息体(payload),真正需要发送的消息。
  • mandatory和immediate

消费消息

RabbitMQ的消费模式分为两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则调用Basic.Get进行消费。

推模式

在推模式中,可以通过持续订阅的方式来消费信息,使用到的相关类有:

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeafultConsumer;

接收消息一般通过实现Consumer接口或者继承DefaultConsumer类来实现。当调用与Consumer类来实现。当调用与Consumer相关的API方法,不同的订阅采用不同的消费者标签来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区别。代码如下:

boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,"myConsumerTag",
        new DefaultConsumer(channel){
            @Override
            public void handleDevlivery(String consumerTag,
                                       Envelop envelope,
                                  AMQP.BasicProperties properties,
                                        byte[] bode
                                       )throws IOException
            {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                //(process the message components here ...)
                channel.basicAck(deliveryTag,false);
            }
                    }
                    )

上面代码显式地设置autoAck为false,然后在接受到消息之后进行显式ack操作(channel.basicAck),对应消费者来说这个设置是非常必要的,可以防止消息不必要的丢失。

​ Channel类中basicConsume方法有如下几种形式:

  1. String basicConsume(String queue,Consumer callback) throws IOException

  2. String basicConsume(String queue,boolean autoAck,Consumer callback) throws IOException

  3. String basicConsume(String queue,boolean autoAck,Map<String,Object> arguments,Consumer callback) throws IOException

  4. String basicConsume(String queue,boolean autoAck,String consumerTag,Consumer callback) throws IOException

  5. String basicConsume(String queue,boolean autoAck,String consumerTag,boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) throws IOException

    对应参数说明:

    • queue:队列的名称
    • autoAck:设置是否自动确认。建议设置为false
    • consumerTag:消费者标签,用来区分多个消费者;
    • noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
    • exclusive:设置是否排他;
    • arguments:设置消费者的其他参数
    • callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写其中的方法。

    对于消费者客户端来说,重写handleDelivery方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下:

    void handleConsumeOk(String consumerTag);
    void handleCancelOk(String consumerTag);
    void handleCancel(String consumerTag) throws IOException;
    void handleShutdownSignal(String consumerTag,ShutdowSignalException sig);
    void handleRecoverOk(String consumerTag);
    

    比如handleShutdownSignal方法,当Channel或者Connection关闭的时候回调用。再者,handleConsumeOk方法会在其他方法之前调用,返回消费者标签。

    重写handleCancelOk和handleCancel方法,这样消费端可以再显示地或者隐式地取消订阅的时候调用。也可以通过channel.basicCancel方法来显式地取消一个消费者的订阅:

    channel.basicCancel(consumerTag)

    注意上面这行代码会首先触发handleConsumerOk方法,之后触发handleDelivery方法,最后才触发handleCanceOk方法。

 和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些callback会被分配到与Channel不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,如channe.queueDeclare、channel.basicCancel等。

 每个Channel都拥有自己独立的线程。最常用的做法是一个Channel对应一个消费者。意味着消费者彼此之间没有任何关联。也可以在一个Channel中维持多个消费者,但是要注意一个问题,如果Channel中的一个消费者一直在运行,那么其他消费者的callback会被“耽搁”。

拉模式

通过channel.basicGet方法可以单条的获取消息,其返回值是GetResponse。Channel类的basicGet方法没有其他重载方法,只有:

 GetResponse basicGet(String queue,boolean autoAck) throws IOException;

其中queue代表队列名称,如果设置autoAck为false,那么同样需要调用channel.basicAck来确认消息被成功接收。

拉模式的关键代码如下所示:

GetResponse response = channel.basicGet(QUEUE_NAME,false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
注意:

Basic.Consume将信道(Channel)置为接收模式,知道取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制。

如果只想从队列获取单挑消息而不是持续订阅,建议使用Basic.Get进行消费。但不能将Basic.Get放在一个循环里来代替Basic.Consume,这样做会严重影响RabbitMQ的性能。消费者理应使用Basic.Consume方法。

相关文章

  • 4.发送和接受消息

    文章参考:Rabbit实战指南 发送消息 ​ 如果要发送一个消息,可以使用Channel类的basicPubl...

  • 极光IM常用方法(日常记录)

    前提条件 集成极光IM 一、发送消息 发送消息必须先登录 注册账号 登录 发送消息 二、接受消息 在当前接受消息...

  • EventBus

    EventBus 用法: 在接受消息的类注册EventBus, 并添加接受消息的注解方法, 然后在发送消息的类发送...

  • Handler源码浅析(一)

    Handler 和他的小伙伴 Handler(消息发送者和接受者) Looper(消息循环) Message(消息...

  • Handler源码浅析(二)

    Looper 大管家 Handler(消息发送者和接受者) Looper(消息循环) Message(消息) Me...

  • Swoole 源码分析——内存模块之swBuffer

    前言 swoole 中数据的接受与发送(例如 reactor 线程接受客户端消息、发送给客户端的消息、接受到的来自...

  • Kotlin中Channel基本使用

    Channel的源代码如下: Channel的父类有发送消息的SendChannel和接受消息的ReceiveCh...

  • RabbitMQ-2使用

    RabbitMQ-2 简介 RabbitMQ:接受消息再传递消息,可以视为一个"邮局"。发送者和接受者通过队列来进...

  • RabbitMQ学习2--开发

    话不多说,直接上代码,这边主要是演示了消息队列中的发送和接受。 发送方 接收方

  • CLientSocket

    接受消息解析消息发送消息 创建ClientSocketimport SwiftSocket 初始化设置 设置协议 ...

网友评论

      本文标题:4.发送和接受消息

      本文链接:https://www.haomeiwen.com/subject/gsphjctx.html