RabbitMQ实现简易RPC调用

作者: 传达室马大爷 | 来源:发表于2017-07-21 17:55 被阅读191次

完整代码链接:https://github.com/shawntime/shawn-test-rabbitmq/tree/master/src/main/java/com/shawntime/test/rabbitmq/rpc

RPC的处理流程

  • 当客户端启动时,创建一个匿名的回调队列。
  • 客户端为RPC请求设置2个属性:replyTo,设置回调队列的名字;correlationId,请求的唯一标识,用于把响应与对应的请求关联起来。
  • 请求被发送到请求队列中。
  • RPC服务器端监听请求队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
  • 客户端监听回调队列,当有消息到达时,检查消息的correlationId属性;如果与某个待处理请求的correlationId一致,则把该消息作为这次请求的响应返回给调用方。
(图:RPC处理流程示意图,原图文件 832799-20161224004437839-1074972304.png)

代码实现

// 客户端
package com.shawntime.test.rabbitmq.rpc.rabbit;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;

import com.google.common.collect.Maps;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.shawntime.test.rabbitmq.rpc.IRpcService;
import com.shawntime.test.rabbitmq.rpc.RpcInvokeModel;
import org.apache.commons.lang3.SerializationUtils;

/**
 * Created by shma on 2017/5/8.
 */
public class Client implements IRpcService {

    private Channel produceChannel;

    private Channel consumeChannel;

    private String callBackQueueName;

    private final Map<String, BlockingQueue<byte[]>> completionQueueMap;

    public Client(ConnectModel connectModel) throws IOException, TimeoutException {
        connect(connectModel);
        this.completionQueueMap = Maps.newConcurrentMap();
    }

    public byte[] call(RpcInvokeModel model) throws IOException, InterruptedException, ExecutionException {
        model.setDid(UUID.randomUUID().toString());
        model.setCallBackQueueName(callBackQueueName);
        byte[] body = SerializationUtils.serialize(model);
        BlockingQueue<byte[]> blockingQueue = new LinkedBlockingQueue<byte[]>(1);
        completionQueueMap.put(model.getDid(), blockingQueue);
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                .builder()
                .correlationId(model.getDid())
                .replyTo(callBackQueueName)
                .build();
        produceChannel.basicPublish(Constant.REQUEST_EXCHANGE_NAME, Constant.REQUEST_ROUTING_NAME, basicProperties, body);
        return blockingQueue.take();
    }

    private void connect(ConnectModel connectModel) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(connectModel.getVirtualHost());
        factory.setPort(connectModel.getPort());
        factory.setUsername(connectModel.getUserName());
        factory.setPassword(connectModel.getPassword());
        factory.setHost(connectModel.getHost());
        Connection connection = factory.newConnection();
        produceChannel = connection.createChannel();
        consumeChannel = connection.createChannel();
        produceChannel.queueDeclare(Constant.REQUEST_QUEUE_NAME, true, false, false, null);
        produceChannel.exchangeDeclare(Constant.REQUEST_EXCHANGE_NAME, "direct");
        produceChannel.basicQos(1);
        callBackQueueName = produceChannel.queueDeclare().getQueue();
        consumeChannel.exchangeDeclare(Constant.REPLY_EXCHANGE_NAME, "direct");
        consumeChannel.queueBind(callBackQueueName, Constant.REPLY_EXCHANGE_NAME, callBackQueueName);
        consumeChannel.basicQos(1);
        consumeChannel.basicConsume(callBackQueueName, true, new DefaultConsumer(consumeChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, final byte[] body) throws IOException {
                BlockingQueue<byte[]> blockingQueue = completionQueueMap.get(properties.getCorrelationId());
                blockingQueue.add(body);
            }
        });
    }
}
// 服务端
package com.shawntime.test.rabbitmq.rpc.rabbit;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.shawntime.test.rabbitmq.rpc.JsonHelper;
import com.shawntime.test.rabbitmq.rpc.RpcInvokeModel;
import com.shawntime.test.rabbitmq.rpc.operator.bean.User;
import org.apache.commons.lang3.SerializationUtils;

