完整代码链接:https://github.com/shawntime/shawn-test-rabbitmq/tree/master/src/main/java/com/shawntime/test/rabbitmq/rpc
RPC的处理流程
- 当客户端启动时,创建一个匿名的回调队列。
- 客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。
- 请求被发送到请求队列中。
- RPC服务器端监听请求队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
- 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,则返回。
832799-20161224004437839-1074972304.png
代码实现
// 客户端
package com.shawntime.test.rabbitmq.rpc.rabbit;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.Maps;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.shawntime.test.rabbitmq.rpc.IRpcService;
import com.shawntime.test.rabbitmq.rpc.RpcInvokeModel;
import org.apache.commons.lang3.SerializationUtils;
/**
* Created by shma on 2017/5/8.
*/
public class Client implements IRpcService {
private Channel produceChannel;
private Channel consumeChannel;
private String callBackQueueName;
private final Map<String, BlockingQueue<byte[]>> completionQueueMap;
public Client(ConnectModel connectModel) throws IOException, TimeoutException {
connect(connectModel);
this.completionQueueMap = Maps.newConcurrentMap();
}
public byte[] call(RpcInvokeModel model) throws IOException, InterruptedException, ExecutionException {
model.setDid(UUID.randomUUID().toString());
model.setCallBackQueueName(callBackQueueName);
byte[] body = SerializationUtils.serialize(model);
BlockingQueue<byte[]> blockingQueue = new LinkedBlockingQueue<byte[]>(1);
completionQueueMap.put(model.getDid(), blockingQueue);
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
.builder()
.correlationId(model.getDid())
.replyTo(callBackQueueName)
.build();
produceChannel.basicPublish(Constant.REQUEST_EXCHANGE_NAME, Constant.REQUEST_ROUTING_NAME, basicProperties, body);
return blockingQueue.take();
}
private void connect(ConnectModel connectModel) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost(connectModel.getVirtualHost());
factory.setPort(connectModel.getPort());
factory.setUsername(connectModel.getUserName());
factory.setPassword(connectModel.getPassword());
factory.setHost(connectModel.getHost());
Connection connection = factory.newConnection();
produceChannel = connection.createChannel();
consumeChannel = connection.createChannel();
produceChannel.queueDeclare(Constant.REQUEST_QUEUE_NAME, true, false, false, null);
produceChannel.exchangeDeclare(Constant.REQUEST_EXCHANGE_NAME, "direct");
produceChannel.basicQos(1);
callBackQueueName = produceChannel.queueDeclare().getQueue();
consumeChannel.exchangeDeclare(Constant.REPLY_EXCHANGE_NAME, "direct");
consumeChannel.queueBind(callBackQueueName, Constant.REPLY_EXCHANGE_NAME, callBackQueueName);
consumeChannel.basicQos(1);
consumeChannel.basicConsume(callBackQueueName, true, new DefaultConsumer(consumeChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, final byte[] body) throws IOException {
BlockingQueue<byte[]> blockingQueue = completionQueueMap.get(properties.getCorrelationId());
blockingQueue.add(body);
}
});
}
}
// 服务端
package com.shawntime.test.rabbitmq.rpc.rabbit;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.shawntime.test.rabbitmq.rpc.JsonHelper;
import com.shawntime.test.rabbitmq.rpc.RpcInvokeModel;
import com.shawntime.test.rabbitmq.rpc.operator.bean.User;
import org.apache.commons.lang3.SerializationUtils;
/**
* Created by shma on 2017/5/8.
*/
public class Service {
private Channel produceChannel;
private Channel consumeChannel;
private ConnectModel connectModel;
public Service(ConnectModel connectModel) throws IOException, TimeoutException {
this.connectModel = connectModel;
}
public void start() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost(connectModel.getVirtualHost());
factory.setPort(connectModel.getPort());
factory.setUsername(connectModel.getUserName());
factory.setPassword(connectModel.getPassword());
factory.setHost(connectModel.getHost());
Connection connection = factory.newConnection();
produceChannel = connection.createChannel();
produceChannel.exchangeDeclare(Constant.REPLY_EXCHANGE_NAME, "direct");
produceChannel.basicQos(1);
consumeChannel = connection.createChannel();
consumeChannel.queueDeclare(Constant.REQUEST_QUEUE_NAME, true, false, false, null);
consumeChannel.exchangeDeclare(Constant.REQUEST_EXCHANGE_NAME, "direct");
consumeChannel.basicQos(1);
consumeChannel.queueBind(Constant.REQUEST_QUEUE_NAME, Constant.REQUEST_EXCHANGE_NAME, Constant
.REQUEST_ROUTING_NAME);
consumeChannel.basicConsume(Constant.REQUEST_QUEUE_NAME, true, new DefaultConsumer(consumeChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[]
body) throws IOException {
RpcInvokeModel model = SerializationUtils.deserialize(body);
Class cls;
try {
cls = Class.forName(model.getClassName());
Object[] arguments = model.getArguments();
Class[] clazz = new Class[arguments.length];
for (int index = 0 ; index < clazz.length; ++index) {
clazz[index] = arguments[index].getClass();
}
Method method = cls.getDeclaredMethod(model.getMethodName(), clazz);
Object object = method.invoke(cls.newInstance(), arguments);
byte[] resultData = JsonHelper.serialize(object).getBytes("UTF-8");
String queueName = properties.getReplyTo();
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId()).build();
produceChannel.basicPublish(Constant.REPLY_EXCHANGE_NAME, queueName, replyProps, resultData);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
});
}
}
package com.shawntime.test.rabbitmq.rpc.rabbit;
/**
* Created by shma on 2017/5/8.
*/
public class ConnectModel {
private String virtualHost;
private String host;
private String userName;
private String password;
private int port;
public String getVirtualHost() {
return virtualHost;
}
public void setVirtualHost(String virtualHost) {
this.virtualHost = virtualHost;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
package com.shawntime.test.rabbitmq.rpc;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
* Created by shma on 2017/5/8.
*/
public interface IRpcService<T> {
T call(RpcInvokeModel model) throws IOException, InterruptedException, ExecutionException;
}
package com.shawntime.test.rabbitmq.rpc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.shawntime.test.rabbitmq.rpc.rabbit.ConnectModel;
import com.shawntime.test.rabbitmq.rpc.rabbit.Service;
/**
* Created by shma on 2017/5/8.
*/
public class TestServerMain {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectModel model = new ConnectModel();
model.setHost("127.0.0.1");
model.setPassword("shawntime");
model.setUserName("shawntime");
model.setVirtualHost("Test");
model.setPort(5672);
Service service = new Service(model);
service.start();
}
}
package com.shawntime.test.rabbitmq.rpc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.shawntime.test.rabbitmq.rpc.operator.IBaseClientService;
import com.shawntime.test.rabbitmq.rpc.operator.bean.User;
import com.shawntime.test.rabbitmq.rpc.operator.client.BaseClientService;
import com.shawntime.test.rabbitmq.rpc.rabbit.Client;
import com.shawntime.test.rabbitmq.rpc.rabbit.ConnectModel;
/**
* Created by shma on 2017/5/8.
*/
public class TestClientMain {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectModel model = new ConnectModel();
model.setHost("127.0.0.1");
model.setPassword("shawntime");
model.setUserName("shawntime");
model.setVirtualHost("Test");
model.setPort(5672);
Client client = new Client(model);
IBaseClientService baseClientService = new BaseClientService(client);
User userInfo = baseClientService.getUserInfo(1);
System.out.println(userInfo.getUserId());
System.out.println(userInfo.getUserName());
User user = new User();
user.setUserName("AAA");
user.setUserId(222);
System.out.println(baseClientService.save(user));
}
}
网友评论