背景
- verticle相当于1个执行模块,是vertx的部署单元。
- vertx可以部署多个verticle,且verticle之间可以互相通信。
- 因为vertx主要是个网络框架,所以在verticle中通常会启动server,如http或tcp。
Verticle部署源码分析
/**
 * Deploys the verticle identified by {@code name}.
 *
 * <p>The request goes to the HA manager only when the options ask for HA, an HA manager
 * exists and it is enabled; in every other case the regular deployment manager handles it.
 *
 * @param name              identifier of the verticle to deploy
 * @param options           deployment options; {@code isHa()} selects the HA path
 * @param completionHandler handler passed on to whichever manager performs the deployment
 */
@Override
public void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
  boolean viaHaManager = options.isHa() && haManager() != null && haManager().isEnabled();
  if (!viaHaManager) {
    deploymentManager.deployVerticle(name, options, completionHandler);
    return;
  }
  haManager().deployVerticle(name, options, completionHandler);
}
deploymentManager.deployVerticle代码如下:
/**
 * Deployment-manager entry point: obtains the caller's context and a class loader,
 * generates a fresh deployment id and delegates to {@code doDeployVerticle}.
 *
 * @param identifier        verticle identifier (per the original author's note, the fully
 *                          qualified name of the verticle class)
 * @param options           deployment options, also used to pick the class loader
 * @param completionHandler handler passed through to {@code doDeployVerticle}
 */
public void deployVerticle(String identifier,
                           DeploymentOptions options,
                           Handler<AsyncResult<String>> completionHandler) {
  // Context of the caller, created on demand (the original author notes it is an event-loop context).
  // It is passed below as both the parent context and the calling context.
  ContextImpl callerCtx = vertx.getOrCreateContext();
  ClassLoader loader = getClassLoader(options, callerCtx);
  String deploymentID = generateDeploymentID();
  doDeployVerticle(identifier, deploymentID, options, callerCtx, callerCtx, loader, completionHandler);
}
首先是创建context,ContextImpl callingContext = vertx.getOrCreateContext(),这块代码如下:
/**
 * Returns the context from {@code getContext()}, or creates a new event-loop context
 * when there is none.
 *
 * @return the existing context, or a newly created event-loop context
 */
public ContextImpl getOrCreateContext() {
  ContextImpl existing = getContext();
  if (existing != null) {
    return existing;
  }
  // We are running embedded - create a context with no deployment id, the default worker pool,
  // an empty config and the current thread's context class loader.
  return createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader());
}
/**
 * Returns the context attached to the current thread, but only if it is owned by this instance.
 *
 * @return the current context when its {@code owner} is this instance, otherwise {@code null}
 */
public ContextImpl getContext() {
  ContextImpl current = (ContextImpl) context();
  boolean ownedByThis = current != null && current.owner == this;
  return ownedByThis ? current : null;
}
/**
 * Looks up the context stored on the current thread.
 *
 * @return the context held by the current thread when it is a {@code VertxThread},
 *         otherwise {@code null}
 */
public static Context context() {
  Thread running = Thread.currentThread();
  if (!(running instanceof VertxThread)) {
    return null;
  }
  return ((VertxThread) running).getContext();
}
/**
 * Creates a new {@code EventLoopContext} owned by this instance.
 *
 * @param deploymentID id of the deployment the context belongs to (callers may pass {@code null})
 * @param workerPool   worker pool to use; when {@code null} this instance's own worker pool is used
 * @param config       configuration handed to the context
 * @param tccl         class loader handed to the context
 * @return the newly created event-loop context
 */