/**
 * RPC server: consumes serialized {@link RpcInvokeModel} requests from the request queue,
 * invokes the named method reflectively and publishes the JSON-encoded result to the
 * reply exchange, routed by the request's replyTo value.
 *
 * Created by shma on 2017/5/8.
 */
public class Service {

    private Channel produceChannel;

    private Channel consumeChannel;

    private ConnectModel connectModel;

    /**
     * Stores the connection settings; no connection is opened until {@link #start()}.
     *
     * @param connectModel broker address and credentials
     */
    public Service(ConnectModel connectModel) throws IOException, TimeoutException {
        this.connectModel = connectModel;
    }

    /**
     * Connects to the broker, declares and binds the request queue, and starts consuming
     * requests with automatic acknowledgement.
     *
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if the connection cannot be established in time
     */
    public void start() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(connectModel.getVirtualHost());
        factory.setPort(connectModel.getPort());
        factory.setUsername(connectModel.getUserName());
        factory.setPassword(connectModel.getPassword());
        factory.setHost(connectModel.getHost());
        Connection connection = factory.newConnection();
        produceChannel = connection.createChannel();
        produceChannel.exchangeDeclare(Constant.REPLY_EXCHANGE_NAME, "direct");
        produceChannel.basicQos(1);

        consumeChannel = connection.createChannel();
        consumeChannel.queueDeclare(Constant.REQUEST_QUEUE_NAME, true, false, false, null);
        consumeChannel.exchangeDeclare(Constant.REQUEST_EXCHANGE_NAME, "direct");
        consumeChannel.basicQos(1);
        consumeChannel.queueBind(Constant.REQUEST_QUEUE_NAME, Constant.REQUEST_EXCHANGE_NAME,
                Constant.REQUEST_ROUTING_NAME);
        consumeChannel.basicConsume(Constant.REQUEST_QUEUE_NAME, true, new DefaultConsumer(consumeChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                Object result;
                try {
                    // NOTE(security): this natively deserializes the message body and then
                    // loads/instantiates whatever class it names. Only safe when every
                    // publisher on the request queue is trusted.
                    RpcInvokeModel model = SerializationUtils.deserialize(body);
                    result = invokeTarget(model);
                } catch (Exception e) {
                    // Message-handling boundary: a malformed or failing request must not
                    // escape the consumer callback.
                    // NOTE(review): no reply is published on failure, so the caller keeps
                    // waiting; an error reply would need a protocol change.
                    e.printStackTrace();
                    return;
                }
                byte[] resultData = JsonHelper.serialize(result).getBytes("UTF-8");
                String queueName = properties.getReplyTo();
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                        .correlationId(properties.getCorrelationId()).build();
                produceChannel.basicPublish(Constant.REPLY_EXCHANGE_NAME, queueName, replyProps, resultData);
            }
        });
    }

    /**
     * Resolves the class and method named by the request and invokes the method on a new
     * instance. Parameter types are taken from the runtime classes of the arguments.
     */
    private static Object invokeTarget(RpcInvokeModel model) throws ClassNotFoundException,
            NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
        Class<?> cls = Class.forName(model.getClassName());
        Object[] arguments = model.getArguments();
        if (arguments == null) {
            // Treat a missing argument array as a no-argument call.
            arguments = new Object[0];
        }
        Class<?>[] parameterTypes = new Class<?>[arguments.length];
        for (int index = 0; index < parameterTypes.length; ++index) {
            parameterTypes[index] = arguments[index].getClass();
        }
        Method method = cls.getDeclaredMethod(model.getMethodName(), parameterTypes);
        return method.invoke(cls.newInstance(), arguments);
    }
}
package com.shawntime.test.rabbitmq.rpc.rabbit;

/**
 * Mutable holder for the settings needed to open a broker connection:
 * host, port, virtual host, user name and password.
 *
 * Created by shma on 2017/5/8.
 */
