美文网首页
基于BIO的RPC实现(2.0)带服务注册中心

基于BIO的RPC实现(2.0)带服务注册中心

作者: 吗丁啉要餐前吃 | 来源:发表于2019-07-08 00:07 被阅读0次

    在1.0版本的基础上,这次将服务注册中心抽取出来作为一个单独的module.

    先看服务注册中心的实现

    1.创建用于保存服务信息的实体

    public class RegistServiceEntity implements Serializable {
        private String host;
        private int port;
    
        public String getHost() {
            return host;
        }
        public int getPort() {
            return port;
        }
        public RegistServiceEntity(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }
    

    2.创建服务注册中心框架类

    package com.zhang.frame;
    
    import com.zhang.entity.RegistServiceEntity;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Description:  服务注册中心
     */
    public class RpcRegistCenter {
    
        //创建线程池用于执行socket连接线程(不建议这样创建线程)
        private ExecutorService executorService= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        //端口
        private int port;
        //服务缓存
        private final static Map<String, Set<RegistServiceEntity>> serviceHolder=new HashMap<>();
    
        public RpcRegistCenter(int port) {
            this.port = port;
        }
    
        /**
         *  创建线程类供服务注册中心socket连接使用
         * */
        private static class SocketTask implements Runnable{
            private Socket socket;
    
            public SocketTask(Socket socket) {
                this.socket = socket;
            }
    
            @Override
            public void run() {
                try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                     ObjectOutputStream outputStream =new ObjectOutputStream(socket.getOutputStream())
                ) {
                    //是否是"服务注册"
                    Boolean isRegist=inputStream.readBoolean();
                    if(isRegist){
                        registService(inputStream);
                        outputStream.writeBoolean(true);
                        outputStream.flush();
                    }else{
                        getService(inputStream, outputStream);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            /**
             * 获取service
             * */
            private void getService(ObjectInputStream inputStream, ObjectOutputStream outputStream) throws IOException {
                String serviceName=inputStream.readUTF();
                Set<RegistServiceEntity> addressSet=serviceHolder.get(serviceName);
                outputStream.writeUTF(serviceName);
                outputStream.writeObject(addressSet);
                outputStream.flush();
                System.out.println("Service: "+serviceName +" has been call by client;");
            }
    
            /**
             * 注册服务
             * */
            private void registService(ObjectInputStream inputStream) throws IOException {
                //读取服务名
                String serviceName=inputStream.readUTF();
                //读取服务host
                String host=inputStream.readUTF();
                //读取服务端口
                int port=inputStream.readInt();
                //获取服务注册地址
                RegistServiceEntity address=new RegistServiceEntity(host,port);
                Set<RegistServiceEntity> serviceEntities=serviceHolder.get(serviceName);
                if(serviceEntities==null){
                    serviceEntities=new HashSet<>();
                }
                serviceEntities.add(address);
                serviceHolder.put(serviceName,serviceEntities);
                System.out.println("Service: "+host+":"+port+"/"+serviceName+"has been regist.");
            }
        }
    
      /**
         * 开启服务
         */
        public void startService() throws IOException {
            ServerSocket serverSocket = new ServerSocket(port);
            System.out.println("注册中心启动");
            try {
                while (true){
                     Socket socket=serverSocket.accept();
                    executorService.execute(new SocketTask(socket));
                }
            }finally {
                if(serverSocket!=null){
                    serverSocket.close();
                }
            }
        }
    
       /**
         * 启动main类
         */
        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        RpcRegistCenter rpcRegistCenter=new RpcRegistCenter(8888);
                        rpcRegistCenter.startService();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    
    }
    

    服务端的实现

    1.服务端框架类

    package com.zhang.frame;
    
    import java.io.*;
    import java.lang.reflect.Method;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    public class RpcServerFrame {
    
        private static ExecutorService executorService
                = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
    
        //RPCServer缓存service
        private static final Map<String, Class<?>> serviceHolder = new HashMap<>();
    
        //rpc服务的端口号
        private int port;
    
        public RpcServerFrame(int port) {
            this.port = port;
        }
    
        //把服务注册到服务中心
        public void registServerToCenter(Class<?> serviceInterface, Class<?> impl) throws IOException {
            Socket socket = new Socket();
            ObjectOutputStream outputStream = null;
            ObjectInputStream inputStream = null;
            try {
                //连接服务注册中心serverSocket
                socket.connect(new InetSocketAddress("127.0.0.1", 8888));
                outputStream = new ObjectOutputStream(socket.getOutputStream());
                inputStream = new ObjectInputStream(socket.getInputStream());
                //本地缓存服务名对应的实现类
                serviceHolder.put(serviceInterface.getName(), impl);
                outputStream.writeBoolean(true);
                outputStream.writeUTF(serviceInterface.getName());
                outputStream.writeUTF("127.0.0.1");
                outputStream.writeInt(port);
                outputStream.flush();
                if (inputStream.readBoolean()) {
                    System.out.println(serviceInterface.getName() + "regist success");
                } else {
                    System.out.println(serviceInterface.getName() + "regist failed");
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (socket != null) socket.close();
                if (outputStream != null) outputStream.close();
                if (inputStream != null) inputStream.close();
            }
        }
    
    
        //处理服务请求任务
        private static class ServerTask implements Runnable {
            private Socket client = null;
    
            public ServerTask(Socket client) {
                this.client = client;
            }
    
            @Override
            public void run() {
                try (ObjectInputStream inputStream =
                             new ObjectInputStream(client.getInputStream());
                     ObjectOutputStream outputStream =
                             new ObjectOutputStream(client.getOutputStream())) {
    
                    //方法所在类名接口名
                    String serviceName = inputStream.readUTF();
                    //方法的名字
                    String methodName = inputStream.readUTF();
                    //方法的入参类型
                    Class<?>[] paramTypes = (Class<?>[]) inputStream.readObject();
                    //方法入参的值
                    Object[] args = (Object[]) inputStream.readObject();
    
                    Class serviceClass = serviceHolder.get(serviceName);
                    if (serviceClass == null) {
                        throw new ClassNotFoundException(serviceName + " Not Found");
                    }
    
                    Method method = serviceClass.getMethod(methodName, paramTypes);
                    Object result = method.invoke(serviceClass.newInstance(), args);
    
                    outputStream.writeObject(result);
                    outputStream.flush();
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        //启动RPC服务
        public void startService() throws IOException {
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("RPC server on:" + port + ":运行");
            try {
                while (true) {
                    executorService.execute(new ServerTask(serverSocket.accept()));
                }
            } finally {
                serverSocket.close();
            }
        }
    
    }
    
    

    2.创建具体service类和序列化实体类

    与1.0创建的一样,不罗列代码
    

    3.rpc调用

    public class SendSmsRpc {
        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        RpcServerFrame serverFrame = new RpcServerFrame(9001);
                        serverFrame.registServerToCenter(SendSms.class, SendSmsImpl.class);
                        serverFrame.startService();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    

    客户端代码实现

    1.客户端rpc框架代码

    package com.zhang.frame;
    
    import com.zhang.entity.RegistServiceEntity;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.util.Random;
    import java.util.Set;
    
    public class RpcClientFrame {
    
        //获取远程代理对象
        public static <T> T getRemoteProxyObj(final Class<?> serviceInterface){
            InetSocketAddress addr=new InetSocketAddress("127.0.0.1",8888);
           return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                   new Class<?>[]{serviceInterface},
                    new DynProxy(serviceInterface,addr));
        }
    
    
        //动态代理类
        public static class DynProxy implements InvocationHandler {
    
            private final Class<?> serviceInterface;
            private final InetSocketAddress socketAddress;
            //远程服务在本地的缓存
            private RegistServiceEntity [] serviceArray;
    
            public DynProxy(Class<?> serviceInterface, InetSocketAddress socketAddress) {
                this.serviceInterface = serviceInterface;
                this.socketAddress = socketAddress;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                //先从服务注册中心取服务
                    Socket socket=null;
                ObjectOutputStream outputStream=null;
                ObjectInputStream inputStream=null;
                RegistServiceEntity serviceEntity;
                try {
                    socket=new Socket();
                    socket.connect(socketAddress);
                    outputStream=new ObjectOutputStream(socket.getOutputStream());
                    //false代表从服务注册中心获取服务
                    outputStream.writeBoolean(false);
                    outputStream.writeUTF(serviceInterface.getName());
                    outputStream.flush();
                    inputStream=new ObjectInputStream(socket.getInputStream());
                    System.out.println("Get services from registered center success:"+inputStream.readUTF());
                    Set<RegistServiceEntity> result = (Set<RegistServiceEntity>) inputStream.readObject();
                    serviceArray=new RegistServiceEntity[result.size()];
                    result.toArray(serviceArray);
                } catch (IOException e) {
                    e.printStackTrace();
                }finally {
                    if(socket!=null){
                        socket.close();
                    }
                    if(outputStream!=null){
                        outputStream.close();
                    }
                    if(inputStream!=null){
                        inputStream.close();
                    }
                }
    
                //从缓存列表中随机取一个服务器远程端口
                Random random=new Random();
                int index=random.nextInt(serviceArray.length);
                InetSocketAddress socketAddr=new InetSocketAddress(serviceArray[index].getHost(),serviceArray[index].getPort());
    
                //调用rpc服务接口
                try  {
                    socket=new Socket();
                    socket.connect(socketAddr);
                    outputStream=new ObjectOutputStream(socket.getOutputStream());
                    //方法所在的类
                    outputStream.writeUTF(serviceInterface.getName());
                    //方法名
                    outputStream.writeUTF(method.getName());
                    //方法参数类型
                    outputStream.writeObject(method.getParameterTypes());
                    //方法参数
                    outputStream.writeObject(args);
                    outputStream.flush();
    
                    inputStream=new ObjectInputStream(socket.getInputStream());
                    return inputStream.readObject();
                }finally {
                    if(socket!=null) socket.close();
                    if(outputStream!=null) outputStream.close();
                    if(inputStream!=null) inputStream.close();
                }
            }
        }
    }
    

    2.创建接口对应服务端,创建序列化实体类

    复制服务端service接口,entity;复制注册中心entity
    

    3.客户端调用

    public class Client {
    
        public static void main(String[] args) {
            UserInfo userInfo = new UserInfo("张三","1359999999");
            SendSms sendSms = RpcClientFrame.getRemoteProxyObj(SendSms.class);
            System.out.println("Send mail: "+ sendSms.sendMail(userInfo));
        }
    }
    

    相关文章

      网友评论

          本文标题:基于BIO的RPC实现(2.0)带服务注册中心

          本文链接:https://www.haomeiwen.com/subject/nkswhctx.html