感兴趣的朋友,可以关注微信服务号“猿学堂社区”,或加入“猿学堂社区”微信交流群
版权声明:本文由作者自行翻译,未经作者授权,不得随意转发。
与我们最早的实现相比,前面的重构已经是向前一大步,因为我们提取出了独立且可配置的Verticle,并且在事件总线之上使用异步消息进行链接。我们还看到,我们可以同时部署一个指定Verticle的几个实例,以便更好地处理负载以及更好地利用CPU内核。
在这一节,我们将看到如何设计和使用Vert.x服务。服务的主要优势是,它定义了一个接口用于执行Verticle公开的特定操作。对于所有事件总线消息工作,我们还可以利用代码生成,而不是像前一节那样自己创建它。
step-3/src/main/java/
└── io
└── vertx
└── guides
└── wiki
├── MainVerticle.java
├── database
│ ├── ErrorCodes.java
│ ├── SqlQuery.java
│ ├── WikiDatabaseService.java
│ ├── WikiDatabaseServiceImpl.java
│ ├── WikiDatabaseVerticle.java
│ └── package-info.java
└── http
└── HttpServerVerticle.java
io.vertx.guides.wiki现在包含主Verticle,io.vertx.guides.wiki.database包含数据库Verticle和服务,io.vertx.guides.wiki.http包含HTTP Server Verticle。
4.1 Maven配置变更
首先,我们需要添加下面两个依赖到我们的项目。很明显,我们需要vertx-service-proxy的API:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-service-proxy</artifactId>
</dependency>
我们需要Vert.x代码生成模块作为一个编译时依赖(所以是provided范围):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<scope>provided</scope>
</dependency>
接下来,我们必须稍微调整一下maven-compiler-plugin的配置来使用代码生成,它通过一个javac注解处理器完成:
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<useIncrementalCompilation>false</useIncrementalCompilation>
<annotationProcessors>
<annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
</annotationProcessors>
<generatedSourcesDirectory>${project.basedir}/src/main/generated</generatedSourcesDirectory>
<compilerArgs>
<arg>-AoutputDirectory=${project.basedir}/src/main</arg>
</compilerArgs>
</configuration>
</plugin>
注意,生成代码放置在src/main/generated目录下,一些集成开发环境诸如IntelliJ IDEA将自动识别为类路径。
更新maven-clean-plugin插件移除这些生成文件也是一个好注意:
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<filesets>
<fileset>
<directory>${project.basedir}/src/main/generated</directory>
</fileset>
</filesets>
</configuration>
</plugin>
关于Vert.x Service的完整文档位于http://vertx.io/docs/vertxservice-
proxy/java/。
4.2 数据库服务接口
定义一个服务接口与定义一个Java接口一样简单,除此之外,有一些规则需要遵守,以使代码生成可以工作,并且还要确保与Vert.x中的其它代码的互操作性。
接口的开始定义如下:
@ProxyGen
public interface WikiDatabaseService {
@Fluent
WikiDatabaseService fetchAllPages(Handler<AsyncResult<JsonArray>> resultHandler);
@Fluent
WikiDatabaseService fetchPage(String name, Handler<AsyncResult<JsonObject>> resultHandler);
@Fluent
WikiDatabaseService createPage(String title, String markdown, Handler<AsyncResult<Void>> resultHandler);
@Fluent
WikiDatabaseService savePage(int id, String markdown, Handler<AsyncResult<Void>> resultHandler);
@Fluent
WikiDatabaseService deletePage(int id, Handler<AsyncResult<Void>> resultHandler);
// (...)
1、ProxyGen注解用于触发该服务的客户端代理代码生成。
2、Fluent注解是可选的,但是允许fluent接口,操作可以通过返回服务实例被链式调用(chained)。这对于代码生成器非常有用,当服务将被其它JVM语言消费时。
3、参数类型需要是字符串、Java原始数据类型、JSON对象或者数组、任何枚举类型或者前面类型的java.util集合(List/Set/Map)。支持任意Java类的唯一方法是使用@DataObject注解,使它们作为Vert.x数据对象。传递其它类型的最后机会是服务引用类型。
4、由于服务提供异步结果,一个服务的最后参数需要是Handler<AsyncResult<T>>,T是上面描述的适合于代码生成的任何类型。
服务接口提供一个静态方法用于创建实际服务实现以及事件总线上客户端代码使用的代理,这是一种好的习惯。
我们声明了create方法,简单的委托给实现类及其构造函数:
static WikiDatabaseService create(JDBCClient dbClient, HashMap<SqlQuery, String> sqlQueries, Handler<AsyncResult
<WikiDatabaseService>> readyHandler) {
return new WikiDatabaseServiceImpl(dbClient, sqlQueries, readyHandler);
}
Vert.x代码生成器创建代理类,并且类名以VertxEBProxy作为后缀。这些代理类的构造方法需要一个Vert.x上下文的引用以及事件总线的目的地址作为参数:
static WikiDatabaseService createProxy(Vertx vertx, String address) {
return new WikiDatabaseServiceVertxEBProxy(vertx, address);
}
在上次迭代中作为内部类的SqlQuery和ErrorCodes枚举类型,本次迭代已被提取为包保护(package-protected)类型,具体查看SqlQuery.java和ErrorCodes.java。
4.3 Database服务实现
服务实现是先前WikiDatabaseVerticle类代码的直截了当的移植。主要区别在于,服务实现在构造函数(报告初始化结果)和服务方法(报告操作成功)中支持异步处理结果处理(Handler)。
类代码如下:
class WikiDatabaseServiceImpl implements WikiDatabaseService {
private static final Logger LOGGER = LoggerFactory
.getLogger(WikiDatabaseServiceImpl.class);
private final HashMap<SqlQuery, String> sqlQueries;
private final JDBCClient dbClient;
WikiDatabaseServiceImpl(JDBCClient dbClient,
HashMap<SqlQuery, String> sqlQueries,
Handler<AsyncResult<WikiDatabaseService>> readyHandler) {
this.dbClient = dbClient;
this.sqlQueries = sqlQueries;
dbClient.getConnection(ar -> {
if (ar.failed()) {
LOGGER.error("Could not open a database connection", ar.cause());
readyHandler.handle(Future.failedFuture(ar.cause()));
} else {
SQLConnection connection = ar.result();
connection.execute(sqlQueries.get(SqlQuery.CREATE_PAGES_TABLE),
create -> {
connection.close();
if (create.failed()) {
LOGGER.error("Database preparation error",
create.cause());
readyHandler.handle(Future.failedFuture(create
.cause()));
} else {
readyHandler.handle(Future
.succeededFuture(this));
}
});
}
});
}
@Override
public WikiDatabaseService fetchAllPages(
Handler<AsyncResult<JsonArray>> resultHandler) {
dbClient.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
if (res.succeeded()) {
JsonArray pages = new JsonArray(res.result().getResults()
.stream().map(json -> json.getString(0)).sorted()
.collect(Collectors.toList()));
resultHandler.handle(Future.succeededFuture(pages));
} else {
LOGGER.error("Database query error", res.cause());
resultHandler.handle(Future.failedFuture(res.cause()));
}
});
return this;
}
@Override
public WikiDatabaseService fetchPage(String name,
Handler<AsyncResult<JsonObject>> resultHandler) {
dbClient.queryWithParams(
sqlQueries.get(SqlQuery.GET_PAGE),
new JsonArray().add(name),
fetch -> {
if (fetch.succeeded()) {
JsonObject response = new JsonObject();
ResultSet resultSet = fetch.result();
if (resultSet.getNumRows() == 0) {
response.put("found", false);
} else {
response.put("found", true);
JsonArray row = resultSet.getResults().get(0);
response.put("id", row.getInteger(0));
response.put("rawContent", row.getString(1));
}
resultHandler.handle(Future.succeededFuture(response));
} else {
LOGGER.error("Database query error", fetch.cause());
resultHandler.handle(Future.failedFuture(fetch.cause()));
}
});
return this;
}
@Override
public WikiDatabaseService createPage(String title, String markdown,
Handler<AsyncResult<Void>> resultHandler) {
JsonArray data = new JsonArray().add(title).add(markdown);
dbClient.updateWithParams(sqlQueries.get(SqlQuery.CREATE_PAGE), data,
res -> {
if (res.succeeded()) {
resultHandler.handle(Future.succeededFuture());
} else {
LOGGER.error("Database query error", res.cause());
resultHandler.handle(Future.failedFuture(res.cause()));
}
});
return this;
}
@Override
public WikiDatabaseService savePage(int id, String markdown,
Handler<AsyncResult<Void>> resultHandler) {
JsonArray data = new JsonArray().add(markdown).add(id);
dbClient.updateWithParams(sqlQueries.get(SqlQuery.SAVE_PAGE), data,
res -> {
if (res.succeeded()) {
resultHandler.handle(Future.succeededFuture());
} else {
LOGGER.error("Database query error", res.cause());
resultHandler.handle(Future.failedFuture(res.cause()));
}
});
return this;
}
@Override
public WikiDatabaseService deletePage(int id,
Handler<AsyncResult<Void>> resultHandler) {
JsonArray data = new JsonArray().add(id);
dbClient.updateWithParams(sqlQueries.get(SqlQuery.DELETE_PAGE), data,
res -> {
if (res.succeeded()) {
resultHandler.handle(Future.succeededFuture());
} else {
LOGGER.error("Database query error", res.cause());
resultHandler.handle(Future.failedFuture(res.cause()));
}
});
return this;
}
}
在代理代码生成可以工作之前,还需要最后一步:服务包需要有一个package-info.java注解来声明一个Vert.x模块:
@ModuleGen(groupPackage = "io.vertx.guides.wiki.database", name = "wiki-database")
package io.vertx.guides.wiki.database;
import io.vertx.codegen.annotations.ModuleGen;
4.4 从数据库Verticle公开数据库服务
由于大多数数据库处理代码已经被移动到WikiDatabaseServiceImpl类,WikiDatabaseVerticle类现在包含两个方法:start方法用来注册服务,以及一个工具方法加载SQL查询:
public class WikiDatabaseVerticle extends AbstractVerticle {
public static final String CONFIG_WIKIDB_JDBC_URL = "wikidb.jdbc.url";
public static final String CONFIG_WIKIDB_JDBC_DRIVER_CLASS = "wikidb.jdbc.driver_class";
public static final String CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE = "wikidb.jdbc.max_pool_size";
public static final String CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE = "wikidb.sqlqueries.resource.file";
public static final String CONFIG_WIKIDB_QUEUE = "wikidb.queue";
@Override
public void start(Future<Void> startFuture) throws Exception {
HashMap<SqlQuery, String> sqlQueries = loadSqlQueries();
JDBCClient dbClient = JDBCClient.createShared(
vertx,
new JsonObject()
.put("url",config().getString(CONFIG_WIKIDB_JDBC_URL,"jdbc:hsqldb:file:db/wiki"))
.put("driver_class",config().getString(CONFIG_WIKIDB_JDBC_DRIVER_CLASS,"org.hsqldb.jdbcDriver"))
.put("max_pool_size",config().getInteger(CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE, 30)));
WikiDatabaseService.create(dbClient, sqlQueries, ready -> {
if (ready.succeeded()) {
ProxyHelper.registerService(WikiDatabaseService.class, vertx,
ready.result(), CONFIG_WIKIDB_QUEUE); ①
startFuture.complete();
} else {
startFuture.fail(ready.cause());
}
});
}
/*
* Note: this uses blocking APIs, but data is small...
*/
private HashMap<SqlQuery, String> loadSqlQueries() throws IOException {
String queriesFile = config().getString(
CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE);
InputStream queriesInputStream;
if (queriesFile != null) {
queriesInputStream = new FileInputStream(queriesFile);
} else {
queriesInputStream = getClass().getResourceAsStream(
"/db-queries.properties");
}
Properties queriesProps = new Properties();
queriesProps.load(queriesInputStream);
queriesInputStream.close();
HashMap<SqlQuery, String> sqlQueries = new HashMap<>();
sqlQueries.put(SqlQuery.CREATE_PAGES_TABLE,queriesProps.getProperty("create-pages-table"));
sqlQueries.put(SqlQuery.ALL_PAGES,queriesProps.getProperty("all-pages"));
sqlQueries.put(SqlQuery.GET_PAGE, queriesProps.getProperty("get-page"));
sqlQueries.put(SqlQuery.CREATE_PAGE,queriesProps.getProperty("create-page"));
sqlQueries.put(SqlQuery.SAVE_PAGE,queriesProps.getProperty("save-page"));
sqlQueries.put(SqlQuery.DELETE_PAGE,queriesProps.getProperty("delete-page"));
return sqlQueries;
}
}
① 我们在此处注册服务。
注册服务需要一个接口类,一个Vert.x上下文,一个实现类以及一个事件总线目的地址。
WikiDatabaseServiceVertxEBProxy生成类处理事件总线上接收到的消息并分发它们到WikiDatabaseServiceImpl。它做的事情实际上非常接近于我们在前面章节所做的:发送消息使用一个action头指定哪个方法被调用,并且参数被编码为JSON。
4.5 获得一个数据库服务代理
重构为Vert.x服务的最后步骤是改写HTTP Server Verticle来获得数据库服务的代理,并且在处理器(handler)中使用它代替事件总线:
首先,我们需要在启动Verticle时,创建代理:
private WikiDatabaseService dbService;
@Override
public void start(Future<Void> startFuture) throws Exception {
String wikiDbQueue = config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue"); ①
dbService = WikiDatabaseService.createProxy(vertx, wikiDbQueue);
HttpServer server = vertx.createHttpServer();
// (...)
① 我们仅仅需要确保使用的事件总线目的地址与WikiDatabaseVerticle发布的服务相同。
然后,我们需要用数据库服务的调用替换事件总线的调用:
private void indexHandler(RoutingContext context) {
dbService.fetchAllPages(reply -> {
if (reply.succeeded()) {
context.put("title", "Wiki home");
context.put("pages", reply.result().getList());
templateEngine.render(
context,
"templates",
"/index.ftl",
ar -> {
if (ar.succeeded()) {
context.response().putHeader("Content-Type",
"text/html");
context.response().end(ar.result());
} else {
context.fail(ar.cause());
}
});
} else {
context.fail(reply.cause());
}
});
}
private void pageRenderingHandler(RoutingContext context) {
String requestedPage = context.request().getParam("page");
dbService.fetchPage(
requestedPage,
reply -> {
if (reply.succeeded()) {
JsonObject payLoad = reply.result();
boolean found = payLoad.getBoolean("found");
String rawContent = payLoad.getString("rawContent",
EMPTY_PAGE_MARKDOWN);
context.put("title", requestedPage);
context.put("id", payLoad.getInteger("id", -1));
context.put("newPage", found ? "no" : "yes");
context.put("rawContent", rawContent);
context.put("content", Processor.process(rawContent));
context.put("timestamp", new Date().toString());
templateEngine.render(
context,
"templates",
"/page.ftl",
ar -> {
if (ar.succeeded()) {
context.response().putHeader(
"Content-Type", "text/html");
context.response().end(ar.result());
} else {
context.fail(ar.cause());
}
});
} else {
context.fail(reply.cause());
}
});
}
private void pageUpdateHandler(RoutingContext context) {
String title = context.request().getParam("title");
Handler<AsyncResult<Void>> handler = reply -> {
if (reply.succeeded()) {
context.response().setStatusCode(303);
context.response().putHeader("Location", "/wiki/" + title);
context.response().end();
} else {
context.fail(reply.cause());
}
};
String markdown = context.request().getParam("markdown");
if ("yes".equals(context.request().getParam("newPage"))) {
dbService.createPage(title, markdown, handler);
} else {
dbService.savePage(
Integer.valueOf(context.request().getParam("id")),
markdown, handler);
}
}
private void pageCreateHandler(RoutingContext context) {
String pageName = context.request().getParam("name");
String location = "/wiki/" + pageName;
if (pageName == null || pageName.isEmpty()) {
location = "/";
}
context.response().setStatusCode(303);
context.response().putHeader("Location", location);
context.response().end();
}
private void pageDeletionHandler(RoutingContext context) {
dbService.deletePage(Integer.valueOf(context.request().getParam("id")),
reply -> {
if (reply.succeeded()) {
context.response().setStatusCode(303);
context.response().putHeader("Location", "/");
context.response().end();
} else {
context.fail(reply.cause());
}
});
}
生成的WikiDatabaseServiceVertxProxyHandler类处理转发调用为事件总线消息。
仍然完全可以直接通过事件总线消息使用Vert.x服务,因为这正是生成的代理做的事情。
网友评论