美文网首页工作生活
基于BIO的RPC实现(1.0)

基于BIO的RPC实现(1.0)

作者: 吗丁啉要餐前吃 | 来源:发表于2019-07-04 00:34 被阅读0次

    先实现服务端代码

    1.rpc服务端框架实现

    package com.zhang.frame;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.Method;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Minimal blocking-I/O RPC server. Each accepted connection carries exactly one call:
     * the request is read, the registered implementation is invoked reflectively, and the
     * result is written back.
     *
     * <p>Wire format per connection, in order: {@code writeUTF(serviceName)},
     * {@code writeUTF(methodName)}, {@code writeObject(Class<?>[] parameterTypes)},
     * {@code writeObject(Object[] args)}. The reply is a single {@code writeObject(result)}.
     * If the call fails, the stack trace is printed on the server and the connection is
     * closed without a reply.
     *
     * <p>NOTE(review): requests are decoded with Java native deserialization and dispatched
     * by reflection, so this server must only be reachable by trusted clients. Consider an
     * {@code ObjectInputFilter} or a different wire format before exposing it more widely.
     */
    public class RpcServerFrame {
        // Names workers thread1, thread2, ... so they can be told apart in thread dumps.
        final ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                // getAndIncrement (not get) so that every thread receives a distinct suffix.
                return new Thread(r, "thread" + threadNumber.getAndIncrement());
            }
        };

        // Bounded pool that handles client sockets: 10 core / 15 max threads, 1024 queued
        // tasks; further submissions are rejected with RejectedExecutionException.
        private final ExecutorService executorService = new ThreadPoolExecutor(10, 15, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy()
        );

        // Service registry: service (interface) name -> implementation class.
        // Shared by all server instances and read from worker threads, hence concurrent.
        private static final Map<String, Class<?>> serverHolder = new ConcurrentHashMap<>();

        private final int port;

        /**
         * @param port TCP port the server binds to when {@link #startService()} is called
         */
        public RpcServerFrame(int port) {
            this.port = port;
        }

        /**
         * Registers an implementation class under a service name.
         *
         * @param className name clients send to select the service (the demo uses the
         *                  interface's fully qualified name)
         * @param impl      implementation class; a new instance is created through its
         *                  no-argument constructor for every call
         * @throws NullPointerException if either argument is {@code null}
         */
        public void registServer(String className, Class<?> impl) {
            if (className == null || impl == null) {
                throw new NullPointerException("className and impl must not be null");
            }
            serverHolder.put(className, impl);
        }

        /** Handles a single client connection: one request, one reply. */
        private static class ServerTask implements Runnable {
            private final Socket socket;

            public ServerTask(Socket socket) {
                this.socket = socket;
            }

            @Override
            public void run() {
                // The socket is listed first so it is closed even if creating a stream fails.
                try (Socket client = socket;
                     ObjectOutputStream outputStream = new ObjectOutputStream(client.getOutputStream());
                     ObjectInputStream inputStream = new ObjectInputStream(client.getInputStream())
                ) {
                    // Name of the interface that declares the method.
                    String serviceName = inputStream.readUTF();
                    // Method name.
                    String methodName = inputStream.readUTF();
                    // Parameter types, used to resolve the exact overload.
                    Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject();
                    // Argument values. The client writes an Object[] (or null for a no-arg
                    // method), so casting to String[] would fail for any call with arguments.
                    Object[] args = (Object[]) inputStream.readObject();
                    // Look up the implementation class in the registry.
                    Class<?> serviceClass = serverHolder.get(serviceName);
                    if (serviceClass == null) {
                        throw new ClassNotFoundException(serviceName + " not found");
                    }
                    Method method = serviceClass.getMethod(methodName, parameterTypes);
                    // Invoke on a fresh instance; Class.newInstance() is deprecated.
                    Object result = method.invoke(serviceClass.getDeclaredConstructor().newInstance(), args);

                    outputStream.writeObject(result);
                    outputStream.flush();
                } catch (Exception e) {
                    // Boundary of the worker thread: report and drop this connection only.
                    e.printStackTrace();
                }
            }
        }

        /**
         * Binds to the configured port and serves requests until accepting fails.
         * This method blocks the calling thread.
         *
         * @throws IOException if the server socket cannot be bound or accepting fails
         */
        public void startService() throws IOException {
            // try-with-resources also releases the socket when bind() itself fails.
            try (ServerSocket serverSocket = new ServerSocket()) {
                serverSocket.bind(new InetSocketAddress(port));
                System.out.println("RPC server on:" + port + " run");
                while (true) {
                    Socket client = serverSocket.accept();
                    try {
                        executorService.execute(new ServerTask(client));
                    } catch (RejectedExecutionException e) {
                        // Pool and queue are saturated: drop this connection instead of
                        // leaking it and taking the whole server down.
                        e.printStackTrace();
                        try {
                            client.close();
                        } catch (IOException ignored) {
                            // Best effort: the connection is being discarded anyway.
                        }
                    }
                }
            }
        }
    }
    

    2.创建服务端service接口与实体,供客户端调用

    /**
     * Service contract published by the RPC server; the server registers its
     * implementation under this interface's fully qualified name.
     */
    public interface UserService {

        /**
         * Looks up a user.
         *
         * @return the user supplied by the implementation
         */
        User getUser();
    }
    
    /**
     * Server-side {@code UserService} implementation that always answers with the
     * same hard-coded demo user.
     */
    public class UserServiceImpl implements UserService {

        /** Builds and returns a new demo user on every call. */
        @Override
        public User getUser() {
            return new User("张三", 18);
        }
    }
    
    /**
     * Immutable user value object; {@code Serializable} so that it can be written with an
     * {@code ObjectOutputStream}.
     *
     * <p>NOTE(review): no explicit {@code serialVersionUID} is declared, so every copy of
     * this class that exchanges serialized instances must keep the same fields and
     * member signatures.
     */
    public class User implements Serializable {

        private final String username;
        private final Integer age;

        /**
         * @param username display name of the user
         * @param age      age of the user
         */
        public User(String username, Integer age) {
            this.age = age;
            this.username = username;
        }

        /** @return the age passed to the constructor */
        public Integer getAge() {
            return this.age;
        }

        /** @return the user name passed to the constructor */
        public String getUsername() {
            return this.username;
        }
    }
    

    3.服务端启动

    /**
     * Entry point of the demo server: publishes {@code UserServiceImpl} under the
     * {@code UserService} interface name and serves it on port 8001.
     */
    public class UserRpcServer {

        /** Starts the RPC server on a dedicated thread. */
        public static void main(String[] args) {
            Thread serverThread = new Thread(UserRpcServer::serve);
            serverThread.start();
        }

        // Registers the service and runs the server loop; I/O failures are only printed.
        private static void serve() {
            try {
                RpcServerFrame userServer = new RpcServerFrame(8001);
                String serviceName = UserService.class.getName();
                userServer.registServer(serviceName, UserServiceImpl.class);
                userServer.startService();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    客户端代码实现

    1.客户端rpc框架实现

    package com.zhang.frame;
    
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    
    /**
     * Client side of the blocking-I/O RPC demo: hands out JDK dynamic proxies whose method
     * calls are shipped to a remote server over a fresh socket per call.
     *
     * <p>Wire format per call, in order: {@code writeUTF(interfaceName)},
     * {@code writeUTF(methodName)}, {@code writeObject(Class<?>[] parameterTypes)},
     * {@code writeObject(Object[] args)}; the reply is read with a single {@code readObject()}.
     *
     * <p>NOTE(review): the reply is decoded with Java native deserialization, so only
     * connect to trusted servers.
     */
    public class RpcClientFrame {

        /**
         * Creates a proxy that implements {@code serviceInterface} and forwards every
         * interface method call to the server at {@code host:port}.
         *
         * @param serviceInterface interface the proxy implements; its name is sent to the
         *                         server to select the service
         * @param host             server host name or address
         * @param port             server TCP port
         * @param <T>              type the caller assigns the proxy to; it must be
         *                         {@code serviceInterface} (or a supertype), otherwise the
         *                         caller gets a {@code ClassCastException}
         * @return the remote proxy
         */
        @SuppressWarnings("unchecked") // The proxy implements serviceInterface; T is chosen by the caller.
        public static <T> T getRemoteProxyObj(Class<?> serviceInterface, String host, int port) {
            return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
                    new DynProxy(serviceInterface, new InetSocketAddress(host, port)));
        }

        /** Invocation handler that turns each interface call into one request/response exchange. */
        private static class DynProxy implements InvocationHandler {

            private final Class<?> serviceInterface;
            private final InetSocketAddress socketAddress;

            public DynProxy(Class<?> serviceInterface, InetSocketAddress socketAddress) {
                this.serviceInterface = serviceInterface;
                this.socketAddress = socketAddress;
            }

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // equals/hashCode/toString are dispatched here with Object as declaring class.
                // Answer them locally: sending them over the wire would try to serialize the
                // proxy itself (equals) or return values tied to a throwaway server instance.
                if (method.getDeclaringClass() == Object.class) {
                    return invokeObjectMethod(proxy, method, args);
                }
                // Closing the socket also closes both of its streams, on success and on
                // failure alike, without masking the original exception.
                try (Socket socket = new Socket()) {
                    socket.connect(socketAddress);
                    ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
                    // Interface name.
                    outputStream.writeUTF(serviceInterface.getName());
                    // Method name.
                    outputStream.writeUTF(method.getName());
                    // Parameter types.
                    outputStream.writeObject(method.getParameterTypes());
                    // Argument values (null for a no-arg method).
                    outputStream.writeObject(args);
                    outputStream.flush();
                    // Opened only after the request is flushed: the constructor blocks until
                    // the server's stream header arrives.
                    ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                    return inputStream.readObject();
                }
            }

            // Local implementation of the java.lang.Object methods routed through the handler.
            private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    case "toString":
                        return "RpcProxy[" + serviceInterface.getName() + "@" + socketAddress + "]";
                    default:
                        throw new UnsupportedOperationException(method.toString());
                }
            }
        }
    }
    

    2.客户端加入需要调用的服务端接口(用于生成动态代理)和实体(用于反序列化返回结果)

    /**
     * Client-side copy of the service contract; its fully qualified name is what the
     * RPC client sends to the server to select the service.
     */
    public interface UserService {

        /**
         * Looks up a user.
         *
         * @return the user supplied by the implementation
         */
        User getUser();
    }
    
    /**
     * Immutable user value object; {@code Serializable} so that it can be read back from
     * an {@code ObjectInputStream}.
     *
     * <p>NOTE(review): no explicit {@code serialVersionUID} is declared, so every copy of
     * this class that exchanges serialized instances must keep the same fields and
     * member signatures.
     */
    public class User implements Serializable {

        private final String username;
        private final Integer age;

        /**
         * @param username display name of the user
         * @param age      age of the user
         */
        public User(String username, Integer age) {
            this.age = age;
            this.username = username;
        }

        /** @return the age passed to the constructor */
        public Integer getAge() {
            return this.age;
        }

        /** @return the user name passed to the constructor */
        public String getUsername() {
            return this.username;
        }
    }
    

    3.客户端调用代码

    package com.zhang;
    
    import com.zhang.frame.RpcClientFrame;
    import com.zhang.service.UserService;
    
    /**
     * Demo client: obtains a remote {@code UserService} proxy for 127.0.0.1:8001 and
     * prints the user's name and age.
     */
    public class UserRpcClient {
        public static void main(String[] args) {
            UserService userService = RpcClientFrame.getRemoteProxyObj(UserService.class, "127.0.0.1", 8001);
            // NOTE(review): getUser() is invoked twice on the proxy, i.e. two separate
            // calls through the proxy; fetch the user once if that matters.
            String username = userService.getUser().getUsername();
            Integer age = userService.getUser().getAge();
            System.out.println(username + "------" + age);
        }
    }
    

    大功告成,下一步要将注册中心提出来!

    相关文章

      网友评论

        本文标题:基于BIO的RPC实现(1.0)

        本文链接:https://www.haomeiwen.com/subject/dmtkhctx.html