业务流程是服务的主要用例之一,无论你尝试通过高并发、低延迟业务多服务调用,还是基于各自尝试业务操作、数据读写、服务调用等。简化描述你的业务逻辑的能力对于服务的易于理解和维护是必不可少的。编排DSL-squbs-pattern的一部分-将使得异步的代码易于读写和理解。
入门
让我们从一个简单的但完整的业务流程例子开始。这个业务流程由三个相关的异步任务组成。
- 加载请求这个业务流程的查看用户
- 加载item,item的细节可能基于查看用户
- 基于用户和item数据构建item视图。
让我们深入了解流程和细节:
Scala
// 1. 定义业务流程actor
class MyOrchestrator extends Actor with Orchestrator {
// 2. 提供初始化expectOnce块来接收请求消息
expectOnce {
case r: MyOrchestrationRequest => orchestrate(sender(), r)
}
// 3. 定义业务流程(orchestrate )- 业务流程函数.
def orchestrate(requester: ActorRef, request: MyOrchestrationRequest) {
// 4. 使用pipe(>>)组合业务流在业务逻辑中是需要的
val userF = loadViewingUser
val itemF = userF >> loadItem(request.itemId)
val itemViewF = (userF, itemF) >> buildItemView
// 5. 结束并返回业务流程的结果
for {
user <- userF
item <- itemF
itemView <- itemViewF
} {
requester ! MyOrchestrationResult(user, item, itemView)
context.stop(self)
}
// 6. 通过调用来关闭业务流程
// context.stop(self).
}
// 7. 实现业务流程的具体功能,如下模式:
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
context.actorOf(Props[ItemActor]) ! ItemRequest(itemId, seller.id)
expectOnce {
case item: Item => itemPromise success Some(item)
case e: NoSuchItem => itemPromise success None
}
itemPromise.future
}
def loadViewingUser: OFuture[Option[User]] = {
val userPromise = OPromise[Option[User]]
...
userPromise.future
}
def buildItemView(user: Option[User], item: Option[Item]): OFuture[Option[ItemView]] = {
...
}
}
Java
受限于JAVA语言,同样的实现看起来更加繁琐:
// 1. Define the orchestrator actor.
public class MyOrchestrator extends AbstractOrchestrator {
// 2. Provide the initial expectOnce in the constructor. It will receive the request message.
public MyOrchestrator() {
expectOnce(ReceiveBuilder.match(MyOrchestrationRequest.class, r -> orchestrate(r, sender())).build());
}
// 3. Define orchestrate - the orchestration function.
public void orchestrate(MyOrchestrationRequest request, ActorRef requester) {
// 4. Compose the orchestration flow as needed by the business logic.
static CompletableFuture<Optional<User>> userF = loadViewingUser();
static CompletableFuture<Optional<Item>> itemF =
userF.thenCompose(user ->
loadItem(user, request.itemId));
static CompletableFuture<Optional<ItemView>> itemViewF =
userF.thenCompose(user ->
itemF.thenCompose(item ->
buildItemView(user, item)));
// 5. Conclude and send back the result of the orchestration.
userF.thenCompose(user ->
itemF.thenCompose(item ->
itemViewF.thenAccept(itemView -> {
requester.tell(new MyOrchestrationResult(user, item, itemView), self());
context().stop(self());
})));
// 6. Make sure to stop the orchestrator actor by calling
// context.stop(self).
}
// 7. Implement the orchestration functions as in the following patterns.
private CompletableFuture<Optional<Item>> loadItem(User seller, String itemId) {
CompletableFuture<Optional<Item>> itemF = new CompletableFuture<>();
context().actorOf(Props.create(ItemActor.class))
.tell(new ItemRequest(itemId, seller.id), self());
expectOnce(ReceiveBuilder.
match(Item.class, item ->
itemF.complete(Optional.of(item)).
matchEquals(noSuchItem, e ->
itemF.complete(Optional.empty())).
build()
);
return itemF;
}
private CompletableFuture<Optional<User>> loadViewingUser() {
CompletableFuture<Optional<User>> userF = new CompletableFuture<>();
...
return userF.future
}
private CompletableFuture<Optional<ItemView>> buildItemView(Optional<User> user, Optional<Item> item) {
...
}
}
你可以先放慢阅读的脚步,深入理解后再阅读剩余的部分。轻松的阅读后面的内容并满足你的好奇心。
依赖
在build.sbt或scala构建文件中加入以下依赖:
"org.squbs" %% "squbs-pattern" % squbsVersion
核心概念
业务流(Orchestrator)
Orchestrator
是一个扩展自actor的特性(trait)来支持业务流程功能。从技术上来讲,它是一个 Aggregator 的子特性(trait)并且提供它所有的功能。除此之外,它提供了功能和语法,允许有效的业务流程组合。-加入工具包通常用在创建业务流程函数,在下文中会有详细的讨论。使用orchestrator,actor只需要简单的扩展 Orchestrator
特性。
import org.squbs.pattern.orchestration.Orchestrator
class MyOrchestrator extends Actor with Orchestrator {
...
}
AbstractOrchestrator
是JAVA使用者在java中使用的超类。它组合Actor和Orchestrator,因为JAVA不支持特性mix-in。因此JAVA的orchestrator使用如下:
import org.squbs.pattern.orchestration.japi.AbstractOrchestrator;
public class MyOrchestrator extends AbstractOrchestrator {
...
}
与Aggregator相似,orchestrator 通常不声明Akka actor 接收块,不过允许expect/expectOnce/unexpect 块来定义在那些埋点有哪些预期的响应。这些预期块通常在业务功能流程的内部使用。
Scala: Orchestration Future and Promise
orchestration promise和future与描述在这里的 scala.concurrent.Future
和scala.concurrent.Promise
相似,只是把名字改成OFuture
和OPromise
,意味着他们在actor中将被Orchestrator使用。orchestration的版本通过不存在并发行为的并发版本工件区分自己。它们在它们的签名中不使用和执行 ExecutionContext
(隐式)。它们同样缺少一些显式的执行一个异步闭包的函数。在actor中使用,它们的回调不会被actor的作用域之外被调用。这将消除由多线程回调并发修改actor状态的风险。另外,它们包含性能优化,假设它们始终在actor中被使用。
注意: 不要在actor之外传递OFuture
。隐私转换提供scala.concurrent.Future
和OFuture
之间的转换。
import org.squbs.pattern.orchestration.{OFuture, OPromise}
Java: CompletableFuture
Java的CompletableFuture 和他的synchronous 回调用在orchestration中的值占位符。同步回调确保处理future发生在线程的完成时,在orchestration模型中,它将成为orchestrator actor的线程来接收和处理消息。这确保这里不存在同时关闭actor状态。所有的回调在actor的作用域内进行。
import java.util.concurrent.CompletableFuture;
异步Orchestration功能
Orchestration函数被orchestration流调用来执行单一的orchestration任务,诸如数据库服务调用。一个orchestration函数必须遵守如下指南:
- 它必须使用非Future参数作为入参。基于并发限制。在scala中,参数的数量最高为22。java风格的orchestration不暴露这个限制。在所有的情况下,这类函数不应该含有太多参数。
- Scala函数可能从piped (future)输入被科里化 成分段的直接输入。在科里化函数中,管道输入必须为参数的最后一组。
- 它必须引发异步执行。异步执行通常通过发送一条被其他actor处理的消息实现。
- Scala实现必须返回 OFuture (orchestration future)。Java实现必须返回CompletableFuture
让我们看一些orchestration函数的例子:
Scala
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
context.actorOf(Props[ItemActor]) ! ItemRequest(itemId, seller.id)
expectOnce {
case item: Item => itemPromise success Some(item)
case e: NoSuchItem => itemPromise success None
}
itemPromise.future
}
在这个例子中,函数已科里化。itemId
参数被同步传递,seller异步传递
Java
private CompletableFuture<Optional<Item>> loadItem(User seller, String itemId) {
CompletableFuture<Optional<Item>> itemF = new CompletableFuture<>();
context().actorOf(Props.create(ItemActor.class))
.tell(new ItemRequest(itemId, seller.id), self());
expectOnce(ReceiveBuilder.
match(Item.class, item ->
itemF.complete(Optional.of(item)).
matchEquals(noSuchItem, e ->
itemF.complete(Optional.empty())).
build()
);
return itemF;
}
我们从创建OPromise
(Scala) 或 CompletableFuture
(Java) 保持最终值功能开始。然后我们将ItemRequest发送给另一个actor。这个actor将立刻异步获取item。一旦我们发送这个请求,我们通过expectOnce注册一个callback。expectOnce 中的代码是一个Receive
,它将在ItemActor发回响应时执行。在所有的情况下,它将success
promise 或 complete
CompletableFuture。在最后,我们发出future。不反悔promise的原因是因为它是可变的。我们不希望在函数外返回一个可变对象。在它上面调用future
会提供一个不可变的OPromise
视图,即OFuture
。可惜的是,这并不支持JAVA。
下面这个例子在逻辑上与第一个相同,只是将tell替换成了ask:
Scala
private def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
import context.dispatcher
implicit val timeout = Timeout(3 seconds)
(context.actorOf[ItemActor] ? ItemRequest(itemId, seller.id)).mapTo[Option[Item]]
}
在这种情况下,这个ask(?
)操作返回 scala.concurrent.Future
,Orchestrator 特性(trait)提供隐式转换在 scala.concurrent.Future 和OFuture之间,所以ask( ?
)从这个函数返回的结果声明为OFuture类型,且不需要显示的调用转换。
Java
private CompletableFuture<Optional<Item>> loadItem(User seller, String itemId) {
CompletableFuture<Optional<Item>> itemF = new CompletableFuture<>();
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
ActorRef itemActor = context().actorOf(Props.create(ItemActor.class));
ask(itemActor, new ItemRequest(itemId, seller.id), timeout).thenComplete(itemF);
return itemF;
}
使用 ask
或?
看起来使用更少的代码,并在 expect/expectOnce中有更差的性能和更少的复杂度。预期块中的逻辑同样被用作结果的进一步转换。这些通过使用ask返回的future同样可以实现。但是,性能无法简单的补偿,具体原因如下:
- Ask将会创建一个新的actor作为响应的接受者
- 从
scala.concurrent.Future
到OFuture
的转换和JAVA api中的fill
操作需要消息 piped 回到orchestrator,因此新增的消息跳跃同时增加了CPU和延迟。
在使用ask而不是expect/expectOnce时,测试显示更高的延迟和CPU利用率.
组合
Scala
pipe( >>
)标记使用至少一个 future OFuture
并且使其结果作为orchestration函数的输入。当所有OFuture
代表的输入被解决时,实际的orchestration 函数将会异步调用。
pipe是Orchestration DSL 的主要部件,基于他们的输入输出,允许orchestration函数被组合。orchestration流被orchestration声明亦或是通过管道声明的orchestration 流隐式的指定。
如果多个OFutures输入到orchestration函数,OFutures 需要逗号分隔和括在括号中,构建OFutures元组作为输入。在elements元组中的元素数量和OFuture 类型必须匹配函数参数和类型,或者最后一组参数在 科里化情况下或编译将会失败。这些错误一般会被IDE捕获。
以下的例子显示了一个简单的使用loadItem orchestration函数使用orchestration声明和流,这些在前面章节已声明,其中包括:
val userF = loadViewingUser
val itemF = userF >> loadItem(request.itemId)
val itemViewF = (userF, itemF) >> buildItemView
上面的流可以如下描述:
- 首先调用loadViewingUser (无参)
- 当viewing user可用时,使用 viewing user 作为入参调用loadItem (在这种情况下,先前的id有效)。loadItem在这种情况下跟随确切的在orchestration上面声明的标识符。
- 当user和item均生效时,调用buildItemView
Java
多个CompletableFuture
的组合可以通过使用组合函数CompletableFuture.thenCompose()
实现。每个thenCompose
需要已解决的未来的值作为输入使用lambda。当 CompletableFuture
结束时,它将会被调用。
通过一个使用例子来描述最佳:
static CompletableFuture<Optional<User>> userF = loadViewingUser();
static CompletableFuture<Optional<Item>> itemF =
userF.thenCompose(user ->
loadItem(user, request.itemId));
static CompletableFuture<Optional<ItemView>> itemViewF =
userF.thenCompose(user ->
itemF.thenCompose(item ->
buildItemView(user, item)));
流程可以描述如下:
- 首先调用loadViewingUser.
- 当viewing user生效时,使用viewing user作为参数来调用loadItem (在这种情况下,itemId在之前有效)
- 当user和item生效时,调用buildItemView
Orchestrator实例生命周期
Orchestrator通常单独使用actor。他们接受初始化请求,然后基于请求调用的orchestration函数响应发送出去。
为了允许orchestrator服务多个orchestration请求,orchestrator需要为每个请求结合输入和响应,并且把它们从不同的请求中分离。这会很大程度上加深开发的复杂度,并且在这些我们看到的例子中,它们并不会以一个干净的orchestration结束 。为此,创建一个新的actor成本更低,为每个orchestration请求我们可以简单的创建新的orchestrator。
orchestration的最后一部分回调应该关闭actor。在Scala中通过调用context.stop(self)
或 context stop self
(如果优先中缀表示法)。Java的实现应该调用:`context().stop(self())。
完成Orchestration流
这里,我们将以上的所有概念结合。从上面开始重复同样的例子,包含更多完整的解释:
// 1. 定义orchestrator actor
class MyOrchestrator extends Actor with Orchestrator {
// 2.提供初始expectOnce块,它将接受请求消息
// 在接收到这些请求后,同样的actor不会再次接受到同样的请求。
// expectOnce看起来有一个初始化模式匹配请求,并使用请求成员和参数,sender()来调用
// 高等级orchestration函数。这个函数通常称为orchestrate。
expectOnce {
case r: MyOrchestrationRequest => orchestrate(sender(), r)
}
// 3. 定义orchestratem,它的参数默认不可变
// 使得开发者依赖的这些情况永远不变。
def orchestrate(requester: ActorRef, request: MyOrchestrationRequest) {
// 4. 如果有任何事我们需要同步启动orchestration,在orchestrate的最前部分执行
// 5. 业务逻辑需要使用管道组合orchestration流
val userF = loadViewingUser
val itemF = userF >> loadItem(request.itemId)
val itemViewF = (userF, itemF) >> buildItemView
// 6. 通过调用函数结束流结合了请求响应。如果结合非常大,它可能
// 具有更多的可读性来使用结合而不是包含大量的参数的结合函数。
// 可能在某些特殊响应情况下需要这些多重组合。
// 例子中展示的组合仅仅作为参考,
// 在这个小情况下,你可以通过3个参数增加请求者来使用orchestration函数
for {
user <- userF
item <- itemF
itemView <- itemViewF
} {
requester ! MyOrchestrationResult(user, item, itemView)
context.stop(self)
}
// 7. 确保最后的响应通过调用关闭orchestrator actor
// context.stop(self).
}
// 8.在orchestrator actor内实现异步orchestration函数,
// 但在orchestrate函数之外
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
context.actorOf[ItemActor] ! ItemRequest(itemId, seller.id)
expectOnce {
case item: Item => itemPromise success Some(item)
case e: NoSuchItem => itemPromise success None
}
itemPromise.future
}
def loadViewingUser: OFuture[Option[User]] = {
val userPromise = OPromise[Option[User]]
...
userPromise.future
}
def buildItemView(user: Option[User], item: Option[Item]): OFuture[Option[ItemView]] = {
...
}
}
Java
// 1. Define the orchestrator actor.
public class MyOrchestrator extends AbstractOrchestrator {
// 2. Provide the initial expectOnce in the constructor. It will receive the request message.
public MyOrchestrator() {
expectOnce(ReceiveBuilder.match(MyOrchestrationRequest.class, r -> orchestrate(r, sender())).build());
}
// 3. Define orchestrate - the orchestration function.
public void orchestrate(MyOrchestrationRequest request, ActorRef requester) {
// 4. If there is anything we need to do synchronously to setup for
// the orchestration, do this in the first part of orchestrate.
// 5. Compose the orchestration flow as needed by the business logic.
static CompletableFuture<Optional<User>> userF = loadViewingUser();
static CompletableFuture<Optional<Item>> itemF =
userF.thenCompose(user ->
loadItem(user, request.itemId));
static CompletableFuture<Optional<ItemView>> itemViewF =
userF.thenCompose(user ->
itemF.thenCompose(item ->
buildItemView(user, item)));
// 6. Conclude and send back the result of the orchestration.
userF.thenCompose(user ->
itemF.thenCompose(item ->
itemViewF.thenAccept(itemView -> {
requester.tell(new MyOrchestrationResult(user, item, itemView), self());
context().stop(self());
})));
// 7. Make sure to stop the orchestrator actor by calling
// context.stop(self).
}
// 8. Implement the orchestration functions as in the following patterns.
private CompletableFuture<Optional<Item>> loadItem(User seller, String itemId) {
CompletableFuture<Optional<Item>> itemF = new CompletableFuture<>();
context().actorOf(Props.create(ItemActor.class))
.tell(new ItemRequest(itemId, seller.id), self());
expectOnce(ReceiveBuilder.
match(Item.class, item ->
itemF.complete(Optional.of(item)).
matchEquals(noSuchItem, e ->
itemF.complete(Optional.empty())).
build()
);
return itemF;
}
private CompletableFuture<Optional<User>> loadViewingUser() {
CompletableFuture<Optional<User>> userF = new CompletableFuture<>();
...
return userF;
}
private CompletableFuture<Optional<ItemView>> buildItemView(Optional<User> user, Optional<Item> item) {
...
}
}
重用Orchestration 函数
Scala
Orchestration functions通常依赖 Orchestrator
trait 提供的功能,无法单独存在。然而,在许多情况下,跨orchestrator重用orchestration函数来进行不同形式的orchestrate是需要的。在这些情况下,分割orchestration函数至不同的trait并混合每个orchestrator中非常重要。trait需要访问orchestration并且需要自引用至 Orchestrator
。以下是一个简单的trait例子:
trait OrchestrationFunctions { this: Actor with Orchestrator =>
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
...
}
}
上面例子中的 this: Actor with Orchestrator
是一个自引用。它告诉Scala编译器这个triat只能混合到Actor
,并且同时是个 Orchestrator
,因此它将能访问Actor
和Orchestrator
功能,并使用这些来自trait和类混合所获取的功能。
在orchestrator中使用 OrchestrationFunctions
trait,只需要如下方式混合这个trait至orchestrator:
class MyOrchestrator extends Actor with Orchestrator with OrchestrationFunctions {
...
}
Java
Java 需要一个单独的层次结构不支持多trait接口集成。重用通过扩展AbstractOrchestrator实现,实现orchestration函数,并且留下剩余的抽象-需要被具体的orchestrator实现,具体如下:
abstract class MyAbstractOrchestrator extends AbstractOrchestrator {
protected CompletableFuture<Optional<Item>> loadItem(User seller, String itemId) {
CompletableFuture<Optional<Item>> itemF = new CompletableFuture<>();
...
return itemF;
}
protected CompletableFuture<Optional<User>> loadViewingUser() {
CompletableFuture<Optional<User>> userF = new CompletableFuture<>();
...
return userF;
}
protected CompletableFuture<Optional<ItemView>> buildItemView(Optional<User> user, Optional<Item> item) {
...
}
}
这个具体的orchestrator 实现,只需要从上面的MyAbstractOrchestrator
扩展,并实现不同的orchestration。
确保响应唯一
使用 expect
或 expectOnce
时,我们被单个expect块的模式匹配能力限制,它被限制在作用域中并且不能区别在多orchestration函数中跨expect块的匹配。接收到的消息来自请求消息 (在同一个orchestration函数声明expect之前发送)不存在逻辑关系。针对复杂orchestration,我们可能遇到消息混乱的问题。响应并未与正确的请求关联并且会错误处理。这里有一些解决这些问题的策略:
如果是初始消息的接受者,因此响应消息的发送者是唯一的,模式匹配可以包含消息发送者的引用,作为一个如下的模式匹配。
Scala
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
val itemActor = context.actorOf(Props[ItemActor])
itemActor ! ItemRequest(itemId, seller.id)
expectOnce {
case item: Item if sender() == itemActor => itemPromise success Some(item)
case e: NoSuchItem if sender() == itemActor => itemPromise success None
}
itemPromise.future
}
Java
private CompletableFuture<Optional<Item>> loadItem(User seller, String itemId) {
CompletableFuture<Optional<Item>> itemF = new CompletableFuture<>();
ActorRef itemActor = context().actorOf(Props.create(ItemActor.class));
itemActor.tell(new ItemRequest(itemId, seller.id), self());
expectOnce(ReceiveBuilder.
match(Item.class, item -> itemActor.equals(sender()), item ->
itemF.complete(Optional.of(item)).
matchEquals(noSuchItem, e -> itemActor.equals(sender()), e ->
itemF.complete(Optional.empty())).
build()
);
return itemF;
}
换句话说,在结合actor实例时,Orchestrator
特性提供唯一消息编号生成器。我们可以使用这个id生成器来生成唯一消息编号。actor接收到这些消息将只需要返回这些消息编号作为响应消息的一部分。下面展示了一个orchestration函数使用消息生成器的例子。
Scala
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
// Generate the message id.
val msgId = nextMessageId
context.actorOf(Props[ItemActor]) ! ItemRequest(msgId, itemId, seller.id)
// Use the message id as part of the response pattern match. It needs to
// be back-quoted as to not be interpreted as variable extractions, where
// a new variable is created by extraction from the matched object.
expectOnce {
case item @ Item(`msgId`, _, _) => itemPromise success Some(item)
case NoSuchItem(`msgId`, _) => itemPromise success None
}
itemPromise.future
}
Java
private CompletableFuture<Optional<Item>> loadItem(User seller, String itemId) {
CompletableFuture<Optional<Item>> itemF = new CompletableFuture<>();
long msgId = nextMessageId();
context().actorOf(Props.create(ItemActor.class))
.tell(new ItemRequest(msgId, itemId, seller.id), self());
expectOnce(ReceiveBuilder.
match(Item.class, item -> item.msgId == msgId, item ->
itemF.complete(Optional.of(item)).
match(NoSuchItem.class, e -> e.msgId == msgId, e ->
itemF.complete(Optional.empty())).
build()
);
return itemF;
}
网友评论