public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) {
  WorkerPool effectivePool = workerPool;
  if (effectivePool == null) {
    effectivePool = this.workerPool;
  }
  return new EventLoopContext(this, internalBlockingPool, effectivePool, deploymentID, config, tccl);
}
最终会调到创建EventLoopContext的方法,详细见context部分分析。得到context后,上面调用的7个参数的doDeployVerticle重载会先根据identifier解析出可用的VerticleFactory列表,再调用下面这个带Iterator<VerticleFactory>参数的重载,接下来分析这个方法,如下:
/**
 * Tries the verticle factories supplied by {@code iter} one after another until one of them
 * deploys the verticle.
 *
 * For each factory: the identifier is resolved first when the factory requires it, then the
 * verticle instances are created (on a blocking executor when the factory asks for it) and
 * handed to doDeploy. If resolution or creation fails, the method calls itself with the same
 * iterator and the failure as prevErr, which moves on to the next factory. When the iterator
 * is exhausted, the last recorded error (if any) is reported through reportFailure.
 *
 * @param iter              remaining factories to try
 * @param prevErr           failure from the previously tried factory, or null
 * @param identifier        verticle identifier to resolve/create
 * @param deploymentID      id passed on to doDeploy
 * @param options           deployment options (also supplies the instance count)
 * @param parentContext     context passed on to doDeploy as the parent context
 * @param callingContext    context used when reporting failure and passed on to doDeploy
 * @param cl                class loader passed to the factory
 * @param completionHandler handler that eventually receives the deployment result
 */
private void doDeployVerticle(Iterator<VerticleFactory> iter,
Throwable prevErr,
String identifier,
String deploymentID,
DeploymentOptions options,
ContextImpl parentContext,
ContextImpl callingContext,
ClassLoader cl,
Handler<AsyncResult<String>> completionHandler) {
if (iter.hasNext()) {
VerticleFactory verticleFactory = iter.next();
Future<String> fut = Future.future();
// Factories that need a resolve step complete 'fut' themselves; otherwise the identifier is used as-is.
if (verticleFactory.requiresResolve()) {
try {
verticleFactory.resolve(identifier, options, cl, fut);
} catch (Exception e) {
try {
fut.fail(e);
} catch (Exception ignore) {
// Too late
}
}
} else {
fut.complete(identifier);
}
fut.setHandler(ar -> {
Throwable err;
if (ar.succeeded()) {
/**
 * resolvedName is the fully qualified name of the verticle class (original author's note)
 */
String resolvedName = ar.result();
if (!resolvedName.equals(identifier)) {
// Resolution produced a different name: restart the deployment with the resolved name.
deployVerticle(resolvedName, options, completionHandler);
return;
} else {
if (verticleFactory.blockingCreate()) {
// Creation may block: build the instances via executeBlocking and continue in its result handler.
vertx.<Verticle[]>executeBlocking(createFut -> {
try {
Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
createFut.complete(verticles);
} catch (Exception e) {
createFut.fail(e);
}
}, res -> {
if (res.succeeded()) {
doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, res.result());
} else {
// Try the next one
doDeployVerticle(iter, res.cause(), identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
}
});
return;
} else {
// Non-blocking creation: build the instances inline and deploy them.
try {
Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);
return;
} catch (Exception e) {
err = e;
}
}
}
} else {
err = ar.cause();
}
// Try the next one
doDeployVerticle(iter, err, identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
});
} else {
if (prevErr != null) {
// Report failure if there are no more factories to try otherwise try the next one
reportFailure(prevErr, callingContext, completionHandler);
} else {
// not handled or impossible ?
// NOTE(review): in this branch completionHandler is never invoked.
}
}
}
核心代码如下:
// Excerpt from doDeployVerticle: create options.getInstances() verticle instances through the factory...
Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
// ...then hand them to doDeploy together with the deployment id, contexts and completion handler.
doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);
createVerticles方法如下,创建verticle就是根据其类的全限定名创建对象:
/**
 * Asks the factory for {@code instances} verticle objects for the given identifier.
 *
 * @param verticleFactory factory used to create each instance
 * @param identifier      verticle identifier passed to the factory
 * @param instances       number of instances to create
 * @param cl              class loader passed to the factory
 * @return an array holding the created verticles, in creation order
 * @throws NullPointerException if the factory returns {@code null} for any instance
 * @throws Exception            whatever the factory throws while creating a verticle
 */
