RPC 主要实现远程过程调用
一、实现思路
1. 通讯方式:先简单使用 Socket
2. server端,
RPCServer
- start() 启动
- register(interface,impl) 注册 接口 和 对应的实现
3. RPCClient
- getRemoteProxy(interface, addr) 获取用于访问远程接口方法的代理对象
4. ServiceProducer
- sendData(String data) 远程的某一个调用接口
二、简易实现
RPCServer 接口
/**
 * Server side of the minimal RPC example: service interfaces are registered
 * together with their implementation classes, then the server is started.
 */
public interface RPCServer {

    /**
     * Starts the server.
     *
     * @throws IOException if an I/O error occurs while starting or running the server
     */
    void start() throws IOException;

    /**
     * Registers a service interface and the class that implements it.
     *
     * <p>The parameters are declared as {@code Class<?>} rather than raw {@code Class};
     * the erased signature is unchanged, so implementations that still override this
     * method with raw {@code Class} parameters keep compiling.
     *
     * @param serverInterface the service interface being exposed
     * @param impl            the implementation class registered for that interface
     */
    void register(Class<?> serverInterface, Class<?> impl);
}
RPCServerImpl 实现
/**
 * Socket-based {@link RPCServer}: accepts connections on a fixed port and hands
 * each accepted socket to a thread pool, where the requested method is looked up
 * on the registered implementation class and invoked reflectively.
 */
public class RPCServerImpl implements RPCServer {

    /** Registered services, keyed by the service interface's fully qualified name. */
    static final HashMap<String, Class<?>> serviceImplMap = new HashMap<>();

    /** Shared worker pool sized to the number of available processors. */
    private static final ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private final int port;

    /**
     * @param port the TCP port the server binds to when {@link #start()} is called
     */
    public RPCServerImpl(int port) {
        this.port = port;
    }

    /**
     * Binds to the configured port and then accepts connections in an endless loop,
     * dispatching each one to the worker pool. This method only returns by throwing.
     *
     * @throws IOException if binding or accepting a connection fails
     */
    @Override
    public void start() throws IOException {
        // try-with-resources also closes the server socket when bind() itself fails,
        // which a try/finally that starts after bind() would not do.
        try (ServerSocket server = new ServerSocket()) {
            server.bind(new InetSocketAddress(port));
            System.out.println("Server start...");
            while (true) {
                // Requests are handled on the pool so accept() is not blocked by a slow call.
                executor.execute(new ServiceTask(server.accept()));
            }
        }
    }

    /**
     * Records which implementation class serves the given interface.
     *
     * @param serverInterface the service interface; its name is used as the lookup key
     * @param impl            the implementation class to instantiate for each request
     */
    @Override
    public void register(Class serverInterface, Class impl) {
        serviceImplMap.put(serverInterface.getName(), impl);
    }

    /**
     * Handles one client connection: reads a single request, invokes the target
     * method and writes the result back.
     */
    private static class ServiceTask implements Runnable {

        private final Socket socket;

        public ServiceTask(Socket accept) {
            this.socket = accept;
        }

        @Override
        public void run() {
            // NOTE(review): this deserializes objects straight from the network with
            // ObjectInputStream. That is unsafe for untrusted peers; consider an
            // ObjectInputFilter or a non-native wire format before exposing this port.
            try (Socket client = socket;
                 ObjectInputStream input = new ObjectInputStream(client.getInputStream())) {
                // Request layout: service name, method name, parameter types, argument values.
                String serviceInterfaceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();

                // Resolve the implementation registered under the interface name.
                Class<?> serviceImpl = serviceImplMap.get(serviceInterfaceName);
                if (serviceImpl == null) {
                    throw new ClassNotFoundException(serviceInterfaceName + " not found!");
                }

                Method method = serviceImpl.getMethod(methodName, parameterTypes);
                // Class.newInstance() is deprecated; go through the no-arg constructor instead.
                Object invokeResult =
                        method.invoke(serviceImpl.getDeclaredConstructor().newInstance(), arguments);

                // The output stream is only opened once there is a result to send.
                try (ObjectOutputStream output = new ObjectOutputStream(client.getOutputStream())) {
                    output.writeObject(invokeResult);
                }
            } catch (Exception e) {
                // Boundary of the worker task: report the failure and let the socket close.
                e.printStackTrace();
            }
        }
    }
}
RPCClient 接口
/**
 * Client side of the minimal RPC example.
 *
 * @param <T> the type returned to callers as the remote proxy
 */
