美文网首页Dubbo 源码学习手写RPC框架Java
手写类似dubbo的rpc框架第三章《rpc框架》

手写类似dubbo的rpc框架第三章《rpc框架》

作者: bugstack虫洞栈 | 来源:发表于2019-05-07 20:47 被阅读18次

    案例介绍
    结合上面两章节,本章将实现rpc的基础功能;提供一个rpc中间件jar给生产端和消费端使用。
    技术点;
    1、注册中心,生产者在启动的时候需要将本地接口发布到注册中心,我们这里采用redis作为注册中心,随机取数模拟权重。
    2、生产端在启动的时候,连接到注册中心,也就是我们的redis。连接成功后将配置的生产者接口信息发布到注册中心,key为{接口+别名}。
    3、消费端配置生产者的信息后,在加载xml的时候由中间件生成动态代理类,当发生方法调用时实际调用的是代理类的方法,代理类里会通过netty的future通信方式进行数据交互。

    环境准备
    1、jdk 1.8.0
    2、IntelliJ IDEA Community Edition 2018.3.1 x64
    3、windows redis

    代码示例

    itstack-demo-rpc-03
    └── src
        └── main
        │    ├── java
        │    │    └── org.itstack.demo.rpc
        │    │        ├── config
        │    │        ├── domain
        │    │        ├── network   
        │    │        │   ├── client
        │    │        │   │   ├── ClientSocket.java
        │    │        │   │   └── MyClientHandler.java  
        │    │        │   ├── codec
        │    │        │   │   ├── RpcDecoder.java
        │    │        │   │   └── RpcEncoder.java  
        │    │        │   ├── future
        │    │        │   │   ├── SyncWrite.java    
        │    │        │   │   ├── SyncWriteFuture.java  
        │    │        │   │   ├── SyncWriteMap.java 
        │    │        │   │   └── WriteFuture.java  
        │    │        │   ├── msg
        │    │        │   │   ├── Request.java
        │    │        │   │   └── Response.java 
        │    │        │   ├── server
        │    │        │   │   ├── MyServerHandler.java
        │    │        │   │   └── ServerSocket.java     
        │    │        │   └── util
        │    │        │       └── SerializationUtil.java    
        │    │        ├── reflect
        │    │        │   ├── JDKInvocationHandler.java 
        │    │        │   └── JDKProxy.java
        │    │        ├── registry
        │    │        │   └── RedisRegistryCenter.java  
        │    │        └── util  
        │    └── resource
        │        └── META-INF
        │            ├── rpc.xsd
        │            ├── spring.handlers
        │            └── spring.schemas 
        
        └── test
             ├── java
             │   └── org.itstack.demo.test
             │       ├── service
             │       │   ├── impl
             │       │   │   └── HelloServiceImpl.java  
             │       │   └── HelloService.java
             │       └── ApiTest.java                
             └── resource  
                 ├── itstack-rpc-center.xml
                 ├── itstack-rpc-consumer.xml        
                 ├── itstack-rpc-provider.xml
                 └── log4j.xml           
    
    

    ConsumerBean.java

    package org.itstack.demo.rpc.config.spring.bean;
    
    import com.alibaba.fastjson.JSON;
    import io.netty.channel.ChannelFuture;
    import org.itstack.demo.rpc.config.ConsumerConfig;
    import org.itstack.demo.rpc.domain.RpcProviderConfig;
    import org.itstack.demo.rpc.network.client.ClientSocket;
    import org.itstack.demo.rpc.network.msg.Request;
    import org.itstack.demo.rpc.reflect.JDKProxy;
    import org.itstack.demo.rpc.registry.RedisRegistryCenter;
    import org.itstack.demo.rpc.util.ClassLoaderUtils;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.util.Assert;
    
    /**
     * Spring {@link FactoryBean} that exposes a remote RPC interface as a local bean.
     * <p>
     * On first use it looks up a provider for {@code nozzle} + {@code alias} in the Redis
     * registry, starts a {@link ClientSocket} towards that provider and returns a JDK dynamic
     * proxy bound to the resulting channel.
     * <p>
     * http://www.itstack.org
     * create by fuzhengwei on 2019/5/6
     */
    public class ConsumerBean<T> extends ConsumerConfig<T> implements FactoryBean {
    
        /** Maximum number of times the client socket is polled for its channel future. */
        private static final int CONNECT_MAX_ATTEMPTS = 100;
    
        /** Pause between two polls of the client socket, in milliseconds. */
        private static final long CONNECT_POLL_INTERVAL_MS = 500L;
    
        private ChannelFuture channelFuture;
    
        private RpcProviderConfig rpcProviderConfig;
    
        /**
         * Builds the proxy for the configured interface.
         *
         * @return a JDK dynamic proxy implementing the interface named by {@code nozzle}
         * @throws IllegalArgumentException if no provider could be resolved from the registry,
         *                                  or the client connection did not become available
         * @throws Exception                if the interface class cannot be loaded or the wait is interrupted
         */
        @Override
        public Object getObject() throws Exception {
    
            // Resolve the provider from the Redis registry only once per bean.
            if (null == rpcProviderConfig) {
                String infoStr = RedisRegistryCenter.obtainProvider(nozzle, alias);
                rpcProviderConfig = JSON.parseObject(infoStr, RpcProviderConfig.class);
            }
            Assert.notNull(rpcProviderConfig,
                    "No provider found in registry for interface [" + nozzle + "] alias [" + alias + "]");
    
            // Obtain the communication channel; the socket connects on its own thread.
            if (null == channelFuture) {
                ClientSocket clientSocket = new ClientSocket(rpcProviderConfig.getHost(), rpcProviderConfig.getPort());
                new Thread(clientSocket).start();
                for (int i = 0; i < CONNECT_MAX_ATTEMPTS; i++) {
                    // Check before sleeping so an already established connection costs no delay.
                    channelFuture = clientSocket.getFuture();
                    if (null != channelFuture) {
                        break;
                    }
                    Thread.sleep(CONNECT_POLL_INTERVAL_MS);
                }
            }
            Assert.notNull(channelFuture,
                    "Could not connect to provider " + rpcProviderConfig.getHost() + ":" + rpcProviderConfig.getPort());
    
            // Template request shared by the proxy; per-call fields are filled in by the invocation handler.
            Request request = new Request();
            request.setChannel(channelFuture.channel());
            request.setNozzle(nozzle);
            request.setRef(rpcProviderConfig.getRef());
            request.setAlias(alias);
            return JDKProxy.getProxy(ClassLoaderUtils.forName(nozzle), request);
        }
    
        /**
         * @return the interface class named by {@code nozzle}, or {@code null} when it cannot be loaded
         */
        @Override
        public Class<?> getObjectType() {
            try {
                return ClassLoaderUtils.forName(nozzle);
            } catch (ClassNotFoundException e) {
                return null;
            }
        }
    
        @Override
        public boolean isSingleton() {
            return true;
        }
    
    
    }
    

    ProviderBean.java

    package org.itstack.demo.rpc.config.spring.bean;
    
    import com.alibaba.fastjson.JSON;
    import org.itstack.demo.rpc.config.ProviderConfig;
    import org.itstack.demo.rpc.domain.LocalServerInfo;
    import org.itstack.demo.rpc.domain.RpcProviderConfig;
    import org.itstack.demo.rpc.registry.RedisRegistryCenter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    /**
     * Provider-side bean: when Spring supplies the application context, it publishes this
     * provider's description (interface, bean reference, alias, local host and port) to the
     * Redis registry as a JSON string.
     * <p>
     * http://www.itstack.org
     * create by fuzhengwei on 2019/5/6
     */
    public class ProviderBean extends ProviderConfig implements ApplicationContextAware {
    
        private Logger logger = LoggerFactory.getLogger(ProviderBean.class);
    
        /**
         * Registers the provider in the registry and logs the value returned by the registry call.
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            String providerJson = JSON.toJSONString(describeProvider());
    
            // Register the producer under {interface + alias}.
            long registered = RedisRegistryCenter.registryProvider(nozzle, alias, providerJson);
    
            logger.info("注册生产者:{} {} {}", nozzle, alias, registered);
        }
    
        /** Collects the configured attributes and the local server address into one registry entry. */
        private RpcProviderConfig describeProvider() {
            RpcProviderConfig entry = new RpcProviderConfig();
            entry.setNozzle(nozzle);
            entry.setRef(ref);
            entry.setAlias(alias);
            entry.setHost(LocalServerInfo.LOCAL_HOST);
            entry.setPort(LocalServerInfo.LOCAL_PORT);
            return entry;
        }
    
    }
    

    ServerBean.java

    package org.itstack.demo.rpc.config.spring.bean;
    
    import org.itstack.demo.rpc.config.ServerConfig;
    import org.itstack.demo.rpc.domain.LocalServerInfo;
    import org.itstack.demo.rpc.network.server.ServerSocket;
    import org.itstack.demo.rpc.registry.RedisRegistryCenter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    /**
     * Bootstraps the RPC runtime when Spring supplies the application context: initialises the
     * Redis registry connection and starts the RPC socket server on its own thread, blocking
     * until the server reports itself active.
     * <p>
     * http://www.itstack.org
     * create by fuzhengwei on 2019/5/6
     */
    public class ServerBean extends ServerConfig implements ApplicationContextAware {
    
        private Logger logger = LoggerFactory.getLogger(ServerBean.class);
    
        /**
         * @throws IllegalStateException if the server thread terminates before the socket server
         *                               becomes active, or if the waiting thread is interrupted
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // Connect to the registry center.
            logger.info("启动注册中心 ...");
            RedisRegistryCenter.init(host, port);
            logger.info("启动注册中心完成 {} {}", host, port);
    
            // Start the provider-side socket server.
            logger.info("初始化生产端服务 ...");
            ServerSocket serverSocket = new ServerSocket(applicationContext);
            Thread thread = new Thread(serverSocket);
            thread.start();
            while (!serverSocket.isActiveSocketServer()) {
                // A dead server thread can never flip the active flag; fail instead of spinning forever.
                // The flag is re-read after isAlive() so a thread that became active and then exited is not misreported.
                if (!thread.isAlive() && !serverSocket.isActiveSocketServer()) {
                    throw new IllegalStateException("RPC server thread exited before the socket server became active");
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting rather than swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for the RPC socket server to start", e);
                }
            }
    
            logger.info("初始化生产端服务完成 {} {}", LocalServerInfo.LOCAL_HOST, LocalServerInfo.LOCAL_PORT);
        }
    
    
    }
    

    MyClientHandler.java

    package org.itstack.demo.rpc.network.client;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import org.itstack.demo.rpc.network.future.SyncWriteFuture;
    import org.itstack.demo.rpc.network.future.SyncWriteMap;
    import org.itstack.demo.rpc.network.msg.Response;
    
    /**
     * Client-side inbound handler: hands each incoming {@link Response} to the future that is
     * registered under the same request id in {@link SyncWriteMap#syncKey}.
     * <p>
     * http://www.itstack.org
     * create by fuzhengwei on 2019/5/6
     */
    public class MyClientHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * Completes the pending future matching the response's request id; responses with no
         * registered future are ignored.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
            Response response = (Response) obj;
            SyncWriteFuture pending = (SyncWriteFuture) SyncWriteMap.syncKey.get(response.getRequestId());
            if (null == pending) {
                return;
            }
            pending.setResponse(response);
        }
    
        /** Prints the failure and closes the channel. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
    

    MyServerHandler.java

    package org.itstack.demo.rpc.network.server;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    import org.itstack.demo.rpc.network.msg.Request;
    import org.itstack.demo.rpc.network.msg.Response;
    import org.itstack.demo.rpc.util.ClassLoaderUtils;
    import org.springframework.context.ApplicationContext;
    
    import java.lang.reflect.Method;
    
    /**
     * Server-side inbound handler: resolves the requested interface method by reflection,
     * invokes it on the Spring bean named by the request's {@code ref}, and writes the result
     * back as a {@link Response} carrying the same request id.
     * <p>
     * http://www.itstack.org
     * create by fuzhengwei on 2019/5/6
     */
    public class MyServerHandler extends ChannelInboundHandlerAdapter {
    
        private final ApplicationContext applicationContext;
    
        MyServerHandler(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }
    
        /**
         * Handles one decoded {@link Request}.
         * <p>
         * NOTE(review): when the invocation fails, only the stack trace is printed and no
         * response is written, so the caller does not receive a reply for that request id.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object obj) {
            try {
                Request msg = (Request) obj;
                // Invoke the target method on the provider bean.
                Class<?> classType = ClassLoaderUtils.forName(msg.getNozzle());
                Method targetMethod = classType.getMethod(msg.getMethodName(), msg.getParamTypes());
                Object objectBean = applicationContext.getBean(msg.getRef());
                Object result = targetMethod.invoke(objectBean, msg.getArgs());
                // Send the result back, correlated by request id.
                Response response = new Response();
                response.setRequestId(msg.getRequestId());
                response.setResult(result);
                ctx.writeAndFlush(response);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release on every path, including failures, not only after a successful write.
                ReferenceCountUtil.release(obj);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
    }
    

    JDKInvocationHandler.java

    package org.itstack.demo.rpc.reflect;
    
    
    import org.itstack.demo.rpc.network.future.SyncWrite;
    import org.itstack.demo.rpc.network.msg.Request;
    import org.itstack.demo.rpc.network.msg.Response;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    
    /**
     * Invocation handler behind the consumer-side proxy: copies the invoked method's name,
     * parameter types and arguments into the {@link Request} and sends it through
     * {@link SyncWrite#writeAndSync}, returning the result carried by the {@link Response}.
     * <p>
     * NOTE(review): the same {@code Request} instance is mutated on every call, so concurrent
     * invocations on one proxy look able to overwrite each other's method/arguments — confirm
     * and consider building a per-call request.
     */
    public class JDKInvocationHandler implements InvocationHandler {
    
        private final Request request;
    
        public JDKInvocationHandler(Request request) {
            this.request = request;
        }
    
        /**
         * @param proxy  the proxy instance the method was invoked on
         * @param method the interface method being invoked
         * @param args   the call arguments, {@code null} for no-argument methods
         * @return the remote result, or the locally computed value for {@code toString},
         *         {@code hashCode} and {@code equals}
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] paramTypes = method.getParameterTypes();
            // Object methods are answered locally and never sent over the wire.
            if ("toString".equals(methodName) && paramTypes.length == 0) {
                return request.toString();
            } else if ("hashCode".equals(methodName) && paramTypes.length == 0) {
                // Identity hash of the proxy: stable even though the request's fields change per call.
                return System.identityHashCode(proxy);
            } else if ("equals".equals(methodName) && paramTypes.length == 1) {
                // Identity comparison keeps equals reflexive; comparing the request with a proxy never matched.
                return proxy == args[0];
            }
            // Fill in the per-call fields; the ref is already set on the request.
            request.setMethodName(methodName);
            request.setParamTypes(paramTypes);
            request.setArgs(args);
            // Blocks until the response arrives (timeout argument 5000 — presumably milliseconds, confirm in SyncWrite).
            Response response = new SyncWrite().writeAndSync(request.getChannel(), request, 5000);
            return response.getResult();
    
        }
    
    }
    

    JDKProxy.java

    package org.itstack.demo.rpc.reflect;
    
    
    import org.itstack.demo.rpc.network.msg.Request;
    import org.itstack.demo.rpc.util.ClassLoaderUtils;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;
    
    /**
     * Factory for JDK dynamic proxies backed by {@link JDKInvocationHandler}.
     */
    public class JDKProxy {
    
        /**
         * Creates a proxy that implements {@code interfaceClass} and routes every call to a
         * {@link JDKInvocationHandler} built around {@code request}.
         *
         * @param interfaceClass the interface the proxy must implement
         * @param request        the request handed to the invocation handler
         * @return the proxy instance, typed as {@code T}
         */
        public static <T> T getProxy(Class<T> interfaceClass, Request request) throws Exception {
            InvocationHandler dispatcher = new JDKInvocationHandler(request);
            ClassLoader loader = ClassLoaderUtils.getCurrentClassLoader();
            Class<?>[] exposedInterfaces = new Class<?>[]{interfaceClass};
            Object proxy = Proxy.newProxyInstance(loader, exposedInterfaces, dispatcher);
            return (T) proxy;
        }
    
    }
    

    RedisRegistryCenter.java

    package org.itstack.demo.rpc.registry;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    /**
     * Redis-backed stand-in for an RPC registry center. Providers are stored as members of a
     * Redis set keyed by {@code nozzle + "_" + alias}; consumers pick one member at random.
     * <p>
     * http://www.itstack.org
     * create by fuzhengwei on 2019/5/7
     */
    public class RedisRegistryCenter {
    
        /** Pool the shared connection is borrowed from; kept so it can be closed on re-initialisation. */
        private static JedisPool jedisPool;
    
        /** Non-sharded client connection used by all registry operations. */
        private static Jedis jedis;
    
        /**
         * Initialises the Redis connection. Calling it again closes the previous connection and
         * pool before creating new ones, so repeated initialisation does not leak them.
         *
         * @param host Redis host
         * @param port Redis port
         */
        public static synchronized void init(String host, int port) {
            if (null != jedis) {
                jedis.close();
            }
            if (null != jedisPool) {
                jedisPool.close();
            }
            // Basic pool configuration.
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxIdle(5);
            config.setTestOnBorrow(false);
            jedisPool = new JedisPool(config, host, port);
            jedis = jedisPool.getResource();
        }
    
        /**
         * Registers a provider.
         *
         * @param nozzle interface name
         * @param alias  alias
         * @param info   provider description to store
         * @return the value returned by the Redis {@code SADD} command
         * @throws IllegalStateException if {@link #init(String, int)} has not been called
         */
        public static Long registryProvider(String nozzle, String alias, String info) {
            return connection().sadd(registryKey(nozzle, alias), info);
        }
    
        /**
         * Obtains a provider; a random set member is returned to simulate weighting.
         *
         * @param nozzle interface name
         * @param alias  alias
         * @return the value returned by the Redis {@code SRANDMEMBER} command
         * @throws IllegalStateException if {@link #init(String, int)} has not been called
         */
        public static String obtainProvider(String nozzle, String alias) {
            return connection().srandmember(registryKey(nozzle, alias));
        }
    
        /**
         * @return the shared connection, or {@code null} before {@link #init(String, int)} is called
         */
        public static Jedis jedis() {
            return jedis;
        }
    
        /** Returns the shared connection, failing with a descriptive error when it is not initialised. */
        private static Jedis connection() {
            if (null == jedis) {
                throw new IllegalStateException("RedisRegistryCenter.init(host, port) must be called before use");
            }
            return jedis;
        }
    
        /** Builds the registry key for an interface and alias. */
        private static String registryKey(String nozzle, String alias) {
            return nozzle + "_" + alias;
        }
    
    }
    

    ApiTest.java

    /**
     * Manual smoke test: boots one Spring context from the registry-center, provider and
     * consumer configuration files.
     */
    public class ApiTest {
    
        public static void main(String[] args) {
            // The three configuration files are loaded into a single context, in this order.
            new ClassPathXmlApplicationContext(
                    "itstack-rpc-center.xml",
                    "itstack-rpc-provider.xml",
                    "itstack-rpc-consumer.xml");
        }
    
    }
    

    框架,测试结果

    2019-....ClassPathXmlApplicationContext:prepareRefresh:510] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@299a06ac: startup date [Tue May 07 20:19:47 CST 2019]; root of context hierarchy
    2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-center.xml]
    2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-provider.xml]
    2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-consumer.xml]
    2019-...upport.DefaultListableBeanFactory:preInstantiateSingletons:577] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7e0b0338: defining beans [consumer_itstack,provider_helloService,consumer_helloService]; root of factory hierarchy
    2019-...bean.ServerBean:setApplicationContext:25] - 启动注册中心 ...
    2019-...bean.ServerBean:setApplicationContext:27] - 启动注册中心完成 127.0.0.1 6379
    2019-...bean.ServerBean:setApplicationContext:30] - 初始化生产端服务 ...
    2019-...bean.ServerBean:setApplicationContext:41] - 初始化生产端服务完成 10.13.81.104 22201
    2019-...bean.ProviderBean:setApplicationContext:35] - 注册生产者:org.itstack.demo.test.service.HelloService itStackRpc 0
    

    框架应用
    为了测试我们写两个测试工程;itstack-demo-rpc-provider、itstack-demo-rpc-consumer

    itstack-demo-rpc-provider 提供生产者接口

    itstack-demo-rpc-provider
    ├── itstack-demo-rpc-provider-export
    │   └── src
    │        └── main
    │            └── java
    │                 └── org.itstack.demo.rpc.provider.export
    │                     ├── domain 
    │                     │   └── Hi.java
    │                     └── HelloService.java
    │   
    └── itstack-demo-rpc-provider-web
        └── src
             └── main
                 ├── java
                 │    └── org.itstack.demo.rpc.provider.web
                 │        └── HelloServiceImpl.java
                 └── resources
                      └── spring
                          └── spring-itstack-rpc-provider.xml
    

    HelloService.java

    /**
     * Sample service contract exported by the provider module and called through the RPC
     * proxy on the consumer side.
     */
    public interface HelloService {
    
        /**
         * No-argument call.
         *
         * @return a String chosen by the implementation
         */
        String hi();
    
        /**
         * Call with a single String argument.
         *
         * @param str the text sent to the provider
         * @return a String chosen by the implementation
         */
        String say(String str);
    
        /**
         * Call with a project-defined object argument.
         *
         * @param hi the message object sent to the provider
         * @return a String chosen by the implementation
         */
        String sayHi(Hi hi);
    
    }
    

    HelloServiceImpl.java

    /**
     * Provider-side implementation of {@link HelloService}, registered in Spring under the
     * bean name {@code helloService}.
     */
    @Controller("helloService")
    public class HelloServiceImpl implements HelloService {
    
        /** Fixed reply returned by {@link #hi()}. */
        private static final String GREETING = "hi itstack rpc";
    
        /** @return the fixed greeting */
        @Override
        public String hi() {
            return GREETING;
        }
    
        /** @return {@code str} unchanged */
        @Override
        public String say(String str) {
            return str;
        }
    
        /** @return the user name, the text {@code " say:"} and the message, concatenated */
        @Override
        public String sayHi(Hi hi) {
            StringBuilder sentence = new StringBuilder();
            sentence.append(hi.getUserName());
            sentence.append(" say:");
            sentence.append(hi.getSayMsg());
            return sentence.toString();
        }
    
    }
    

    spring-itstack-rpc-provider.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Provider-side Spring configuration using the custom "rpc" namespace. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">
    
        <!-- Registry center: host and port of the Redis instance used as the registry -->
        <rpc:server id="rpcServer" host="127.0.0.1" port="6379"/>
    
        <!-- Publishes the bean named by "ref" for interface "nozzle" under the given alias -->
        <rpc:provider id="helloServiceRpc" nozzle="org.itstack.demo.rpc.provider.export.HelloService"
                      ref="helloService" alias="itstackRpc"/>
    
    </beans>
    

    itstack-demo-rpc-consumer 提供消费者调用

    itstack-demo-rpc-consumer
    └── src
         ├── main
         │   ├── java    
         │   └── resources
         │       └── spring
         │           └── spring-itstack-rpc-consumer.xml
         │   
         └── test
             └── java
                 └── org.itstack.demo.test
                     └── ConsumerTest.java               
    

    spring-itstack-rpc-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Consumer-side Spring configuration using the custom "rpc" namespace. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">
    
        <!-- Registry center: host and port of the Redis instance used as the registry -->
        <rpc:server id="consumer_itstack" host="127.0.0.1" port="6379"/>
    
        <!-- Declares bean "helloService" for interface "nozzle"; the provider is looked up by interface + alias -->
        <rpc:consumer id="helloService" nozzle="org.itstack.demo.rpc.provider.export.HelloService" alias="itstackRpc"/>
    
    </beans>
    

    ConsumerTest.java

    /**
     * Consumer-side integration test: calls the three {@link HelloService} methods on the
     * injected {@code helloService} bean and prints each result.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("/spring-config.xml")
    public class ConsumerTest {
    
        @Resource(name = "helloService")
        private HelloService helloService;
    
        @Test
        public void test() {
            // No-argument call.
            report(helloService.hi());
    
            // Single String argument.
            report(helloService.say("hello world"));
    
            // Object argument.
            Hi greeting = new Hi();
            greeting.setUserName("付栈");
            greeting.setSayMsg("付可敌国,栈无不胜");
            report(helloService.sayHi(greeting));
        }
    
        /** Prints one call result with the common prefix. */
        private static void report(String outcome) {
            System.out.println("测试结果:" + outcome);
        }
    
    }
    

    应用,测试结果(测试时需先启动redis)

    启动ProviderTest后,Redis中的注册数据

    redis 127.0.0.1:6379> srandmember org.itstack.demo.rpc.provider.export.HelloService_itstackRpc
    "{\"alias\":\"itstackRpc\",\"host\":\"10.13.81.104\",\"nozzle\":\"org.itstack.demo.rpc.provider.export.HelloService\",\"port\":22201,\"ref\":\"helloService\"}"
    redis 127.0.0.1:6379>
    

    执行ConsumerTest中的单元测试方法

    log4j:WARN No appenders could be found for logger (org.springframework.test.context.junit4.SpringJUnit4ClassRunner).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    测试结果:hi itstack rpc
    测试结果:hello world
    测试结果:付栈 say:付可敌国,栈无不胜
    
    Process finished with exit code 0
    

    相关文章

      网友评论

        本文标题:手写类似dubbo的rpc框架第三章《rpc框架》

        本文链接:https://www.haomeiwen.com/subject/xsquoqtx.html