public class ConnectModel {

    /** Broker host name or address. */
    private String host;

    public String getHost() {
        return this.host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    /** Broker port. */
    private int port;

    public int getPort() {
        return this.port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /** Virtual host to connect to. */
    private String virtualHost;

    public String getVirtualHost() {
        return this.virtualHost;
    }

    public void setVirtualHost(String virtualHost) {
        this.virtualHost = virtualHost;
    }

    /** User name used to authenticate. */
    private String userName;

    public String getUserName() {
        return this.userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** Password used to authenticate. */
    private String password;

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}
package com.shawntime.test.rabbitmq.rpc;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * Contract for issuing a remote call described by an {@link RpcInvokeModel}.
 *
 * @param <T> type of the value returned by {@link #call(RpcInvokeModel)}
 *
 * Created by shma on 2017/5/8.
 */
public interface IRpcService<T> {

    /**
     * Performs the call described by {@code model} and returns its result.
     *
     * @param model description of the invocation to perform
     * @return the result of the call
     * @throws IOException          declared for I/O failures while performing the call
     * @throws InterruptedException declared for interruption while performing the call
     * @throws ExecutionException   declared for failures during execution of the call
     */
    T call(RpcInvokeModel model) throws IOException, InterruptedException, ExecutionException;
}

package com.shawntime.test.rabbitmq.rpc;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.shawntime.test.rabbitmq.rpc.rabbit.ConnectModel;
import com.shawntime.test.rabbitmq.rpc.rabbit.Service;

/**
 * Entry point that starts the RPC {@link Service} against a broker on 127.0.0.1:5672.
 *
 * Created by shma on 2017/5/8.
 */
public class TestServerMain {

    public static void main(String[] args) throws IOException, TimeoutException {
        new Service(localBrokerSettings()).start();
    }

    /** Builds the connection settings used by this demo. */
    private static ConnectModel localBrokerSettings() {
        ConnectModel settings = new ConnectModel();
        settings.setHost("127.0.0.1");
        settings.setPort(5672);
        settings.setVirtualHost("Test");
        settings.setUserName("shawntime");
        settings.setPassword("shawntime");
        return settings;
    }
}

package com.shawntime.test.rabbitmq.rpc;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.shawntime.test.rabbitmq.rpc.operator.IBaseClientService;
import com.shawntime.test.rabbitmq.rpc.operator.bean.User;
import com.shawntime.test.rabbitmq.rpc.operator.client.BaseClientService;
import com.shawntime.test.rabbitmq.rpc.rabbit.Client;
import com.shawntime.test.rabbitmq.rpc.rabbit.ConnectModel;

/**
 * Entry point that connects a {@link Client} to a broker on 127.0.0.1:5672, fetches a
 * user through the client-side service and then saves a new one, printing the results.
 *
 * Created by shma on 2017/5/8.
 */
public class TestClientMain {

    public static void main(String[] args) throws IOException, TimeoutException {
        Client rpcClient = new Client(localBrokerSettings());
        IBaseClientService userService = new BaseClientService(rpcClient);

        // Look up an existing user and print its fields.
        User fetched = userService.getUserInfo(1);
        System.out.println(fetched.getUserId());
        System.out.println(fetched.getUserName());

        // Save a new user and print what the service returns.
        User toSave = new User();
        toSave.setUserName("AAA");
        toSave.setUserId(222);
        System.out.println(userService.save(toSave));
    }

    /** Builds the connection settings used by this demo. */
    private static ConnectModel localBrokerSettings() {
        ConnectModel settings = new ConnectModel();
        settings.setHost("127.0.0.1");
        settings.setPort(5672);
        settings.setVirtualHost("Test");
        settings.setUserName("shawntime");
        settings.setPassword("shawntime");
        return settings;
    }
}

相关文章

网友评论

    本文标题:RabbitMQ实现简易RPC调用

    本文链接:https://www.haomeiwen.com/subject/wyiekxtx.html