美文网首页
深入dubbo spring源码

深入dubbo spring源码

作者: snail_knight | 来源:发表于2017-07-10 11:13 被阅读0次

    测试代码已经上传到github上了

    image.png
    以上为spring实例化类的方法调用路径,在此之前还会有很多预处理操作
    
    image.png
    融合注入属性
    
    image.png
    初始化方法和销毁方法
    
    /**
     * Collects the annotated init and destroy methods of {@code clazz} and of
     * every superclass below {@code Object}.
     *
     * <p>Init methods found on a superclass are placed before those of its
     * subclasses; destroy methods are kept in subclass-first order.
     *
     * @param clazz the class whose hierarchy is scanned
     * @return metadata holding the collected init and destroy methods
     */
    private LifecycleMetadata buildLifecycleMetadata(final Class<?> clazz) {
        final boolean debugEnabled = logger.isDebugEnabled();
        LinkedList<LifecycleElement> allInit = new LinkedList<LifecycleElement>();
        LinkedList<LifecycleElement> allDestroy = new LinkedList<LifecycleElement>();

        Class<?> current = clazz;
        while (true) {
            // Methods declared directly on the class currently being visited.
            final LinkedList<LifecycleElement> localInit = new LinkedList<LifecycleElement>();
            final LinkedList<LifecycleElement> localDestroy = new LinkedList<LifecycleElement>();

            ReflectionUtils.doWithLocalMethods(current, new ReflectionUtils.MethodCallback() {
                @Override
                public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
                    boolean isInit = initAnnotationType != null
                            && method.getAnnotation(initAnnotationType) != null;
                    if (isInit) {
                        localInit.add(new LifecycleElement(method));
                        if (debugEnabled) {
                            logger.debug("Found init method on class [" + clazz.getName() + "]: " + method);
                        }
                    }

                    boolean isDestroy = destroyAnnotationType != null
                            && method.getAnnotation(destroyAnnotationType) != null;
                    if (isDestroy) {
                        localDestroy.add(new LifecycleElement(method));
                        if (debugEnabled) {
                            logger.debug("Found destroy method on class [" + clazz.getName() + "]: " + method);
                        }
                    }
                }
            });

            // Prepending keeps superclass init methods ahead of subclass ones.
            allInit.addAll(0, localInit);
            allDestroy.addAll(localDestroy);

            current = current.getSuperclass();
            if (current == null || current == Object.class) {
                break;
            }
        }

        return new LifecycleMetadata(clazz, allInit, allDestroy);
    }
    

    一个回调模式设计比较好的例子

    image.png
    ServiceConfig
    
    image.png
    连接池
    
    image.png image.png
    心跳更新
    

    附带心跳例子

    /**
     * Heartbeat-style demo: a fixed-rate task drains a work queue one message
     * per tick while the main thread keeps adding messages.
     *
     * <p>Once every message has been printed the scheduler is shut down so the
     * JVM can exit; ticks that find the queue empty print nothing.
     */
    public class ScheduledThreadPoolTest {
        /** Total number of messages pushed through the queue. */
        private static final int MESSAGE_COUNT = 200;

        /**
         * Runs the demo and returns after all {@code MESSAGE_COUNT} messages
         * have been printed, or earlier if the calling thread is interrupted.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            final LinkedBlockingDeque<String> workQueue = new LinkedBlockingDeque<String>();
            // Counts handled messages so main knows when the scheduler can stop.
            final java.util.concurrent.CountDownLatch drained =
                    new java.util.concurrent.CountDownLatch(MESSAGE_COUNT);
            final ScheduledExecutorService scheduledExecutorService
                    = Executors.newScheduledThreadPool(
                            8);

            try {
                for (int i = 0; i < 10; i++) {
                    workQueue.add("" + i);
                }

                // NOTE: the unit is MICROSECONDS, so the task ticks every 1 ms.
                scheduledExecutorService.scheduleAtFixedRate(
                        new Runnable() {
                            @Override
                            public void run() {
                                String message = workQueue.poll();
                                if (message == null) {
                                    // Nothing queued at this tick; wait for the next one.
                                    return;
                                }
                                System.out.println(Thread.currentThread().getName() + ":" + message);
                                drained.countDown();
                            }
                        },
                        1000,
                        1000,
                        TimeUnit.MICROSECONDS
                );

                // Keep producing while the heartbeat task is already consuming.
                for (int i = 10; i < MESSAGE_COUNT; i++) {
                    workQueue.add("" + i);
                }

                // A one-shot alternative to the periodic task is
                // scheduledExecutorService.schedule(Callable, delay, unit).

                drained.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever called main.
                Thread.currentThread().interrupt();
            } finally {
                // Without this the pool's non-daemon threads keep the JVM alive.
                scheduledExecutorService.shutdown();
            }
        }
    }
    

    dubbo中HeaderExchangeServer启动心跳定时器的源码

    /**
     * Stops any running heartbeat timer and, when {@code heartbeat} is
     * positive, schedules a new {@code HeartBeatTask} with a fixed delay of
     * {@code heartbeat} milliseconds between runs.
     */
    private void startHeatbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat <= 0) {
            return;
        }

        // Hands the task a read-only view of this server's channels.
        HeartBeatTask.ChannelProvider channelProvider = new HeartBeatTask.ChannelProvider() {
            public Collection<Channel> getChannels() {
                Collection<Channel> current = HeaderExchangeServer.this.getChannels();
                return Collections.unmodifiableCollection(current);
            }
        };
        HeartBeatTask task = new HeartBeatTask(channelProvider, heartbeat, heartbeatTimeout);

        // Heartbeats use fixed-delay scheduling.
        heatbeatTimer = scheduled.scheduleWithFixedDelay(
                task, heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
    
    image.png
    注册zk
    
    image.png
    启动接口配置
    

    接口代理主要的几个类:
    ReferenceBean
    ReferenceConfig

    image.png
    创建接口代理的入口代码
    
    image.png
    ClassHelper.getCallerClassLoader(.class)
    

    dubbo 接口代理高潮部分:
    com.alibaba.dubbo.common.bytecode.Proxy.getProxy

    /**
     * Returns a {@code Proxy} factory for the given interfaces, generating the
     * backing classes on first use and caching the result per class loader.
     *
     * <p>Two classes are generated: a proxy-instance class implementing every
     * interface in {@code ics} and forwarding each call to an
     * {@code InvocationHandler}, and a {@code Proxy} subclass whose
     * {@code newInstance} creates instances of the first class.
     *
     * @param cl  class loader used to resolve the interfaces and to create the
     *            class generators
     * @param ics interfaces the generated proxy must implement
     * @return the cached or newly generated proxy factory
     * @throws IllegalArgumentException if more than 65535 interfaces are given,
     *         an interface is not visible from {@code cl}, or non-public
     *         interfaces come from different packages
     * @throws RuntimeException if an element of {@code ics} is not an
     *         interface, or class generation fails
     */
    public static Proxy getProxy(ClassLoader cl, Class<?>... ics)
        {
            if( ics.length > 65535 )
                throw new IllegalArgumentException("interface limit exceeded");
            
            // Validate every interface and build the cache key from their names.
            StringBuilder sb = new StringBuilder();
            for(int i=0;i<ics.length;i++)
            {
                String itf = ics[i].getName();
                if( !ics[i].isInterface() )
                    throw new RuntimeException(itf + " is not a interface.");
    
                // Resolve the name through cl; a failed lookup leaves tmp null,
                // which the identity check below reports as "not visible".
                Class<?> tmp = null;
                try
                {
                    tmp = Class.forName(itf, false, cl);
                }
                catch(ClassNotFoundException e)
                {}
    
                if( tmp != ics[i] )
                    throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
    
                sb.append(itf).append(';');
            }
    
            // use interface class name list as key.
            String key = sb.toString();
    
            // get cache by class loader.
            Map<String, Object> cache;
            synchronized( ProxyCacheMap )
            {
                cache = ProxyCacheMap.get(cl);
                if( cache == null )
                {
                    cache = new HashMap<String, Object>();
                    ProxyCacheMap.put(cl, cache);
                }
            }
    
            // Either return a cached proxy, wait for another thread that is
            // generating this key, or claim the key by storing the marker.
            Proxy proxy = null;
            synchronized( cache )
            {
                do
                {
                    Object value = cache.get(key);
                    if( value instanceof Reference<?> )
                    {
                        // A cleared reference yields null and falls through to
                        // the branch below, which regenerates the proxy.
                        proxy = (Proxy)((Reference<?>)value).get();
                        if( proxy != null )
                            return proxy;
                    }
    
                    if( value == PendingGenerationMarker )
                    {
                        // NOTE(review): the interrupt is swallowed and the
                        // thread's interrupt status is not restored; the loop
                        // just re-reads the cache.
                        try{ cache.wait(); }catch(InterruptedException e){}
                    }
                    else
                    {
                        cache.put(key, PendingGenerationMarker);
                        break;
                    }
                }
                while( true );
            }
    
            // The counter value becomes the suffix of both generated class names.
            long id = PROXY_CLASS_COUNTER.getAndIncrement();
            String pkg = null;
            ClassGenerator ccp = null, ccm = null;
            try
            {
                ccp = ClassGenerator.newInstance(cl);
    
                // Method descriptors already emitted, so a method declared by
                // several interfaces is generated only once.
                Set<String> worked = new HashSet<String>();
                List<Method> methods = new ArrayList<Method>();
    
                for(int i=0;i<ics.length;i++)
                {
                    // Non-public interfaces fix the package of the generated
                    // class; all of them must share one package.
                    if( !Modifier.isPublic(ics[i].getModifiers()) )
                    {
                        String npkg = ics[i].getPackage().getName();
                        if( pkg == null )
                        {
                            pkg = npkg;
                        }
                        else
                        {
                            if( !pkg.equals(npkg)  )
                                throw new IllegalArgumentException("non-public interfaces from different packages");
                        }
                    }
                    ccp.addInterface(ics[i]);
    
                    for( Method method : ics[i].getMethods() )
                    {
                        String desc = ReflectUtils.getDesc(method);
                        if( worked.contains(desc) )
                            continue;
                        worked.add(desc);
    
                        // ix is this method's slot in the static methods[] array.
                        int ix = methods.size();
                        Class<?> rt = method.getReturnType();
                        Class<?>[] pts = method.getParameterTypes();
    
                        // Generated body: pack the parameters into Object[] args,
                        // call handler.invoke(this, methods[ix], args), and return
                        // the converted result unless the method is void.
                        // ($w)$N is presumably the generator's notation for the
                        // N-th parameter wrapped as an Object - TODO confirm.
                        StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                        for(int j=0;j<pts.length;j++)
                            code.append(" args[").append(j).append("] = ($w)$").append(j+1).append(";");
                        code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                        if( !Void.TYPE.equals(rt) )
                            code.append(" return ").append(asArgument(rt, "ret")).append(";");
    
                        methods.add(method);
                        ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                    }
                }
    
                if( pkg == null )
                    pkg = PACKAGE_NAME;
    
                // create ProxyInstance class.
                String pcn = pkg + ".proxy" + id;
                ccp.setClassName(pcn);
                ccp.addField("public static java.lang.reflect.Method[] methods;");
                ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
                ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{ InvocationHandler.class }, new Class<?>[0], "handler=$1;");
                ccp.addDefaultConstructor();
                Class<?> clazz = ccp.toClass();
                // Fill the generated static methods[] field reflectively.
                clazz.getField("methods").set(null, methods.toArray(new Method[0]));
    
                // create Proxy class.
                String fcn = Proxy.class.getName() + id;
                ccm = ClassGenerator.newInstance(cl);
                ccm.setClassName(fcn);
                ccm.addDefaultConstructor();
                ccm.setSuperClass(Proxy.class);
                ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
                Class<?> pc = ccm.toClass();
                proxy = (Proxy)pc.newInstance();
            }
            catch(RuntimeException e)
            {
                throw e;
            }
            catch(Exception e)
            {
                throw new RuntimeException(e.getMessage(), e);
            }
            finally
            {
                // release ClassGenerator
                if( ccp != null )
                    ccp.release();
                if( ccm != null )
                    ccm.release();
                // Replace the pending marker with the result (or drop it on
                // failure) and wake every thread waiting on this cache.
                synchronized( cache )
                {
                    if( proxy == null )
                        cache.remove(key);
                    else
                        cache.put(key, new WeakReference<Proxy>(proxy));
                    cache.notifyAll();
                }
            }
            return proxy;
        }
    

    在做以上步骤的时候,首先需要通过zookeeper验证下,是否已经注册与之对应的服务,当然也可以将接口配置成check="false",这样启动时不检查,而是通过心跳方式检查。
    接下来我们梳理步骤
    首先引入两个类,在本例中,我的消费接口为
    interface cn.wd.easyx.service.api.QueryAdapter 自定义
    interface com.alibaba.dubbo.rpc.service.EchoService dubbo定义的回声测试接口(只声明了一个 $echo 方法,并非空接口)。
    第一步:确保都在同一个classloader中
    第二步:以类加载器为键从 ProxyCacheMap 中取出(或新建)缓存 Map,并以所有接口名拼接成的字符串作为该 Map 的键
    第三步:根据classloader创建ClassGenerator dubbo基于javassist二进制的字节码生成器
    第四步:遍历两个接口中的所有方法,将接口中的所有方法注册到ClassGenerator中,然后生成一个以com.alibaba.dubbo.common.bytecode.proxy0为Class.name的class实例
    第五步:再创建一个基于cl加载器的ClassGenerator的实例,设置类名为com.alibaba.dubbo.common.bytecode.Proxy0(注意首字母大写,与上一步的proxy0不是同一个类),继承Proxy.class,添加newInstance实例化方法,最后实例化出proxy

    image.png image.png

    为了更加直观看到dubbo中的ClassGenerator到底做了哪些东西,我们做一个实验:
    在自己的项目中引入dubbo 依赖

    <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>dubbo</artifactId>
                <version>2.5.3</version>
    </dependency>
    

    模仿dubbo中的EchoService接口

    /**
     * Local stand-in for dubbo's EchoService, used as the second interface in
     * the proxy-generation experiment.
     */
    public interface EchoService {
        /**
         * Single method of the interface; takes and returns an arbitrary object.
         * NOTE(review): the name looks like a misspelling of {@code $echo}; it is
         * left as is because the generated proxy output uses the same name.
         */
        Object $eche(Object obj);
    }
    

    自定义接口:

    /**
     * Sample business interface used as the first interface in the
     * proxy-generation experiment.
     */
    public interface Interface1 {
        /** No-argument method returning a String. */
        String getName();
        /**
         * Takes a name and returns a String.
         * NOTE(review): returning a value from a setter is unusual - confirm it is intended.
         */
        String setName(String name);
    }
    

    拷贝Proxy类,将类名改为MyProxy,并加入把生成的class写入文件的代码:

    String pcn = pkg + ".proxy" + id;
                // Finish the proxy-instance class: name, fields and constructors.
                ccp.setClassName(pcn);
                ccp.addField("public static java.lang.reflect.Method[] methods;");
                ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
                ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{ InvocationHandler.class }, new Class<?>[0], "handler=$1;");
                ccp.addDefaultConstructor();
                Class<?> clazz = ccp.toClass();
    
                // Dump the generated proxy-instance class to Proxy.class for
                // inspection; try-with-resources closes the stream even when
                // the write fails.
                byte[] byteArr = ccp.getCtClass(clazz).toBytecode();
                try (FileOutputStream fos = new FileOutputStream(new File("Proxy.class"))) {
                    fos.write(byteArr);
                }
    
                // Fill the generated static methods[] field reflectively.
                clazz.getField("methods").set(null, methods.toArray(new Method[0]));
    
                // create Proxy class.
                String fcn = MyProxy.class.getName() + id;
                ccm = MyClassGenerator.newInstance(cl);
                ccm.setClassName(fcn);
                ccm.addDefaultConstructor();
                ccm.setSuperClass(MyProxy.class);
                ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
                Class<?> pc = ccm.toClass();
    
                // Dump the generated factory class to Proxy2.class, again
                // closing the stream on every path.
                byte[] byteArr2 = ccm.getCtClass(pc).toBytecode();
                try (FileOutputStream fos2 = new FileOutputStream(new File("Proxy2.class"))) {
                    fos2.write(byteArr2);
                }
    

    拷贝ClassGenerator类,改名为MyClassGenerator,将getCtClass方法改成public

    /**
     * Looks up the {@code CtClass} registered in {@code mPool} under the name
     * of the given class.
     *
     * @param c class whose name is used for the lookup
     * @return the pool entry for that name
     * @throws NotFoundException propagated from the pool lookup
     */
    public CtClass getCtClass(Class<?> c) throws NotFoundException
    {
        final String className = c.getName();
        return mPool.get(className);
    }
    

    运行代码

    /**
     * Generates a proxy for {@code Interface1} and {@code EchoService} and
     * instantiates it with a handler that returns {@code null} for every call.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Class<?>[] interfaces = {Interface1.class, EchoService.class};

        // Handler that ignores the invocation and always answers null.
        InvocationHandler nullHandler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                return null;
            }
        };

        MyProxy.getProxy(interfaces).newInstance(nullHandler);
    }
    

    动态生成的字节码

    /**
     * Decompiled view of the generated proxy-instance class. It implements both
     * experiment interfaces and forwards every call to the handler.
     * NOTE(review): DC is not declared anywhere in this article; presumably a
     * marker interface added by the class generator - confirm.
     */
    public class proxy0 implements DC, EchoService, Interface1 {
        // One entry per proxied method; each method below passes its own slot
        // to the handler.
        public static Method[] methods;
        // Receives every forwarded call.
        private InvocationHandler handler;
    
        // Forwards to the handler with an empty argument array and slot 0.
        public String getName() {
            Object[] var1 = new Object[0];
            Object var2 = this.handler.invoke(this, methods[0], var1);
            return (String)var2;
        }
    
        // Forwards to the handler with the single argument and slot 1.
        public String setName(String var1) {
            Object[] var2 = new Object[]{var1};
            Object var3 = this.handler.invoke(this, methods[1], var2);
            return (String)var3;
        }
    
        // Forwards to the handler with the single argument and slot 2.
        public Object $eche(Object var1) {
            Object[] var2 = new Object[]{var1};
            Object var3 = this.handler.invoke(this, methods[2], var2);
            return (Object)var3;
        }
    
        // Default constructor; leaves handler unset.
        public proxy0() {
        }
    
        // Constructor used by the generated factory's newInstance.
        public proxy0(InvocationHandler var1) {
            this.handler = var1;
        }
    }
    
    /**
     * Decompiled view of the generated factory class: a MyProxy subclass whose
     * newInstance creates a proxy0 bound to the given handler.
     */
    public class MyProxy0 extends MyProxy implements DC {
        // Wraps the handler in a new proxy0 instance.
        public Object newInstance(InvocationHandler var1) {
            return new proxy0(var1);
        }
    
        // Default constructor.
        public MyProxy0() {
        }
    }
    

    相关文章

      网友评论

          本文标题:深入dubbo spring源码

          本文链接:https://www.haomeiwen.com/subject/bndehxtx.html