RabbitMQ实现简易RPC调用

作者: 蓝汀华韶 | 来源:发表于2017-07-21 17:55 被阅读191次

    完整代码链接:https://github.com/shawntime/shawn-test-rabbitmq/tree/master/src/main/java/com/shawntime/test/rabbitmq/rpc

    RPC的处理流程

    • 当客户端启动时,创建一个匿名的回调队列。
    • 客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。
    • 请求被发送到请求队列中。
    • RPC服务器端监听请求队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
    • 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,则返回。
    832799-20161224004437839-1074972304.png

    代码实现

    // 客户端
    package com.shawntime.test.rabbitmq.rpc.rabbit;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeoutException;
    
    import com.google.common.collect.Maps;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.shawntime.test.rabbitmq.rpc.IRpcService;
    import com.shawntime.test.rabbitmq.rpc.RpcInvokeModel;
    import org.apache.commons.lang3.SerializationUtils;
    
    /**
     * Created by shma on 2017/5/8.
     */
    public class Client implements IRpcService {
    
        private Channel produceChannel;
    
        private Channel consumeChannel;
    
        private String callBackQueueName;
    
        private final Map<String, BlockingQueue<byte[]>> completionQueueMap;
    
        public Client(ConnectModel connectModel) throws IOException, TimeoutException {
            connect(connectModel);
            this.completionQueueMap = Maps.newConcurrentMap();
        }
    
        public byte[] call(RpcInvokeModel model) throws IOException, InterruptedException, ExecutionException {
            model.setDid(UUID.randomUUID().toString());
            model.setCallBackQueueName(callBackQueueName);
            byte[] body = SerializationUtils.serialize(model);
            BlockingQueue<byte[]> blockingQueue = new LinkedBlockingQueue<byte[]>(1);
            completionQueueMap.put(model.getDid(), blockingQueue);
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                    .builder()
                    .correlationId(model.getDid())
                    .replyTo(callBackQueueName)
                    .build();
            produceChannel.basicPublish(Constant.REQUEST_EXCHANGE_NAME, Constant.REQUEST_ROUTING_NAME, basicProperties, body);
            return blockingQueue.take();
        }
    
        private void connect(ConnectModel connectModel) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setVirtualHost(connectModel.getVirtualHost());
            factory.setPort(connectModel.getPort());
            factory.setUsername(connectModel.getUserName());
            factory.setPassword(connectModel.getPassword());
            factory.setHost(connectModel.getHost());
            Connection connection = factory.newConnection();
            produceChannel = connection.createChannel();
            consumeChannel = connection.createChannel();
            produceChannel.queueDeclare(Constant.REQUEST_QUEUE_NAME, true, false, false, null);
            produceChannel.exchangeDeclare(Constant.REQUEST_EXCHANGE_NAME, "direct");
            produceChannel.basicQos(1);
            callBackQueueName = produceChannel.queueDeclare().getQueue();
            consumeChannel.exchangeDeclare(Constant.REPLY_EXCHANGE_NAME, "direct");
            consumeChannel.queueBind(callBackQueueName, Constant.REPLY_EXCHANGE_NAME, callBackQueueName);
            consumeChannel.basicQos(1);
            consumeChannel.basicConsume(callBackQueueName, true, new DefaultConsumer(consumeChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, final byte[] body) throws IOException {
                    BlockingQueue<byte[]> blockingQueue = completionQueueMap.get(properties.getCorrelationId());
                    blockingQueue.add(body);
                }
            });
        }
    }
    
    // 服务端
    package com.shawntime.test.rabbitmq.rpc.rabbit;
    
    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeoutException;
    
    import com.alibaba.fastjson.JSON;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.shawntime.test.rabbitmq.rpc.JsonHelper;
    import com.shawntime.test.rabbitmq.rpc.RpcInvokeModel;
    import com.shawntime.test.rabbitmq.rpc.operator.bean.User;
    import org.apache.commons.lang3.SerializationUtils;
    
    /**
     * Created by shma on 2017/5/8.
     */
    public class Service {
    
        private Channel produceChannel;
    
        private Channel consumeChannel;
    
        private ConnectModel connectModel;
    
        public Service(ConnectModel connectModel) throws IOException, TimeoutException {
            this.connectModel = connectModel;
        }
    
        public void start() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setVirtualHost(connectModel.getVirtualHost());
            factory.setPort(connectModel.getPort());
            factory.setUsername(connectModel.getUserName());
            factory.setPassword(connectModel.getPassword());
            factory.setHost(connectModel.getHost());
            Connection connection = factory.newConnection();
            produceChannel = connection.createChannel();
            produceChannel.exchangeDeclare(Constant.REPLY_EXCHANGE_NAME, "direct");
            produceChannel.basicQos(1);
    
            consumeChannel = connection.createChannel();
            consumeChannel.queueDeclare(Constant.REQUEST_QUEUE_NAME, true, false, false, null);
            consumeChannel.exchangeDeclare(Constant.REQUEST_EXCHANGE_NAME, "direct");
            consumeChannel.basicQos(1);
            consumeChannel.queueBind(Constant.REQUEST_QUEUE_NAME, Constant.REQUEST_EXCHANGE_NAME, Constant
                    .REQUEST_ROUTING_NAME);
            consumeChannel.basicConsume(Constant.REQUEST_QUEUE_NAME, true, new DefaultConsumer(consumeChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[]
                        body) throws IOException {
                    RpcInvokeModel model = SerializationUtils.deserialize(body);
                    Class cls;
                    try {
                        cls = Class.forName(model.getClassName());
                        Object[] arguments = model.getArguments();
                        Class[] clazz = new Class[arguments.length];
                        for (int index = 0 ; index < clazz.length; ++index) {
                            clazz[index] = arguments[index].getClass();
                        }
                        Method method = cls.getDeclaredMethod(model.getMethodName(), clazz);
                        Object object = method.invoke(cls.newInstance(), arguments);
                        byte[] resultData = JsonHelper.serialize(object).getBytes("UTF-8");
                        String queueName = properties.getReplyTo();
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                                .correlationId(properties.getCorrelationId()).build();
                        produceChannel.basicPublish(Constant.REPLY_EXCHANGE_NAME, queueName, replyProps, resultData);
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    } catch (NoSuchMethodException e) {
                        e.printStackTrace();
                    } catch (InstantiationException e) {
                        e.printStackTrace();
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    
    package com.shawntime.test.rabbitmq.rpc.rabbit;
    
    /**
     * Created by shma on 2017/5/8.
     */
    public class ConnectModel {
    
        private String virtualHost;
    
        private String host;
    
        private String userName;
    
        private String password;
    
        private int port;
    
        public String getVirtualHost() {
            return virtualHost;
        }
    
        public void setVirtualHost(String virtualHost) {
            this.virtualHost = virtualHost;
        }
    
        public String getHost() {
            return host;
        }
    
        public void setHost(String host) {
            this.host = host;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public void setUserName(String userName) {
            this.userName = userName;
        }
    
        public String getPassword() {
            return password;
        }
    
        public void setPassword(String password) {
            this.password = password;
        }
    
        public int getPort() {
            return port;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    }
    
    package com.shawntime.test.rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    
    /**
     * Created by shma on 2017/5/8.
     */
    public interface IRpcService<T> {
    
        T call(RpcInvokeModel model) throws IOException, InterruptedException, ExecutionException;
    }
    
    
    package com.shawntime.test.rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.shawntime.test.rabbitmq.rpc.rabbit.ConnectModel;
    import com.shawntime.test.rabbitmq.rpc.rabbit.Service;
    
    /**
     * Created by shma on 2017/5/8.
     */
    public class TestServerMain {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectModel model = new ConnectModel();
            model.setHost("127.0.0.1");
            model.setPassword("shawntime");
            model.setUserName("shawntime");
            model.setVirtualHost("Test");
            model.setPort(5672);
    
            Service service = new Service(model);
            service.start();
        }
    }
    
    package com.shawntime.test.rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.shawntime.test.rabbitmq.rpc.operator.IBaseClientService;
    import com.shawntime.test.rabbitmq.rpc.operator.bean.User;
    import com.shawntime.test.rabbitmq.rpc.operator.client.BaseClientService;
    import com.shawntime.test.rabbitmq.rpc.rabbit.Client;
    import com.shawntime.test.rabbitmq.rpc.rabbit.ConnectModel;
    
    /**
     * Created by shma on 2017/5/8.
     */
    public class TestClientMain {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectModel model = new ConnectModel();
            model.setHost("127.0.0.1");
            model.setPassword("shawntime");
            model.setUserName("shawntime");
            model.setVirtualHost("Test");
            model.setPort(5672);
    
            Client client = new Client(model);
            IBaseClientService baseClientService = new BaseClientService(client);
            User userInfo = baseClientService.getUserInfo(1);
            System.out.println(userInfo.getUserId());
            System.out.println(userInfo.getUserName());
            User user = new User();
            user.setUserName("AAA");
            user.setUserId(222);
            System.out.println(baseClientService.save(user));
        }
    }
    
    

    相关文章

      网友评论

        本文标题:RabbitMQ实现简易RPC调用

        本文链接:https://www.haomeiwen.com/subject/wyiekxtx.html