public interface RPCClient<T> {

    /**
     * Obtains a proxy object for calling the methods of a remote service interface.
     *
     * @param serviceInterface the service interface to be called remotely
     * @param addr             the address of the server
     * @return the proxy object
     */
    T getRemoteProxy(Class<?> serviceInterface, InetSocketAddress addr);
}
RPCClientImpl 实现
/**
 * {@link RPCClient} built on a JDK dynamic proxy: every call on the returned proxy
 * opens a new socket to the server, sends the call description and returns whatever
 * object the server writes back.
 *
 * @param <T> the service interface type handed back to the caller
 */
public class RPCClientImpl<T> implements RPCClient<T> {

    /**
     * Creates a proxy implementing {@code serviceInterface} whose calls are sent to
     * the server at {@code addr}.
     *
     * @param serviceInterface the service interface the proxy implements
     * @param addr             the server address each call connects to
     * @return a proxy implementing {@code serviceInterface}
     */
    @Override
    public T getRemoteProxy(Class<?> serviceInterface, InetSocketAddress addr) {
        InvocationHandler handler = (proxy, method, args) -> {
            // try-with-resources closes input, output and socket in reverse order and
            // keeps the original exception primary; the earlier finally block could skip
            // closes and mask the real failure if one close() threw.
            try (Socket socket = new Socket()) {
                socket.connect(addr);
                try (ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) {
                    // Request layout: service name, method name, parameter types, argument values.
                    output.writeUTF(serviceInterface.getName());
                    output.writeUTF(method.getName());
                    output.writeObject(method.getParameterTypes());
                    output.writeObject(args);
                    output.flush();

                    try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) {
                        // The object read here becomes the return value of the proxied call.
                        return input.readObject();
                    }
                }
            }
        };

        // The proxy implements serviceInterface, which the caller binds to T.
        @SuppressWarnings("unchecked")
        T remoteProxy = (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, handler);
        return remoteProxy;
    }
}
上面用到的某个远程调用实例
/**
 * Example service interface used to demonstrate a remote call.
 */
public interface ServiceProducer {

    /**
     * Sends a piece of data to the service.
     *
     * @param data the data to send
     * @return the service's reply
     */
    String sendData(String data);
}
/**
 * Example implementation of {@link ServiceProducer} that echoes the received data
 * behind a fixed prefix.
 */
public class ServiceProducerImpl implements ServiceProducer {

    /** Fixed text placed in front of the received data. */
    private static final String REPLY_PREFIX = "I am service producer!!!, the data is ";

    /**
     * @param data the data received from the caller
     * @return the fixed prefix followed by {@code data}
     */
    @Override
    public String sendData(String data) {
        return REPLY_PREFIX + data;
    }
}
测试用例
ublic class RPCTest {
public static void main(String[] args) {
int port =9999;
//需要开启线程,不然会阻塞,由于有while(true)
new Thread(){
@Override
public void run() {
//创建服务
RPCServer server = new RPCServerImpl(port);
//注册服务
server.register(ServiceProducer.class,ServiceProducerImpl.class);
//启动服务
try {
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
RPCClient<ServiceProducer> client = new RPCClientImpl<>();
ServiceProducer serviceProducer = client.getRemoteProxy(ServiceProducer.class, new InetSocketAddress("localhost",port));
System.out.println(serviceProducer.sendData("rpc test ..."));
}
}
网友评论