private Verticle[] createVerticles(VerticleFactory verticleFactory, String identifier, int instances, ClassLoader cl) throws Exception {
  Verticle[] created = new Verticle[instances];
  int slot = 0;
  while (slot < instances) {
    Verticle instance = verticleFactory.createVerticle(identifier, cl);
    if (instance == null) {
      throw new NullPointerException("VerticleFactory::createVerticle returned null");
    }
    created[slot] = instance;
    slot++;
  }
  return created;
}
doDeploy方法如下:
/**
 * Deploys the given verticle instances as one deployment.
 *
 * <p>For every instance a new context is created (worker or event-loop, depending on the
 * options), the instance is registered on the deployment, and its {@code init}/{@code start}
 * are run on that context. Success is reported once all instances have started. Failure is
 * reported at most once, however many instances fail.
 *
 * @param identifier        verticle identifier recorded on the deployment
 * @param deploymentID      id of the deployment being created
 * @param options           deployment options (worker flags, config, worker pool name/size)
 * @param parentContext     context whose deployment, if any, becomes the parent deployment
 * @param callingContext    context used when reporting success or failure
 * @param completionHandler handler receiving the deployment id or the failure
 * @param tccl              class loader passed to each new context
 * @param verticles         verticle instances to start
 * @throws IllegalArgumentException if the options request multi-threaded without worker
 */
private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,
                      ContextImpl parentContext,
                      ContextImpl callingContext,
                      Handler<AsyncResult<String>> completionHandler,
                      ClassLoader tccl, Verticle... verticles) {
  if (options.isMultiThreaded() && !options.isWorker()) {
    throw new IllegalArgumentException("If multi-threaded then must be worker too");
  }
  JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy(); // Copy it
  String poolName = options.getWorkerPoolName();
  Deployment parent = parentContext.getDeployment();
  DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);
  AtomicInteger deployCount = new AtomicInteger();
  // Flipped by the first failing instance so that completionHandler sees only one failure.
  AtomicBoolean failureReported = new AtomicBoolean();
  for (Verticle verticle : verticles) {
    // A named worker pool is only created when the options specify a pool name.
    WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
    WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
    // parentContext is the context that requested the deployment; a new context is created
    // here to run the verticle. The original author notes three kinds: EventLoopContext,
    // WorkerContext and MultiThreadedWorkerContext.
    ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
      vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
    if (workerExec != null) {
      context.addCloseHook(workerExec);
    }
    context.setDeployment(deployment);
    deployment.addVerticle(new VerticleHolder(verticle, context));
    // Per the original author's note: an EventLoopContext runs this on its event loop, a
    // WorkerContext on an ordered worker executor, a MultiThreadedWorkerContext on a worker thread.
    context.runOnContext(v -> {
      try {
        verticle.init(vertx, context);
        Future<Void> startFuture = Future.future();
        verticle.start(startFuture);
        startFuture.setHandler(ar -> {
          if (ar.succeeded()) {
            if (parent != null) {
              parent.addChild(deployment);
              deployment.child = true;
            }
            vertx.metricsSPI().verticleDeployed(verticle);
            deployments.put(deploymentID, deployment);
            // Only the last instance to start reports success.
            if (deployCount.incrementAndGet() == verticles.length) {
              reportSuccess(deploymentID, callingContext, completionHandler);
            }
          } else if (failureReported.compareAndSet(false, true)) {
            // Previously the flag was read but never set, so every failing instance reported.
            reportFailure(ar.cause(), callingContext, completionHandler);
          }
        });
      } catch (Throwable t) {
        // init()/start() threw synchronously; report it unless a failure was already reported.
        if (failureReported.compareAndSet(false, true)) {
          reportFailure(t, callingContext, completionHandler);
        }
      }
    });
  }
}
doDeploy核心代码主要如下:
首先创建上下文:
// Excerpt from doDeploy: worker deployments get a worker context (multi-threaded or not, per the options),
// everything else gets an event-loop context.
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
parentContext为部署verticle的context,这里又创建了新的context,用于启动verticle。
根据verticle类型,这里分为3种context,EventLoopContext、WorkerContext和MultiThreadedWorkerContext。
然后启动verticle,调用其init和start方法,但执行线程会因为verticle的不同模式而有所不同:
- 对于EventLoopContext,在context的eventloop中执行
- 对于WorkerContext,在worker线程顺序执行器中执行
- 对于MultiThreadedWorkerContext,在worker线程中执行
下面看下context.runOnContext方法,这里传入的回调负责执行verticle的初始化和启动,除此之外再看看它还做了什么。以standard模式verticle为例,该context为EventLoopContext,runOnContext最终会调用其executeAsync方法,如下:
//EventLoopContext
// Submits the task to the event loop returned by nettyEventLoop(). The task is first wrapped by
// wrapTask with no ContextTask, thread checking enabled (checkThread = true) and no pool metrics.
public void executeAsync(Handler<Void> task) {
nettyEventLoop().execute(wrapTask(null, task, true, null));
}
nettyEventLoop()得到的是该context绑定的event loop,task回调参数即为verticle启动和初始化的回调,看看wrapTask都做了啥:
/**
 * Wraps a task in a Runnable that runs it with this context installed on the executing thread.
 *
 * Exactly one of cTask / hTask is run: cTask when it is non-null, otherwise hTask.handle(null).
 * The wrapper also verifies the executing thread, records metrics and execution timing, and routes
 * any Throwable thrown by the task to the exception handler instead of propagating it.
 *
 * @param cTask       task to run, or null to run hTask instead
 * @param hTask       handler invoked with null when cTask is null
 * @param checkThread when true (and THREAD_CHECKS is on) the executing thread is checked against contextThread
 * @param metrics     pool metrics to notify, or null for none
 * @return the wrapping Runnable
 */
protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {
// Submission is recorded when the task is wrapped, not when it runs.
Object metric = metrics != null ? metrics.submitted() : null;
return () -> {
Thread th = Thread.currentThread();
if (!(th instanceof VertxThread)) {
throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + th);
}
VertxThread current = (VertxThread) th;
if (THREAD_CHECKS && checkThread) {
// The first thread to run a checked task becomes contextThread; later runs on a different
// thread are rejected unless contextThread is a worker thread.
if (contextThread == null) {
contextThread = current;
} else if (contextThread != current && !contextThread.isWorker()) {
throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + current);
}
}
if (metrics != null) {
metrics.begin(metric);
}
if (!DISABLE_TIMINGS) {
// Record the execution start time
current.executeStart();
}
try {
// Install this context (ContextImpl.this) as the context of the current thread
setContext(current, ContextImpl.this);
if (cTask != null) {
cTask.run();
} else {
// Run the handler (during deployment: the callback that initialises and starts the verticle)
hTask.handle(null);
}
if (metrics != null) {
metrics.end(metric, true);
}
} catch (Throwable t) {
log.error("Unhandled exception", t);
// Prefer this context's exception handler, falling back to the owner's.
Handler<Throwable> handler = this.exceptionHandler;
if (handler == null) {
handler = owner.exceptionHandler();
}
if (handler != null) {
handler.handle(t);
}
if (metrics != null) {
metrics.end(metric, false);
}
} finally {
// We don't unset the context after execution - this is done later when the context is closed via
// VertxThreadFactory
if (!DISABLE_TIMINGS) {
// Clear the execution start time
current.executeEnd();
}
}
};
}
这段代码主要有4处核心调用:
- current.executeStart():记录线程执行开始时间,为线程阻塞检查器提供线程开始时间信息。
//VertxThread
// Stores the current System.nanoTime() in execStart to mark the start of a task execution.
// (Per the surrounding article, the blocked-thread checker reads this value.)
public final void executeStart() {
execStart = System.nanoTime();
}
- setContext(current, ContextImpl.this):设置线程上下文
//ContextImpl
/**
 * Attaches {@code context} to {@code thread} and, unless DISABLE_TCCL is set, updates the
 * context class loader to match.
 *
 * <p>With a non-null context its {@code setTCCL()} is invoked; with a {@code null} context the
 * context class loader of the currently executing thread is reset to {@code null}.
 *
 * @param thread  thread that receives the context
 * @param context context to attach, may be {@code null}
 */
private static void setContext(VertxThread thread, ContextImpl context) {
  thread.setContext(context);
  if (DISABLE_TCCL) {
    return;
  }
  if (context == null) {
    Thread.currentThread().setContextClassLoader(null);
    return;
  }
  context.setTCCL();
}
//VertxThread
// Stores the given context (possibly null) in this thread's context field.
void setContext(ContextImpl context) {
this.context = context;
}
- hTask.handle(null):执行verticle启动和初始化回调
- current.executeEnd():清除开始时间
//VertxThread
// Resets execStart to 0, clearing the start time recorded by executeStart().
public final void executeEnd() {
execStart = 0;
}
网友评论