美文网首页RabbitMQ工作生活亚武学习
Rabbitmq打怪升级之路(十三)Rpc-远程调用模式

Rabbitmq打怪升级之路(十三)Rpc-远程调用模式

作者: 亚武de小文 | 来源:发表于2019-07-02 00:02 被阅读0次

    简书:亚武de小文 【原创:转载请注明出处】

    远程调用模式(RPC)

    LengToo上学.png
    RabbitMQ有以下几种工作模式 :
    • Work queues
    • Publish/Subscribe
    • Routing
    • Topic
    • Headers
    • RPC

    RPC

    模型图
    [亚武de小文]Rpc模型图.png
    • 名词解释:
      Client:RPC客户端
      Server:RPC服务端
      reply_to: 设置并指定回调队列
      correlation_id: 唯一标识,每一个请求都会设置为一个具有唯一性的值,请求发送到rpc_queue队列

    • 流程详解:

    1. 启动客户端(Client)后,创建一个匿名独占的异步回调队列
    2. 客户端消息设置属性:reply_to、correlation_id,发送消息到rpc_queue队列
    3. 服务端(Server)在rpc_queue队列上等待消息。待收到消息进行处理,然后将处理结果封装成消息发送到reply_to指定的队列上,并且此消息携带correlation_id属性
    4. 客户端在reply_to队列上等待消息,当收到消息后,它会检查收到消息的correlation_id。如果值和自己之前发送的一样,则将响应(当前值)返回给程序
    参考代码
    客户端
    • RpcClient.java
      package com.yawu.xiaowen.rpc;
      
      import com.rabbitmq.client.*;
      import com.yawu.xiaowen.header.Producer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      import java.util.UUID;
      import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.BlockingQueue;
      
      /**
       * Rpc客户端
       *
       * @author yawu
       * @date 2019.07.02
       */
      public class RpcClient {
          private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
          private static final String RPC_QUEUE_NAME = "mq_rpc";
      
          public static void execute(String message) {
              try {
                  // RabbitMQ建立连接的管理器
                  ConnectionFactory factory = new ConnectionFactory();
                  // 设置服务器地址
                  factory.setHost("127.0.0.1");
                  factory.setUsername("guest");
                  factory.setPassword("guest");
      
                  // 创建一个连接
                  Connection connection = factory.newConnection();
                  // 创建一个信道
                  Channel channel = connection.createChannel()
      
                  // 定义临时队列,并返回生成的队列名称
                  String replyQueueName = channel.queueDeclare().getQueue();
      
                  // 本次请求唯一标志
                  String correlation_id = UUID.randomUUID().toString();
                  // 生成发送消息的属性
                  AMQP.BasicProperties props = new AMQP.BasicProperties
                          .Builder()
                          .correlationId(correlation_id)
                          // 设置指定回调队列
                          .replyTo(replyQueueName)
                          .build();
                  // 发送消息,发送到默认交换机
                  channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
      
                  // 阻塞队列,用于存放回调结果
                  final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
                  // 定义消息的回退方法
                  channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          if (properties.getCorrelationId().equals(correlation_id)) {
                              response.offer(new String(body, "UTF-8"));
                          }
                      }
                  });
                  // 获取回调的结果
                  String result = response.take();
                  System.out.println(" [Rpc客户端] 调用结果:'" + result + "'");
      
                  LOGGER.info("Rpc客户端消息发送:{}", message);
                  channel.close();
                  connection.close();
      
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
    服务端
    • RpcServer.java
      package com.yawu.xiaowen.rpc;
      
      import com.rabbitmq.client.*;
      import com.yawu.xiaowen.header.Producer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      
      /**
       * Rpc服务端
       *
       * @author yawu
       * @date 2019.07.02
       */
      public class RpcServer {
          private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
          private static final String RPC_QUEUE_NAME = "mq_rpc";
      
          public static void execute() {
              Connection connection = null;
              try {
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("127.0.0.1");
                  connection = factory.newConnection();
                  Channel channel = connection.createChannel();
      
                  // 声明一个rpc_queue队列
                  channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
                  // 设置该信道同时最多只能获取一个消息
                  channel.basicQos(1);
      
                  System.out.println(" [RpcServer]等待Rpc请求");
      
                  // 定义消息的回调处理类
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          // 生成返回的结果,关键是设置correlationId值
                          AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                  .Builder()
                                  .correlationId(properties.getCorrelationId())
                                  .build();
                          // 生成返回
                          String response = generateResponse(body);
                          // 回复消息,通知已经收到请求
                          channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                          // 对消息进行应答
                          channel.basicAck(envelope.getDeliveryTag(), false);
                          // 唤醒正在消费的所有的线程
                          synchronized (this) {
                              this.notify();
                          }
                      }
                  };
                  // 消费消息
                  channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
                  // 在收到消息前,本线程进入等待状态
                  while (true) {
                      synchronized (consumer) {
                          try {
                              consumer.wait();
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }
              } catch (Exception e) {
                  LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
              } finally {
                  try {
                      connection.close();
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
      
          }
      
          /**
           * 暂停10s,并返回结果
           *
           * @param body
           * @return
           */
          private static String generateResponse(byte[] body) {
              System.out.println(" [RpcServer]接收到的请求: " + new String(body));
              try {
                  Thread.sleep(1000 * 1);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return "响应结果:" + new String(body) + "-" + System.currentTimeMillis();
          }
      }
      
    Rpc测试代码
    • RpcTest.java
      package com.yawu.xiaowen.rpc;
      
      import org.junit.Test;
      
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      /**
       * Rpc测试类
       *
       * @author yawu
       * @date 2019.07.02
       */
      public class RpcTest {
      
          private ExecutorService executorService = Executors.newFixedThreadPool(10);
      
          @Test
          public void rpc() throws InterruptedException {
      
              // Rpc服务端
              executorService.submit(() -> {
                  RpcServer.execute();
              });
      
              // Rpc客户端
              executorService.submit(() -> {
                  RpcClient.execute("RPC远程调用-发送信息");
              });
      
              // sleep 10s
              Thread.sleep(10 * 1000);
          }
      }
      
    • 运行测试类,结果如下


      Rpc运行结果.png

    相关文章

      网友评论

        本文标题:Rabbitmq打怪升级之路(十三)Rpc-远程调用模式

        本文链接:https://www.haomeiwen.com/subject/calbcctx.html