Verticle

作者: 沧行 | 来源:发表于2016-11-20 18:04 被阅读1622次

    背景

    • verticle相当于1个执行模块,是vertx的部署单元。
    • vertx可以部署多个verticle,且verticle之间可以互相通信。
    • 因为vertx主要是个网络框架,所以在verticle中通常会启动server,如http或tcp。

    Verticle部署源码分析

    @Override
    public void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
        if (options.isHa() && haManager() != null && haManager().isEnabled()) {
            haManager().deployVerticle(name, options, completionHandler);
        } else {
            deploymentManager.deployVerticle(name, options, completionHandler);
        }
    }
    

    deploymentManager.deployVerticle代码如下:

    public void deployVerticle(String identifier,
                               DeploymentOptions options,
                               Handler<AsyncResult<String>> completionHandler) {
        ContextImpl callingContext = vertx.getOrCreateContext();
        ClassLoader cl = getClassLoader(options, callingContext);
        /**
         * identifier为verticle类的全限定名
         * callingContext为eventloop context
         */
        doDeployVerticle(identifier, generateDeploymentID(), options, callingContext, callingContext, cl, completionHandler);
    }
    

    首先是创建context,ContextImpl callingContext = vertx.getOrCreateContext(),这块代码如下:

    public ContextImpl getOrCreateContext() {
        ContextImpl ctx = getContext();
        if (ctx == null) {
            // We are running embedded - Create a context
            ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader());
        }
        return ctx;
    }
    
    public ContextImpl getContext() {
        ContextImpl context = (ContextImpl) context();
        if (context != null && context.owner == this) {
            return context;
        }
        return null;
    }
    
    public static Context context() {
        Thread current = Thread.currentThread();
        if (current instanceof VertxThread) {
            return ((VertxThread) current).getContext();
        }
        return null;
    }
    
    public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) {
        return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl);
    }
    

    最终会调到创建EventLoopContext的方法,详细见context部分分析,这里得到context后,接下来分析doDeployVerticle方法,如下:

    private void doDeployVerticle(Iterator<VerticleFactory> iter,
                                  Throwable prevErr,
                                  String identifier,
                                  String deploymentID,
                                  DeploymentOptions options,
                                  ContextImpl parentContext,
                                  ContextImpl callingContext,
                                  ClassLoader cl,
                                  Handler<AsyncResult<String>> completionHandler) {
        if (iter.hasNext()) {
            VerticleFactory verticleFactory = iter.next();
            Future<String> fut = Future.future();
            if (verticleFactory.requiresResolve()) {
                try {
                    verticleFactory.resolve(identifier, options, cl, fut);
                } catch (Exception e) {
                    try {
                        fut.fail(e);
                    } catch (Exception ignore) {
                        // Too late
                    }
                }
            } else {
                fut.complete(identifier);
            }
            fut.setHandler(ar -> {
                Throwable err;
                if (ar.succeeded()) {
                    /**
                     * resolvedName为verticle类全限定名
                     */
                    String resolvedName = ar.result();
                    if (!resolvedName.equals(identifier)) {
                        deployVerticle(resolvedName, options, completionHandler);
                        return;
                    } else {
                        if (verticleFactory.blockingCreate()) {
                            vertx.<Verticle[]>executeBlocking(createFut -> {
                                try {
                                    Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
                                    createFut.complete(verticles);
                                } catch (Exception e) {
                                    createFut.fail(e);
                                }
                            }, res -> {
                                if (res.succeeded()) {
                                    doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, res.result());
                                } else {
                                    // Try the next one
                                    doDeployVerticle(iter, res.cause(), identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
                                }
                            });
                            return;
                        } else {
                            try {
                                Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
                                doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);
                                return;
                            } catch (Exception e) {
                                err = e;
                            }
                        }
                    }
                } else {
                    err = ar.cause();
                }
                // Try the next one
                doDeployVerticle(iter, err, identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
            });
        } else {
            if (prevErr != null) {
                // Report failure if there are no more factories to try otherwise try the next one
                reportFailure(prevErr, callingContext, completionHandler);
            } else {
                // not handled or impossible ?
            }
        }
    }
    

    核心代码如下:

        Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
        doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);
    

    createVerticles方法如下,创建verticle就是根据其类的全限定名创建对象:

    private Verticle[] createVerticles(VerticleFactory verticleFactory, String identifier, int instances, ClassLoader cl) throws Exception {
        Verticle[] verticles = new Verticle[instances];
        for (int i = 0; i < instances; i++) {
            verticles[i] = verticleFactory.createVerticle(identifier, cl);
            if (verticles[i] == null) {
                throw new NullPointerException("VerticleFactory::createVerticle returned null");
            }
        }
        return verticles;
    }
    

    doDeploy方法如下:

        private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,
                          ContextImpl parentContext,
                          ContextImpl callingContext,
                          Handler<AsyncResult<String>> completionHandler,
                          ClassLoader tccl, Verticle... verticles) {
        if (options.isMultiThreaded() && !options.isWorker()) {
            throw new IllegalArgumentException("If multi-threaded then must be worker too");
        }
        JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy(); // Copy it
        String poolName = options.getWorkerPoolName();
    
        Deployment parent = parentContext.getDeployment();
        DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);
    
        AtomicInteger deployCount = new AtomicInteger();
        AtomicBoolean failureReported = new AtomicBoolean();
        for (Verticle verticle : verticles) {
            WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
            WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
            /**
             * parentContext为部署verticle的context,这里又创建了新的context,用于启动verticle。
             * 根据verticle类型,这里分为3种context,EventLoopContext、WorkerContext和MultiThreadedWorkerContext。
             */
            ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
                    vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
            if (workerExec != null) {
                context.addCloseHook(workerExec);
            }
            context.setDeployment(deployment);
            deployment.addVerticle(new VerticleHolder(verticle, context));
            /**
             * 对于EventLoopContext,在context的eventloop中执行
             * 对于WorkerContext,在worker线程顺序执行器中执行
             * 对于MultiThreadedWorkerContext,在worker线程中执行
             */
            context.runOnContext(v -> {
                try {
                    verticle.init(vertx, context);
                    Future<Void> startFuture = Future.future();
                    verticle.start(startFuture);
                    startFuture.setHandler(ar -> {
                        if (ar.succeeded()) {
                            if (parent != null) {
                                parent.addChild(deployment);
                                deployment.child = true;
                            }
                            vertx.metricsSPI().verticleDeployed(verticle);
                            deployments.put(deploymentID, deployment);
                            if (deployCount.incrementAndGet() == verticles.length) {
                                reportSuccess(deploymentID, callingContext, completionHandler);
                            }
                        } else if (!failureReported.get()) {
                            reportFailure(ar.cause(), callingContext, completionHandler);
                        }
                    });
                } catch (Throwable t) {
                    reportFailure(t, callingContext, completionHandler);
                }
            });
        }
    }
    

    doDeploy核心代码主如下:
    首先创建上下文:

    ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
                    vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
    

    parentContext为部署verticle的context,这里又创建了新的context,用于启动verticle。
    根据verticle类型,这里分为3种context,EventLoopContext、WorkerContext和MultiThreadedWorkerContext。

    然后启动verticle,调用其init和start方法,但执行线程会因为verticle的不同模式而有所不同:

    • 对于EventLoopContext,在context的eventloop中执行
    • 对于WorkerContext,在worker线程顺序执行器中执行
    • 对于MultiThreadedWorkerContext,在worker线程中执行

    下面看下context.runOnContext方法,这里传入的回调是执行verticle的启动和初始化,除了这些,看还做了什么,以standard模式verticle为例,该context为EventLoopContext,runOnContext方法如下:

      //EventLoopContext
      public void executeAsync(Handler<Void> task) {  
        nettyEventLoop().execute(wrapTask(null, task, true, null));
      }
    

    nettyEventLoop()得到的是该context绑定的event loop,task回调参数即为verticle启动和初始化的回调,看看wrapTask都做了啥:

      protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {
            Object metric = metrics != null ? metrics.submitted() : null;
            return () -> {
                Thread th = Thread.currentThread();
                if (!(th instanceof VertxThread)) {
                    throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + th);
                }
                VertxThread current = (VertxThread) th;
                if (THREAD_CHECKS && checkThread) {
                    if (contextThread == null) {
                        contextThread = current;
                    } else if (contextThread != current && !contextThread.isWorker()) {
                        throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + current);
                    }
                }
                if (metrics != null) {
                    metrics.begin(metric);
                }
                if (!DISABLE_TIMINGS) { 
                    //记录开始时间
                    current.executeStart();
                }
                try {
                    //为当前线程current设置为当前上下文ContextImpl.this
                    setContext(current, ContextImpl.this);
                    if (cTask != null) {
                        cTask.run();
                    } else {
                        //执行启动和初始化verticle的回调
                        hTask.handle(null);
                    }
                    if (metrics != null) {
                        metrics.end(metric, true);
                    }
                } catch (Throwable t) {
                    log.error("Unhandled exception", t);
                    Handler<Throwable> handler = this.exceptionHandler;
                    if (handler == null) {
                        handler = owner.exceptionHandler();
                    }
                    if (handler != null) {
                        handler.handle(t);
                    }
                    if (metrics != null) {
                        metrics.end(metric, false);
                    }
                } finally {
                    // We don't unset the context after execution - this is done later when the context is closed via
                    // VertxThreadFactory
                    if (!DISABLE_TIMINGS) {
                        //清楚开始时间
                        current.executeEnd();
                    }
                }
            };
        }
    

    这段代码主要有4处核心调用:

    • current.executeStart():记录线程执行开始时间,为线程阻塞检查器提供线程开始时间信息。
    //VertxThread
    public final void executeStart() {  
      execStart = System.nanoTime();
    }
    
    • setContext(current, ContextImpl.this):设置线程上下文
    //ContextImpl
    private static void setContext(VertxThread thread, ContextImpl context) {  
          thread.setContext(context);  
          if (!DISABLE_TCCL) {    
                if (context != null) {     
                       context.setTCCL();   
                } else {      
                       Thread.currentThread().setContextClassLoader(null);   
                }  
          }
    }
    //VertxThread
    void setContext(ContextImpl context) {  
        this.context = context;
    }
    
    • hTask.handle(null):执行verticle启动和初始化回调
    • current.executeEnd():清楚开始时间
    //VertxThread
    public final void executeEnd() {  
      execStart = 0;
    }
    

    相关文章

      网友评论

        本文标题:Verticle

        本文链接:https://www.haomeiwen.com/subject/wwdypttx.html