美文网首页
使用socket实现RPC远程调用

使用socket实现RPC远程调用

作者: 奋斗的韭菜汪 | 来源:发表于2020-06-22 15:34 被阅读0次

优化升级代码见:https://www.jianshu.com/p/ea443aade85d
服务与服务之间的通信,必须是可靠的,比如TCP
常见的rpc框架:1、Webservice; 2、Dubbo; 3、Thrift; 4、Grpc
Feign是一个伪代理

远程通信实战之实现一个简易版RPC.jpg
客户端:
public class App {
    public static void main(String[] args) {
        RpcProxyClient rpcProxyClient = new RpcProxyClient();
        IOrderService orderService = rpcProxyClient.clientProxy(IOrderService.class, "localhost", 8080);
        System.out.println(orderService.queryOrderList());
        System.out.println(orderService.queryOrderInfo("1111"));
    }
}
public class RpcProxyClient {
    public <T> T clientProxy(final Class<T> clazz, final String host, final int port){
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, new RemoteInvocationHandler(host, port));
    }
}
public class RemoteInvocationHandler implements InvocationHandler {
    private String host;
    private int port;
    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //先建立远程连接
        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        //传递数据(调用哪个接口,方法,参数),服务端接收到这些数据可以基于这些数据反射调用
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setArgs(args);
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setTypes(method.getParameterTypes());
        return rpcNetTransport.send(rpcRequest);
    }
}
public class RpcNetTransport {
    private String host;
    private int port;
    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public Socket createSocket() throws IOException {
        Socket socket = new Socket(host, port);
        return socket;
    }
    public Object send(RpcRequest request) throws IOException, ClassNotFoundException {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        Socket socket = createSocket();
        //IO操作
        socket.getOutputStream();
        outputStream = new ObjectOutputStream(socket.getOutputStream());
        outputStream.writeObject(request);
        //清空缓冲区
        outputStream.flush();
        //读取服务端返回数据
        inputStream = new ObjectInputStream(socket.getInputStream());
        return inputStream.readObject();
    }
}
image.png

服务端:

public class Bootstrap {
    public static void main(String[] args) {
        IOrderService orderService = new OrderServiceImpl();
        RpcProxyServer rpcProxyServer = new RpcProxyServer();
        //通过代理将服务发布到网络上
        rpcProxyServer.publisher(orderService, 8080);
    }
}
public class RpcProxyServer {
    public final ExecutorService executorService = Executors.newCachedThreadPool();
    /***
     * 发布服务
     */
    public void publisher(Object server, int port){
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            //不断循环去监听客户端请求
            for(;;){
                Socket socket = serverSocket.accept();
                //通过Processorhandler解决IO阻塞
                executorService.execute(new ProcessorHandler(socket, server));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
public class ProcessorHandler implements Runnable{
    private Socket socket;
    private Object server;
    public ProcessorHandler(Socket socket, Object server) {
        this.socket = socket;
        this.server = server;
    }
    public Socket getSocket() {
        return socket;
    }
    public void setSocket(Socket socket) {
        this.socket = socket;
    }
    public Object getServer() {
        return server;
    }
    public void setServer(Object server) {
        this.server = server;
    }
    public void run() {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        try {
            inputStream = new ObjectInputStream(socket.getInputStream());
            RpcRequest request = (RpcRequest)inputStream.readObject();
            //开始反射调用服务端方法
            Object rs = invoke(request);
            System.out.println("服务端处理的结果:" + rs);
            //结果写回去
            outputStream = new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(rs);
            outputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if(outputStream != null) {
                    outputStream.close();
                }
                if(inputStream != null) {
                    inputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        //通过反射进行服务的调用
        Class<?> clazz = Class.forName(request.getClassName());
        //找到目标方法
        Method method = clazz.getMethod(request.getMethodName(), request.getTypes());
        //调用
        return method.invoke(server, request.getArgs());
    }
}
public class RpcRequest implements Serializable {
    private String className;
    private String methodName;
    //参数
    private Object[] args;
    //参数类型
    private Class[] types;

    public String getClassName() {
        return className;
    }
    public void setClassName(String className) {
        this.className = className;
    }
    public String getMethodName() {
        return methodName;
    }
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
    public Object[] getArgs() {
        return args;
    }
    public void setArgs(Object[] args) {
        this.args = args;
    }
    public Class[] getTypes() {
        return types;
    }
    public void setTypes(Class[] types) {
        this.types = types;
    }
}
public interface IOrderService {
    String queryOrderInfo(String id);
    String queryOrderList();
}
public class OrderServiceImpl implements IOrderService {
    public String queryOrderInfo(String id) {
        return "this is queryOrderInfo";
    }
    public String queryOrderList() {
        return "this is queryOrderList";
    }
}
image.png

相关文章

网友评论

      本文标题:使用socket实现RPC远程调用

      本文链接:https://www.haomeiwen.com/subject/ehpvxktx.html