美文网首页RabbitMQ工作生活亚武学习
Rabbitmq打怪升级之路(十三)Rpc-远程调用模式

Rabbitmq打怪升级之路(十三)Rpc-远程调用模式

作者: 亚武de小文 | 来源:发表于2019-07-02 00:02 被阅读0次

简书:亚武de小文 【原创:转载请注明出处】

远程调用模式(RPC)

LengToo上学.png
RabbitMQ有以下几种工作模式 :
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • Headers
  • RPC

RPC

模型图
[亚武de小文]Rpc模型图.png
  • 名词解释:
    Client:RPC客户端
    Server:RPC服务端
    reply_to: 设置并指定回调队列
    correlation_id: 唯一标识,每一个请求都会设置为一个具有唯一性的值,请求发送到rpc_queue队列

  • 流程详解:

  1. 启动客户端(Client)后,创建一个匿名独占的异步回调队列
  2. 客户端消息设置属性:reply_to、correlation_id,发送消息到rpc_queue队列
  3. 服务端(Server)在rpc_queue队列上等待消息。待收到消息进行处理,然后将处理结果封装成消息发送到reply_to指定的队列上,并且此消息携带correlation_id属性
  4. 客户端在reply_to队列上等待消息,当收到消息后,它会检查收到消息的correlation_id。如果值和自己之前发送的一样,则将响应(当前值)返回给程序
参考代码
客户端
  • RpcClient.java
    package com.yawu.xiaowen.rpc;
    
    import com.rabbitmq.client.*;
    import com.yawu.xiaowen.header.Producer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    /**
     * Rpc客户端
     *
     * @author yawu
     * @date 2019.07.02
     */
    public class RpcClient {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String RPC_QUEUE_NAME = "mq_rpc";
    
        public static void execute(String message) {
            try {
                // RabbitMQ建立连接的管理器
                ConnectionFactory factory = new ConnectionFactory();
                // 设置服务器地址
                factory.setHost("127.0.0.1");
                factory.setUsername("guest");
                factory.setPassword("guest");
    
                // 创建一个连接
                Connection connection = factory.newConnection();
                // 创建一个信道
                Channel channel = connection.createChannel()
    
                // 定义临时队列,并返回生成的队列名称
                String replyQueueName = channel.queueDeclare().getQueue();
    
                // 本次请求唯一标志
                String correlation_id = UUID.randomUUID().toString();
                // 生成发送消息的属性
                AMQP.BasicProperties props = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(correlation_id)
                        // 设置指定回调队列
                        .replyTo(replyQueueName)
                        .build();
                // 发送消息,发送到默认交换机
                channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
    
                // 阻塞队列,用于存放回调结果
                final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
                // 定义消息的回退方法
                channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        if (properties.getCorrelationId().equals(correlation_id)) {
                            response.offer(new String(body, "UTF-8"));
                        }
                    }
                });
                // 获取回调的结果
                String result = response.take();
                System.out.println(" [Rpc客户端] 调用结果:'" + result + "'");
    
                LOGGER.info("Rpc客户端消息发送:{}", message);
                channel.close();
                connection.close();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
服务端
  • RpcServer.java
    package com.yawu.xiaowen.rpc;
    
    import com.rabbitmq.client.*;
    import com.yawu.xiaowen.header.Producer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * Rpc服务端
     *
     * @author yawu
     * @date 2019.07.02
     */
    public class RpcServer {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String RPC_QUEUE_NAME = "mq_rpc";
    
        public static void execute() {
            Connection connection = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("127.0.0.1");
                connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                // 声明一个rpc_queue队列
                channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
                // 设置该信道同时最多只能获取一个消息
                channel.basicQos(1);
    
                System.out.println(" [RpcServer]等待Rpc请求");
    
                // 定义消息的回调处理类
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // 生成返回的结果,关键是设置correlationId值
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
                        // 生成返回
                        String response = generateResponse(body);
                        // 回复消息,通知已经收到请求
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        // 对消息进行应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // 唤醒正在消费的所有的线程
                        synchronized (this) {
                            this.notify();
                        }
                    }
                };
                // 消费消息
                channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
                // 在收到消息前,本线程进入等待状态
                while (true) {
                    synchronized (consumer) {
                        try {
                            consumer.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            } finally {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        /**
         * 暂停10s,并返回结果
         *
         * @param body
         * @return
         */
        private static String generateResponse(byte[] body) {
            System.out.println(" [RpcServer]接收到的请求: " + new String(body));
            try {
                Thread.sleep(1000 * 1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "响应结果:" + new String(body) + "-" + System.currentTimeMillis();
        }
    }
    
Rpc测试代码
  • RpcTest.java
    package com.yawu.xiaowen.rpc;
    
    import org.junit.Test;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Rpc测试类
     *
     * @author yawu
     * @date 2019.07.02
     */
    public class RpcTest {
    
        private ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        @Test
        public void rpc() throws InterruptedException {
    
            // Rpc服务端
            executorService.submit(() -> {
                RpcServer.execute();
            });
    
            // Rpc客户端
            executorService.submit(() -> {
                RpcClient.execute("RPC远程调用-发送信息");
            });
    
            // sleep 10s
            Thread.sleep(10 * 1000);
        }
    }
    
  • 运行测试类,结果如下


    Rpc运行结果.png

相关文章

网友评论

    本文标题:Rabbitmq打怪升级之路(十三)Rpc-远程调用模式

    本文链接:https://www.haomeiwen.com/subject/calbcctx.html