美文网首页vertx
Vert.x3 Core手册简译(Java版)[Part 1.

Vert.x3 Core手册简译(Java版)[Part 1.

作者: 半枚荔枝 | 来源:发表于2016-05-14 16:21 被阅读3312次

    本文依照 知识共享许可协议(署名-非商业性使用-禁止演绎) 发布。
    感谢@严禁扯淡 的修改建议。

    2017-2-9:更新异步协同部分

    先放两个链接:
    源文档
    Github repository.

    话说用Java这么多年,没给社区做过什么贡献。这次趁使用Vert.x3的机会,简单翻译了核心包的手册。

    Vert.x3的手册简洁明了,过一遍即可轻松入门。所以诸君若是看到什么无法理解的,必定是我的译文有问题(嘿嘿,水平低,见谅)。


    部分名词对照表:

    • handler:事件处理器
    • event loop:事件循环(线程)
    • verticle:Vert.x的专有名词。指代Vert.x中基本的功能单元,一个Vert.x应用应该是由一组verticles构成的。
    • worker:顾名思义,干活的(线程)。对应文档中有worker thread pool,worker verticle(Vert.x里的work verticle与标准版verticle区别较大)。
    • event bus:事件总线

    Vert.x核心包提供如下的功能:

    • TCP客户端与服务器
    • HTTP客户端与服务器(包含Websocket支持)
    • 事件总线(Event bus)
    • 共享数据-局部的map和集群下的分布式map
    • 定时或延迟的处理
    • 部署、卸载verticle
    • 数据报文套接字(datagram socket)
    • DNS客户端
    • 文件系统存取
    • 高可用性
    • 集群

    核心包提供的功能是相当底层的。这意味着没有诸如数据库存取、认证、高级web功能此类的组件,你可以在Vert.x ext(扩展包)找到以上这些。

    Vert.x 的核心小且轻量,诸位可以各取所需。它可以整个的嵌入你现有的应用当中,不需要为了使用Vert.x而以特别的方式组织你的应用。

    你可以在任何Vert.x支持的语言中使用核心包。但是有一点要提一下,我们不会迫使你在Javascript或者Ruby里使用为Java准备的API;毕竟不同的语言有不同的约定和惯用法,强迫Ruby开发者使用Java的惯用法确实比较古怪。相反的,我们为每一种语言都生成了等价于核心Java API的惯用法(idiomatic)

    现在开始,我们将使用核心包(core)指代Vert.x core。

    如果你使用Maven或Gradle,把下列几行加入项目描述符的依赖配置即可使用核心包的API:

    • Maven (in your pom.xml):
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>3.2.1</version>
    </dependency>
    
    • Gradle (in your build.gradle file):
      compile io.vertx:vertx-core:3.2.1

    下面让我们看看核心包里的各种特性。

    开始使用Vert.x

    注意:这里所述大部分都是Java适用的,语言相关的部分需要以某种方式进行转换。

    在Vert.x里,如果没有Vertx对象,那几乎什么都做不了!
    Vertex对象是Vert.x的控制中心,许多功能都通过它实现。包括创建客户端和服务器、获取事件总线(event bus)的引用、设置定时器,等等。

    那么,如何获取它的实例呢?

    如果你在程序中嵌入Vert.x,那么可以像下面这样创建实例:
    Vertx vertx = Vertx.vertx();

    当你使用Verticle时

    注意:绝大部分应用其实只需要一个Vert.x实例;当然,你也可以创建多个实例。例如,两个总线需要隔离时或客户端服务器需分组时。

    创建Vert.x实例时指定可选项(option)

    创建Vert.x实例时,如果缺省选项不合适,你也可以设定一些值:
    Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));

    VertxOptions对象有很多设置项,你可以配置集群(clustering)、高可用性(high availability),worker 线程池的大小(pool sizes)等等。详细的内容请参见Javadoc。

    创建集群模式(clustered)的Vert.x对象

    如果你在使用Vert.x的集群模式(更多细节请参考下面的event bus一节,关于集群下的event bus),记得创建Vert.x对象也是异步的。

    为了把集群里不同的vertx实例组织在一起,通常需要花一点时间(可能是几秒钟)。在这段时间里,为了不阻塞调用线程(the calling thread),结果会以异步的方式返回。


    Are you fluent?(fluent狂人,别走)

    你可能已经注意到,在前面的例子中,我们使用了流式(fluent)的API。

    流式API是指多个方法可以用链式的方式一起调用。例如:
    request.response().putHeader("Content-Type", "text/plain").write("some text").end();

    这在Vert.x的API里是很普遍的模式,你要试着习惯它。 :)

    链式调用允许你更简洁的编写代码。当然,如果你不喜欢这种方式,这也不是必须的。你可以愉快地忽略这些,然后像下面这样写:

    HttpServerResponse response = request.response();
    response.putHeader("Content-Type", "text/plain");
    response.write("some text");
    response.end();
    

    不要调用我们,我们会调用你(Don’t call us, we’ll call you.)

    大部分Vert.x API 都是事件驱动的。这意味着如果你对Vert.x里发生的某事感兴趣,Vert.x会以向你发送事件(events)的方式通知你。

    例如下面的事件:

    • 定时器被触发
    • socket收到了一些数据
    • 一些数据已经从磁盘上被读取
    • 某个异常产生了
    • HTTP服务器接受了一个请求

    通过提供handlers,你可以处理这些事件。例如定义一个定时器事件:

    vertx.setPeriodic(1000, id -> {
      // This handler will get called every second
      System.out.println("timer fired!");
    });
    

    或者接受一个HTTP请求:

    server.requestHandler(request -> {
      // This handler will be called every time an HTTP request is received at the server
      request.response().end("hello world!");
    });
    

    如果触发了某个事件,Vert.x将会异步地(asynchronously)调用它(the handler)。

    这里我们发现了Vert.x的如下重要概念。


    不要阻塞我!(Don't block me!)

    除了极少的例外(即某些以‘Sync’结尾的文件系统操作),Vert.x里没有API会阻塞调用线程。

    如果结果可以即刻获得,它会立刻被返回。否则,通常你需要提供一个处理器,以便稍后接收事件。

    没有API会阻塞线程意味着:用少量的线程,就可以处理大量的并发。

    传统的阻塞API可能会在哪些地方发生呢:

    • 从socket读取数据
    • 写数据到磁盘
    • 发消息给某个接收者,然后等待回应
    • 。。很多其他状况

    在上面这些案例中,你的线程在等待一个结果时不能做其他任何事,这样是很低效的。

    这也意味着,如果你想使用阻塞API处理大量并发,你将需要大量的线程来防止你的应用卡住。

    线程在内存(例如:栈)和上下文切换方面的开销不容忽视。

    以很多现代的应用所需求的并发级别,阻塞的方式根本实现不了。


    Reactor and Multi-Reactor(反应器和多路反应器?)

    前面我们提到了Vert.x的API是事件驱动的,当handlers可用的时候,Vert.x会向它们传递事件。

    绝大多数情况下,Vert.x通过一个叫event loop的线程调用你的handlers。

    event loop可以在事件到达时持续不断地将其分发给不同的handler,因为Vert.x和你的应用中不会有什么是阻塞的。

    同样,因为没什么是阻塞的,所以event loop具有在短时间内分发巨量事件的潜力。例如,单个event loop可以极迅速地处理数千的HTTP请求。

    我们称之为Reactor模式

    你之前可能已经听说过它--nodejs就实现了这种模式。

    标准的Reactor实现里,有一个单独的event loop(single event loop)线程,它会在所有事件到达时持续不断地将其分发给所有的handler。

    单一线程的困扰在于,在任意时刻,它只能在一个cpu核心上运行。所以如果你希望你的单线程Reactor应用(或者Nodejs应用)能够运用上多核服务器的扩展能力( scale over your multi-core server ),你需要启动多个进程并管理好它们。

    Vert.x的工作方式不同于此,每个vertx实例会维护数个event loop(several event loops)。缺省情况下,我们基于机器上可用的核心数来确定这个数字,当然这个也可以设置。

    不像Nodejs,这意味着单个Vert.x进程可以利用到服务器的扩展。

    为了与单线程的reactor模式区分开,我们称之为Multi-Reactor模式

    注意:虽然一个Vert.x实例会维护多个event loop,但任何特定的handler都绝不会被并发地执行,在绝大多数情况下(worker verticle除外),它都会被某个固定的event loop(exact same event loop)调用。


    黄金准则:不要阻塞Event Loop(Don’t Block the Event Loop)

    我们已经了解了,Vert.x的API是非阻塞的,不会阻塞event loop;但是,如果你在handler中自己(yourself)阻塞了event loop,那么上面的其实都没啥用。。

    如果你这么干了,那么event loop被阻塞的时候它啥都干不了。再如果你阻塞了Vert.x实例里所有的event loop,那你的应用将会陷入完全停滞的状态!

    所以千万别这么干!我们已经警告过你了哈(You have been warned)

    阻塞的例子包括:

    • Thread.sleep()
    • 等待一个锁
    • 等待一个同步锁或监视器(例如同步块(synchronized section))
    • 做一个耗时的数据库操作并等待结果
    • 做一个复杂的计算,耗费大量的时间
    • 在循环中(Spinning in a loop)

    如果上面任何一步挂住了event loop,让它花了大量时间( significant amount of time),那么你只能安心等待程序执行。

    那么,什么样是大量时间( significant amount of time)呢?

    这个时间,实际上取决于你的应用对并发量的需求。

    如果你有单一的event loop并且想每秒处理一万个http请求,那么很明显,处理每个请求的时间不能超过0.1毫秒,所以阻塞不能超过这个时间。

    这里面的计算不难,作为练习,我们将之留给读者(The maths is not hard and shall be left as an exercise for the reader)。

    如果你的应用没有响应了,这可能是event loop被阻塞的信号。为了帮助诊断这样的问题,Vert.x会在检测到某个event loop一段时间后还未返回时自动打印警告日志。如果在你的日志中看见这样的警告,那你可得调查调查了。
    Thread vertx-eventloop-thread-3 has been blocked for 20458 ms

    为了精确定位阻塞发生在何处,Vert.x也会提供堆栈跟踪消息。

    如果你想关闭这些警告或改变设置,可以在创建Vertx对象前,去VertxOptions对象里设置。


    执行阻塞式代码

    完美的世界里,不会有战争,也不会有饥饿。所有的API都会以异步的方式写成,小兔和小羊羔会手牵手地穿过阳光明媚的绿草地。

    但是,现实世界不是这样的。。(你有关注最近的新闻吗?)
    (迷之声:这篇文档生成时发生了啥??)

    事实上,不算其他大多数库,单单在JVM的子系统里就有同步的API和很多可能造成阻塞的方法。一个好的例子是JDBC,它天然就是同步的;无论怎么使劲,Vert.x也不会魔法,没办法撒一点魔力粉就让它变成异步的。

    我们不会整夜不睡的去重写所有(现存的组件)使它们成为异步的,所以我们需要提供一种方式,以使在Vert.x应用里可以安全的使用“传统的”阻塞API。

    就像前面讨论的,为了不妨碍event loop干其他有益的活,不能从它这直接调用阻塞操作。所以你该怎么办呢?

    指定待执行的阻塞代码和一个结果处理器(result handler),然后调用executeBlocking,当阻塞代码执行完毕的时候,handler将会以异步的方式被回调(to be called back asynchronous )。

    vertx.executeBlocking(future -> {
      // Call some blocking API that takes a significant amount of time to return
      String result = someAPI.blockingMethod("hello");
      future.complete(result);
    }, res -> {
      System.out.println("The result is: " + res.result());
    });
    

    缺省情况下,如果executeBlocking 在同一上下文环境中(例如同一个verticle)被多次调用,那么不同的executeBlocking 会被顺序地执行(即一个接一个)。

    如果你不在意executeBlocking 执行的顺序,那么你可以将ordered参数设置为false。这种情况下,worker pool有可能会并行地执行executeBlocking。

    另一种执行阻塞代码的方法是在worker verticle中干这事。

    worker verticle总是由worker pool里的线程来执行。


    异步协同

    多个异步结果的协同可以由Vert.x的futures来实现。

    CompositeFuture.all接受数个future参数(到6为止)并返回一个future;当所有的future都成功了,就返回成功(succeeded)的future,否则返回失败(failed)的future:

    Future<HttpServer> httpServerFuture = Future.future();
    httpServer.listen(httpServerFuture.completer());
    
    Future<NetServer> netServerFuture = Future.future();
    netServer.listen(netServerFuture.completer());
    
    CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
      if (ar.succeeded()) {
        // All server started
      } else {
        // At least one server failed
      }
    });
    

    completer返回的handler会完成这个future。

    CompositeFuture.any接受数个future参数(到6为止)并返回一个future;只要有一个future成功了,那返回的future也成功(succeeded),否则就失败(failed)

    Future<String> future1 = Future.future();
    Future<String> future2 = Future.future();
    CompositeFuture.any(future1, future2).setHandler(ar -> {
      if (ar.succeeded()) {
        // At least one is succeeded
      } else {
        // All failed
      }
    });
    

    2017-2-9 更新

    新版CompositeFuture 的API 中增加了与all类似的系列方法:join
    看文档的说明,alljoin 的区别在于:

    如果参数列表中的某个Future 失败了,那么all 不会继续等,这个CompositeFuture将被标记为失败并完成;而join会继续等待直到所有参数完成(不管成功与否)。


    compose可以用来链式调用future:

    FileSystem fs = vertx.fileSystem();
    
    Future<Void> fut1 = Future.future();
    Future<Void> fut2 = Future.future();
    
    fs.createFile("/foo", fut1.completer());
    fut1.compose(v -> {
      fs.writeFile("/foo", Buffer.buffer(), fut2.completer());
    }, fut2);
    fut2.compose(v -> {
      fs.move("/foo", "/bar", startFuture.completer());
    }, startFuture);
    

    Verticles

    Vert.x有一个简单的、可扩展的,类似actor的部署方式(actor-like deployment )和开箱即用的并发模型,这方面可以节省下你亲自动手的时间精力。

    这个模型是完全可选的,如果你不想,Vert.x并不会强迫你以这种方式创建自己的应用。。

    这个模型并未严格地实现actor模型,但确实与其有相似之处,尤其在并发性、扩展,部署方面。

    为了使用这个模型,你需要将代码写成verticles的集合。

    verticles是由Vert.x部署和运行的代码块。verticles可以由任何Vert.x支持的语言写成,并且单独的应用可以包含多种语言写就的verticles。

    你可以把verticle看成有点像Actor Model里的actor。

    一个典型的应用由同一时间运行在同一Vert.x实例里的很多verticle实例组成的。不同的verticle实例之间通过在event bus上向彼此发送消息来通信。

    编写verticle

    verticle类必须实现Verticle接口。

    你可以直接实现这个接口,但通常有个更简单的办法,就是继承下面这个抽象类:AbstractVerticle

    举个例子:

    public class MyVerticle extends AbstractVerticle {
    
      // Called when verticle is deployed
      public void start() {
      }
    
      // Optional - called when verticle is undeployed
      public void stop() {
      }
    
    }
    

    通常你需要像上面的例子一样,重载start方法。

    Vert.x部署verticle时,会调用其start方法。当该start方法完成时,就认为该verticle已启动。

    你也可以选择重载stop方法,当verticle被卸载(undeployed)时会调用这个方法。同样,stop方法完成时,会认为verticle已被终止。

    异步verticle的启动和终止

    有时候你可能想在verticle启动时做点耗时的事,除非完成了,否则不应该认定verticle已成功部署。比如你可能想在start方法里部署其他的verticles。

    你不能在start方法里阻塞地等待其他verticles部署完成,这会打破我们的黄金准则。

    那该怎么做呢?

    合适的途径是实现异步的start方法。这个版本的start方法有一个future参数,这个方法返回时verticle并不会被认定已经部署完成。

    等干完所有活之后(例如启动其他的verticles)你就可以调用future对象的complete(或者fail)方法;这是一个信号,标记你这里已经都完成了。

    下面有个例子:

    public class MyVerticle extends AbstractVerticle {
    
      public void start(Future<Void> startFuture) {
        // Now deploy some other verticle:
    
        vertx.deployVerticle("com.foo.OtherVerticle", res -> {
          if (res.succeeded()) {
            startFuture.complete();
          } else {
            startFuture.fail();
          }
        });
      }
    }
    

    类似的,stop方法也有一个异步的版本。如果你做清理工作要花点时间,就可以用它。

    public class MyVerticle extends AbstractVerticle {
    
      public void start() {
        // Do something
      }
    
      public void stop(Future<Void> stopFuture) {
        obj.doSomethingThatTakesTime(res -> {
          if (res.succeeded()) {
            stopFuture.complete();
          } else {
            stopFuture.fail();
          }
        });
      }
    }
    

    提示:并不需要在stop方法中手动卸载某个verticle的子verticles(child verticles),因为Vert.x会在父verticle被卸载时自动卸载它们。

    Verticle的类型

    有三种不同类型的verticle。

    标准verticle(Standard Verticles)

    这是最平常并有用的版本,它们会一直由同一个event loop线程执行。下一节里我们会更详细地讨论这个。

    Worker Verticles

    这一类由worker pool里的线程运行。绝不会有超过一个线程并发地执行单一实例。

    多线程版(Multi-threaded) worker verticles

    这些还是由worker pool里的线程运行,不过单一实例可以被多个线程并发执行。

    标准verticle

    标准verticle被创建的时候,它们会被指定给一个event loop线程,然后event loop会调用其start方法。当你从event loop调用任意核心API上可以接受handler的方法时,Vert.x保证那些handlers会由同样的event loop执行。

    这意味着我们可以保证一个verticle实例里的所有代码都会在同一个event loop上执行(只要你不自己创建线程并调用它!)。

    这同样意味着可以像单线程应用那样来写所有代码,至于多线程并发和扩展的问题交给Vert.x就可以了。不需要有同步和易变性(volatile)的困扰,这样你也可以避免‘传统的’手写多线程应用中普遍会碰到的状况,譬如竞争条件(race conditions)和死锁(deadlock)。

    Worker Verticles

    worker verticle和标准verticle挺像的,不同点在于worker verticle由Vert.x的worker 线程池中的线程执行,而标准verticle由event loop执行。

    worker verticle是为调用阻塞代码而设计的。它们不会阻塞任意的event loop。

    如果你不想用worker verticle来执行阻塞代码,那也可以通过直接运行内联阻塞代码的方式(就是前文所述的executeBlocking)。

    如果你想将某个verticle作为worker verticle部署,可以通过调用setWorker方法。

    DeploymentOptions options = new DeploymentOptions().setWorker(true);
    vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
    

    worker verticle实例绝不会被多个线程并发执行,但可以被不同线程在不同时候执行。

    多线程版worker verticle

    一个多线程版worker verticle就像普通的worker verticle一般,但它可以被不同线程并发执行。

    警告:多线程版worker 线程是个高级特性,绝大多数应用对此并无需求。为了这些verticle的并发执行,你需要使用标准的Java多线程编程技能,小心地使verticles保持一致的状态。

    以编程的方式部署verticle

    你可以使用deployVerticle系列方法中的一个来部署verticle,只需要知道verticle的名称或者你自己创建一个verticle实例丢过去。

    注意:部署verticle实例的方式是Java专有的。

    Verticle myVerticle = new MyVerticle();
    vertx.deployVerticle(myVerticle);
    

    你也可以通过指定verticle的名称来部署。

    verticle的实例化需要用到特定的VerticleFactory,verticle的名称就是用来查询这个特定的工厂类。

    不同的语言有不同的工厂类,用来初始化verticle。原因多种多样,比如运行时从Maven加载服务或者获取verticles。

    这样你可以部署任意以Vert.x支持的语言写就的verticle。

    下面是一个部署不同种verticle的例子:

    vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");
    
    // Deploy a JavaScript verticle
    vertx.deployVerticle("verticles/myverticle.js");
    
    // Deploy a Ruby verticle verticle
    vertx.deployVerticle("verticles/my_verticle.rb");
    

    从verticle的名称映射到verticle factory的规则

    当使用名称部署verticle时,名称的作用是选出实际中用来实例化这个verticle的verticle factory。

    verticle的名称可以有一个前缀:前缀是个字符串,后面紧跟着一个冒号;如果前缀存在将被用于查询对应的factory。
    即:
    js:foo.js // Use the JavaScript verticle factory
    groovy:com.mycompany.SomeGroovyCompiledVerticle // Use the Groovy verticle factory
    service:com.mycompany:myorderservice // Uses the service verticle factory

    如果没有前缀,Vert.x会寻找后缀来查询factory。
    即:
    foo.js // Will also use the JavaScript verticle factory
    SomeScript.groovy // Will use the Groovy verticle factory

    如果前后缀都不存在,那么Vert.x会假定这是一个完全限定类名(FQCN)的Java verticle,并试着循此实例化。

    Verticle Factories如何定位呢?

    绝大多数verticle factories都是从类路径(classpath)中加载的,在Vert.x启动时注册。

    同样地,如果你希望用编程的方式注册、注销verticle factories,那么有registerVerticleFactoryunregisterVerticleFactory可用。

    等待部署完成

    verticle的部署是异步进行的,可能完成的时候对部署方法的调用都已经返回一阵子了。

    如果你想在部署完成时收到通知,可以在部署时指定一个完成处理器(completion handler):

    vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", res -> {
      if (res.succeeded()) {
        System.out.println("Deployment id is: " + res.result());
      } else {
        System.out.println("Deployment failed!");
      }
    });
    

    如果部署成功,此handler会收到一个字符串,这里面包含了部署的ID。

    后面在你卸载这次部署的verticle时,会用到这个ID。

    卸载部署的verticle

    可以使用undeploy来卸载已部署的verticle。

    卸载本身也是异步的。所以如果你想在完成的时候收到通知,处理方法同部署的时候:

    vertx.undeploy(deploymentID, res -> {
      if (res.succeeded()) {
        System.out.println("Undeployed ok");
      } else {
        System.out.println("Undeploy failed!");
      }
    });
    

    指定verticle实例的数量

    用verticle的名称部署时,可以指定verticle实例的数量:

    DeploymentOptions options = new DeploymentOptions().setInstances(16);
    vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
    

    这个特性在扩展到多核cpu上时很有帮助。比如你要部署一个web服务器的verticle,并且你的机器上有多个核心;为了这多个核心能充分发挥自己的光和热,你可以部署上多个实例。

    给verticle传递配置参数

    部署时可以将配置以JSON的形式传递给verticle:

    JsonObject config = new JsonObject().put("name", "tim").put("directory", "/blah");
    DeploymentOptions options = new DeploymentOptions().setConfig(config);
    vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
    

    之后配置信息将可通过Context对象使用,或者直接使用config方法。

    返回的配置是一个JSON对象,所以你可以像下面这样取数据:
    System.out.println("Configuration: " + config().getString("name"));

    在verticle中访问环境变量

    环境变量和系统属性可以用Java API访问:

    System.getProperty("prop");
    System.getenv("HOME");
    

    verticle隔离组(Verticle Isolation groups)

    缺省情况下,Vert.x有一个扁平的类路径(flat classpath),部署vertilce时,Vert.x会使用当前的类加载器(classloader)--而不是创建一个新的。多数情况下,这都是最简单、清晰、稳健的做法。

    然而,有时候你可能想把某些verticle的部署与其他的隔离开来。

    譬如,你想在同一个Vert.x实例中部署同一个verticle的不同版本,他俩还有着相同的类名;又或者你的两个不同verticle分别用到了同一个类库的不同版本。

    使用隔离组时,你需要提供待隔离类的名称列表。方法setIsolatedClasses可以搞定这个事。传入的名称可以是类似com.mycompany.myproject.engine.MyClass这样的完全限定类名;还可以是类似com.mycompany.myproject.这样带通配符的,这会匹配到包com.mycompany.myproject*里的任意类和任意子包。

    请注意唯有匹配到的类才会被隔离,其他的类仍然由当前的类加载器载入。

    如果想从非主类路径中加载类和资源,那你可以用setExtraClasspath方法提供额外的类路径条目。

    警告:使用这个特性要小心。类加载器们也可能带来一堆bug,使你的排错工作变得困难(译注:大家知道bug在英文里有虫子和计算机程序错误的意思;所以此处前面说虫子,后面说除错困难)。

    这儿有个利用隔离组来隔离一个verticle的部署的例子。

    DeploymentOptions options = new DeploymentOptions().setIsolationGroup("mygroup");
    options.setIsolatedClasses(Arrays.asList("com.mycompany.myverticle.*",
                       "com.mycompany.somepkg.SomeClass", "org.somelibrary.*"));
    vertx.deployVerticle("com.mycompany.myverticle.VerticleClass", options);
    

    高可用性

    部署verticle时可以打开高可用性(HA),在这样的上下文环境里,若某个Vert.x实例上的某个vertilce意外地挂掉,集群里的另一个Vert.x实例将会重新部署这个verticle。

    以高可用性运行verticle时,只需要在命令行后面加上** -ha **:
    vertx run my-verticle.js -ha

    打开高可用性时,无需添加** -cluster **。

    更多关于高可用性和配置的细节可以在下面的高可用性和故障转移(High Availability and Fail-Over)一节中找到。

    从命令行运行verticles

    将依赖添加到Vert.x核心包里,就能以正常方式在你的maven或gradle项目中直接使用Vert.x。

    不过你也可以直接从命令行运行verticle。

    为了达成这个目的,你要下载并安装好Vert.x的发布包,并将安装目录下的** bin 目录添加到 PATH 环境变量,同样要确保Java8 的JDK在 PATH **里。

    注意:为了动态编译Java代码,JDK是必要的(言下之意,只装JRE是不够的)。

    一切就绪,现在可以用** vertx run **运行verticle了。下面有几个例子:

    # Run a JavaScript verticle
    vertx run my_verticle.js
    
    # Run a Ruby verticle
    vertx run a_n_other_verticle.rb
    
    # Run a Groovy script verticle, clustered
    vertx run FooVerticle.groovy -cluster
    

    至于Java verticle,甚至不需要编译你就可以直接运行它!
    vertx run SomeJavaSourceFile.java

    Vert.x会在运行前动态地编译它。这点在快速建立原型和演示时特别有用。不需要先设置Maven或Gradle就能开始了。

    有关在命令行执行vertx时各种其他可用的选项的所有信息,只需要在命令行输入vertx即可获得。

    退出Vert.x

    Vert.x实例维护的进程不是守护进程(daemon threads),所以它们会阻止JVM退出。

    如果你以嵌入的方式使用Vert.x,工作完成的时候,你可以调用close方法关闭它。

    这样做会关闭所有内部线程池、其他的资源,并允许JVM退出。

    上下文对象(The Context object)

    Vert.x给handler提供事件、或者调用verticle的start/stop方法时,其执行状况都是与一个Context(上下文)联系在一起的。通常,这个context是一个与特定的event loop线程绑定的event-loop context。所以与此context相关的执行动作都发生在同一确定的event loop线程上。至于worker verticle和运行内联的阻塞代码时,会有一个worker context与之关联,这些动作都由worker 线程池里的线程运行。

    利用getOrCreateContext方法,可以获得上下文环境:
    Context context = vertx.getOrCreateContext();

    如果当前线程已经存在一个context与之关联,它会重用这个context对象。否则会创建context的一个新实例。可以像下面这样测试下取到的context的类型

    Context context = vertx.getOrCreateContext();
    if (context.isEventLoopContext()) {
      System.out.println("Context attached to Event Loop");
    } else if (context.isWorkerContext()) {
      System.out.println("Context attached to Worker Thread");
    } else if (context.isMultiThreadedWorkerContext()) {
      System.out.println("Context attached to Worker Thread - multi threaded worker");
    } else if (! Context.isOnVertxThread()) {
      System.out.println("Context not attached to a thread managed by vert.x");
    }
    

    在你拿到一个context对象后,可以在此context里异步地运行代码。换句话说,你提交的任务最终会运行在同样的context里:

    vertx.getOrCreateContext().runOnContext( (v) -> {
      System.out.println("This will be executed asynchronously in the same context");
    });
    

    当有数个handler运行在同一context里时,它们可能会希望共享一些数据。context对象提供了存取共享在上下文里数据的方法。例如,你要传递数据过去做点事,可以用runOnContext方法:

    final Context context = vertx.getOrCreateContext();
    context.put("data", "hello");
    context.runOnContext((v) -> {
      String hello = context.get("data");
    });
    

    context对象也允许你通过config方法访问verticle的配置信息。去看看** 给verticle传递配置 **一节吧,你将获得关于此项配置的更多细节。

    定期、延时执行

    在Vert.x里,延时或定期执行是很普遍的。

    在标准verticle里,你不能以使线程休眠的方式引入延迟;这样干会阻塞event loop线程。

    取而代之的是Vert.x定时器,定时器分为一次性(one-shot)周期性(periodic)的。下面我们会分别讨论。

    一次性定时器

    一段确定的延时过后,一次性定时器将调用事件handler,度量衡是毫秒。

    设置一个触发一次的定时器用到setTimer方法,它有两个参数:延时和一个handler。

    long timerID = vertx.setTimer(1000, id -> {
      System.out.println("And one second later this is printed");
    });
    
    System.out.println("First this is printed");
    

    返回值是定时器的ID(long类型),它具有唯一属性。之后你可以用这个ID来取消定时器。这个handler也会收到定时器的ID。

    周期性定时器

    类似的,利用setPeriodic方法可以设置一个定期触发的定时器。

    初始的延迟值就是周期间隔。

    返回值与一次性定时器一样,此处不再赘述。

    定时器的事件handler的参数也与一次性定时器一致:

    记住,定时器会定期触发。如果你的定期处理需要耗费大量时间,你的定时器事件可能会连续运行甚至糟糕到堆积在一起。

    在这种情况下,你应该考虑转而使用setTimer。一旦你的处理完成了,你可以再设置下一个定时器。

    long timerID = vertx.setPeriodic(1000, id -> {
      System.out.println("And every second this is printed");
    });
    
    System.out.println("First this is printed");
    

    取消定时器

    像下面这样,调用cancelTimer方法,指定定时器ID,即可取消周期定时器。

    verticle里的自动清理(Automatic clean-up in verticles)

    如果你是在verticle内部创建的定时器,那么verticle被卸载时,这些定时器将被自动关闭。


    事件总线(The Event Bus)

    event bus是Vert.x的神经系统。

    每个Vert.x实例都拥有单独的一个event bus实例,你可以通过eventBus方法得到它。

    应用的不同部分,不管是否在同一个Vert.x实例里,即使是不同语言编写的,都可以通过event bus彼此交流。

    甚至浏览器里运行的的客户端JavaScript也可以通过同一个event bus相互通信。

    event bus在多个服务器和多个浏览器间形成了一个分布式的点对点消息系统。

    event bus支持发布/订阅(publish/subscribe)、点对点、请求-响应(request-response)这三种消息模式。

    event bus的API很简单,主要包括注册handlers,注销handlers,发送和发布消息。

    基本概念

    寻址(Addressing)

    消息通过event bus发送到某个地址(address)

    Vert.x没有花哨的令人困扰的寻址方案。Vert.x里地址就是字符串。任意字符串都有效。不过使用某种命名策略还是很明智的,例如使用分隔符限定命名空间。

    这里是一些有效的地址:europe.news.feed1, acme.games.pacman, sausages, and X。

    处理器(Handlers)

    消息由handlers接收,所以你需要把handler注册到地址上。

    同一个地址可以注册多个不同的handler。

    某个handler也可以被注册在多个不同的地址上。

    发布/订阅消息(Publish / subscribe messaging)

    event bus支持发布(publishing)消息。

    消息被发布到某个地址,这意味着把消息分发到注册在此地址上的所有handlers。

    这就是我们很熟悉的发布/订阅模式。

    点对点和请求-响应(Point to point and Request-Response messaging)

    event bus也支持点对点消息。

    当消息被发送到某个地址,Vert.x会把消息路由给注册在此地址上的某个handler。

    如此此地址上注册了超过一个handler,Vert.x将会通过一个不严格的轮询算法(non-strict round-robin algorithm)从中选择一个。

    在点对点的消息机制中,发消息时可以选择指定一个应答handler(reply handler)。

    当有接收者收到消息并处理后,接收者可以选择是否答复此消息。如果选择答复,上述的reply handler将被调用。

    发送者收到消息回应后,同样可以做出回应。这可以无限地重复下去,并允许两个不同的verticle间形成对话。

    这种通用的消息模式称为请求-响应模式。

    尽力分发(Best-effort delivery)

    Vert.x会尽最大的努力分发消息,绝不会有意丢弃某些消息。这被称为尽力分发。

    然而,在event bus部分或全部失效的情况下,消息还是有可能丢失。

    如果你的应用很关心这一点,那么编码时应该注意使你的handler是幂等的(be idempotent),并且发送方应该在恢复后尝试重新发送消息。

    消息的类型

    任何的基本类型/简单类型,字符串或者buffers都可以被当成消息发送出去。

    但是Vert.x里通常使用JSON格式的消息。

    在Vert.x支持的语言里,创建、读取、解析JSON都很容易,所以它就成了Vert.x上的通用语(lingua franca)

    当然了,并不是必须使用JSON。

    event bus是很灵活的,支持在其上发送任意专有的对象。你只需为此定义一个编解码器(codec)

    The Event Bus API

    下面让我们来看看API。

    获取event bus对象

    可以像下面这样拿到event bus对象的引用:
    EventBus eb = vertx.eventBus();

    每个Vert.x实例有唯一的event bus实例。

    注册handlers

    注册handler最简单的方法是用consumer方法。看例子:

    EventBus eb = vertx.eventBus();
    
    eb.consumer("news.uk.sport", message -> {
      System.out.println("I have received a message: " + message.body());
    });
    

    当你的handler收到一条消息时,handler会被调用,而消息(message)会作为参数传递过去。

    consumer方法会返回一个MessageConsumer实例。

    这个对象可以用来注销handler,或将handler当作流(stream)来使用。

    或者你也可以用consumer方法得到一个未设置handler的MessageConsumer 对象,随后再设置handler:

    EventBus eb = vertx.eventBus();
    
    MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
    consumer.handler(message -> {
      System.out.println("I have received a message: " + message.body());
    });
    

    当在一个集群event bus上注册了handler时,完成在集群上每个节点的注册需要花点时间。

    如果你想在完成时得到通知,你可以在MessageConsumer 对象上注册一个completion handler

    consumer.completionHandler(res -> {
      if (res.succeeded()) {
        System.out.println("The handler registration has reached all nodes");
      } else {
        System.out.println("Registration failed!");
      }
    });
    

    注销handlers

    注销handler可以通过调用unregister方法完成。

    在集群event bus做这件事时,同样要花点时间等其传播到各个节点,所以你也可以通过unregister方法得到通知。

    consumer.unregister(res -> {
      if (res.succeeded()) {
        System.out.println("The handler un-registration has reached all nodes");
      } else {
        System.out.println("Un-registration failed!");
      }
    });
    

    发布消息

    发布一个消息只需简单地调用publish方法,指定要发往的地址。
    eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");

    这个消息会被分发到注册在地址 news.uk.sport 上的所有handlers。

    发送消息

    发送消息的结果是注册在此地址上的handler只有一个会收到消息。这是点对点的消息模式。

    你可以用send方法发送消息:
    eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");

    设置消息头(Setting headers on messages)

    event bus 上传输的消息也可以包含一些消息头(headers)。

    可以在发送/发布消息时指定一个DeliveryOptions对象来做这件事:

    DeliveryOptions options = new DeliveryOptions();
    options.addHeader("some-header", "some-value");
    eventBus.send("news.uk.sport", "Yay! Someone kicked a ball", options);
    

    消息的顺序

    Vert.x会将消息以发送者送出的顺序分发给handler。

    消息对象

    你在handler里收到的消息是一个Message对象。

    消息的body就是发送过来的对象。

    消息头可以通过headers方法得到。

    消息/发送回应的知识点(Acknowledging messages / sending replies)

    使用send方法时event bus会尝试把消息发送到注册在event bus上的MessageConsumer对象。

    某些情况下,发送方可能想知道消息已经被接收并处理了。

    为了让发放方了解消息已被处理,consumer可以通过调用reply方法给予回应。

    如果这么做了,那么发送方将会收到一个回应,并且回应handler将被调用。看下面这个例子:
    接收方:

    MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
    consumer.handler(message -> {
      System.out.println("I have received a message: " + message.body());
      message.reply("how interesting!");
    });
    

    发送方:

    eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
      if (ar.succeeded()) {
        System.out.println("Received reply: " + ar.result().body());
      }
    });
    

    回应可以带一个消息体,你可以在其中放置一些有用的信息。

    “处理”实际上是由应用程序定义的,其中会发生什么完全依赖于consumer做了什么;Vert.x的event bus并不知道也不关心这些。

    例如:

    • 一个简单的消息consumer,它实现了返回当天时间的服务;可以确认回应的消息体中包含了这个时间。
    • 一个实现了持久化队列的消息consumer,如果消息被成功地持久化在存储中可以确认是true,反之则为false
    • 一个处理订单的消息consumer,当订单被成功处理时,它可以从数据库里被删掉,这时候可以确认是true

    指定超时时间的发送(Sending with timeouts)

    在发送一个带回应handler的消息时,可以在DeliveryOptions对象里指定超时的时间。

    如果在这段时间里没有收到回应,回应handler将被调用,并以失败结束。

    缺省的超时时间是30秒。

    发送失败(Send Failures)

    消息发放也可能因为其他原因失败,包括:

    • 消息发送到的地址没有handler可用。
    • 接收方显示地调用fail方法返回了失败的信息。

    所有的情况下回应的handler都会被调用,并返回特定的错误。

    消息编解码器(Message Codecs)

    只要你定义并注册了相关的message codec,就可以在event bus上发送任意的对象。

    当发送/发布消息时,你需要在DeliveryOptions对象上指定codec的名称:

    eventBus.registerCodec(myCodec);
    
    DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.name());
    
    eventBus.send("orders", new MyPOJO(), options);
    

    如果你想一直使用同一个codec,可以将它注册成缺省的codec,这样后面就不用每次发消息时再专门指定:

    eventBus.registerDefaultCodec(MyPOJO.class, myCodec);
    
    eventBus.send("orders", new MyPOJO());
    

    unregisterCodec方法可以用来注销codec。

    消息codec并不总是对同样的类型进行编码、解码。例如,你可以写一个codec用来发送MyPOJO类,而当消息送达handler时,可以是MyOtherPOJO类。

    集群event bus

    event bus并不只是存在于单一的Vert.x实例中。将多个Vert.x实例组成集群后,可以形成一个单独的,分布式的event bus。

    以编程实现集群

    如果你以编码的方式创建了Vert.x实例,将Vert.x实例配置成集群式的即可得到集群event bus;

    VertxOptions options = new VertxOptions();
    Vertx.clusteredVertx(options, res -> {
      if (res.succeeded()) {
        Vertx vertx = res.result();
        EventBus eventBus = vertx.eventBus();
        System.out.println("We now have a clustered event bus: " + eventBus);
      } else {
        System.out.println("Failed: " + res.cause());
      }
    });
    

    当然你应该确保classpath中有一个ClusterManager的实现,例如缺省的HazelcastClusterManager

    命令行里实现集群

    你可以在命令行里运行集群vertx:
    vertx run my-verticle.js -cluster

    verticle的自动清理

    如果你在verticle内部注册了event bus handlers,当这些verticle被卸载时handlers将被自动注销。


    JSON

    与其他语言不同,Java对JSON并没有提供头等的支持;所以我们提供了两个类,使得JSON的处理容易些。

    JSON 对象

    JsonObject类用来表示JSON对象。

    JSON对象基本可以认为是个map,键是字符串,而值可以是JSON支持的类型中的一种(字符串、数字、布尔型)。

    JSON对象也支持null 值。

    创建JSON对象

    默认的构造器可以创建一个空的JSON对象。

    你也可以从JSON格式的字符串创建一个JSON对象:

    String jsonString = "{\"foo\":\"bar\"}";
    JsonObject object = new JsonObject(jsonString);
    

    向JSON对象中添加条目

    put方法可以用于往JSON对象中添加条目。

    put方法支持流式API:

    JsonObject object = new JsonObject();
    object.put("foo", "bar").put("num", 123).put("mybool", true);
    

    从JSON对象中取值

    可以使用类似getXXX方法从JSON对象中取值,例如:

    String val = jsonObject.getString("some-key");
    int intVal = jsonObject.getInteger("some-other-key");
    

    将JSON对象编码为字符串

    encode方法用来将JSON对象转换为字符串。

    JSON 数组

    JsonArray类用来表示JSON 数组。

    一个JSON 数组是一些值(字符串、数字或者布尔型)组成的序列。

    JSON 数组也可以包括null 。

    创建JSON 数组

    默认的构造器可以创建一个空的JSON 数组。

    可以从JSON格式的字符串创建JSON 数组:

    String jsonString = "[\"foo\",\"bar\"]";
    JsonArray array = new JsonArray(jsonString);
    

    往JSON数组中添加元素

    add方法:

    JsonArray array = new JsonArray();
    array.add("foo").add(123).add(false);
    

    从JSON 数组中取值

    类似下面这样:

    String val = array.getString(0);
    Integer intVal = array.getInteger(1);
    Boolean boolVal = array.getBoolean(2);
    

    将JSON 数组转换为字符串

    使用encode即可。


    Buffers

    Vert.x中大量使用buffers传输数据。

    buffer是一个可以读写的字节序列,超出其容量时,它会自动扩展。可以将其看成一个智能的字节数组。

    创建buffers

    可以使用静态方法Buffer.buffer创建buffers。

    buffer可以由字符串或字节数组初始化,当然,空的buffer也是允许的。

    这里有些例子。空buffer:
    Buffer buff = Buffer.buffer();

    字符串初始化的buffer,这里的字符串将使用UTF-8编码成buffer。
    Buffer buff = Buffer.buffer("some string");

    或者你可以指定编码:
    Buffer buff = Buffer.buffer("some string", "UTF-16");

    从字节数组创建:

    byte[] bytes = new byte[] {1, 3, 5};
    Buffer buff = Buffer.buffer(bytes);
    

    如果你知道将会有多少数据待写入,可以在创建buffer时指定buffer的尺寸。这样buffer创建时就会分配这么多内存,这在效率上要优过边写入边扩容。

    注意,这样创建的buffer仍然是空的(empty)。创建时并不会有0填充于其中。
    Buffer buff = Buffer.buffer(10000);

    写buffer

    写入buffer有两种方式:附加(appending)、随机存取(random access)。这两种方式下,buffer都会自动扩容。不会产生**IndexOutOfBoundsException **异常。

    Appending to a Buffer

    往buffer上附加信息,可以使用**appendXXX **系列方法。有适合各种类型的append方法。

    append系列方法的返回值就是buffer本身,所以适用链式写法:

    Buffer buff = Buffer.buffer();
    
    buff.appendInt(123).appendString("hello\n");
    
    socket.write(buff);
    

    Random access buffer writes

    你也可以通过一系列**setXXX **方法在指定的索引处写入数据。set系列方法的第一个参数都是索引值。

    buffer会自动扩容的。

    Buffer buff = Buffer.buffer();
    
    buff.setInt(1000, 123);
    buff.setString(0, "hello");
    

    读buffer

    **getXXX **系列方法用来从buffer中读取数据。get系列方法的第一个参数也是指示从哪开始读的索引值。

    Buffer buff = Buffer.buffer();
    for (int i = 0; i < buff.length(); i += 4) {
      System.out.println("int value at " + i + " is " + buff.getInt(i));
    }
    

    使用无符号数

    可以使用**getUnsignedXXX、appendUnsignedXXX、setUnsignedXXX **系列方法读写buffer。当你在为网络协议实现编解码器时,如果想将带宽消耗优化到极致,这个特性能帮上忙。

    下面这个例子里,使用一个字节在指定位置写入200:

    Buffer buff = Buffer.buffer(128);
    int pos = 15;
    buff.setUnsignedByte(pos, (short) 200);
    System.out.println(buff.getUnsignedByte(pos));
    

    控制台将显示‘200’。

    buffer的长度

    length方法可以获得buffer的长度。buffer的长度是最大的索引值+1。

    复制buffer

    使用copy方法。

    将buffer分片(slicing buffers)

    slice方法用来将buffer分片,切分出来的新buffer与原buffer共享缓存区。

    buffer重用

    在buffer被写入socket或类似地方后,它就不能再被使用了。


    相关文章

      网友评论

        本文标题:Vert.x3 Core手册简译(Java版)[Part 1.

        本文链接:https://www.haomeiwen.com/subject/pbomrttx.html