FoundationDB(fdb) 是最近苹果重新开源的支持 ACID 的 NoSQL 系统。fdb 为了使用 actor 并发模型,魔改了一下编译流程,做了一个叫作 Flow 的拓展系统。 fdb 的编译流程为:1. 先用 flowcompiler 将带有自定义拓展关键字的C++代码翻译为正常的C++代码;2.再用标准的 C++ 编译器进行编译。
actor 模型
在 erlang 以及 scala akka 中使用的并发模型,每个服务进程都有一个 mailbox,进程中的通信通过向对端的 mailbox 发送消息的形式进行。更详细的可以参考 scala akka 的文档。
为了引入 actor 模型去开发一个语言拓展是否合适?按照 fdb 官方的测试结论,Flow 的引入对于性能的提升非常大。对于 ring benchmark 的结果如下:
Programming language | Time in seconds |
---|---|
Ruby (using threads) | 1990 sec |
Ruby (using queues) | 360 sec |
Objective C (using threads) | 26 sec |
Java (using threads) | 12 sec |
Stackless Python | 1.68 sec |
Erlang | 1.09 sec |
Go | 0.87 sec |
Flow | 0.075 |
Write a ring benchmark. Create N processes in a ring. Send a message round the ring M times so that a total of N * M messages get sent. Time how long this takes for different values of N and M. Write a similar program in some other programming language you are familiar with. Compare the results. Write a blog, and publish the results on the internet!
Flow Features
Flow 相对于标准 C++ 添加了一些新的关键字,来支持在不同组件间异步的传输消息的能力。这些关键字包括: Promise、Future、wait、ACTOR、State、PromiseStream、FutureStream、waitNext、choose、when。
flow-example.png
Promise/Future
promise/future 是一种用来简化异步编程的抽象,future 代表该变量在未来某个时刻可读,promise 代表会在未来某个时刻给该变量赋值。
在 flow 中,promise/future 对不仅可以在一个进程中使用,也可以跨网络使用。例如,一个计算机生成一个 promise/future 对,并将 promise 通过网络发送到另一个计算机。这时 promise 和 future 仍然是连接的,当远端计算机对 promise 赋值时,另一个计算机可以从 future 中得到对应的赋值。
wait()
上一段介绍到 future 是代表未来某个时刻可读。flow 提供了一个关键字 wait, 来让一个 actor 可以停止执行并等待 future 返回值。在该 actor 等待期间,其他 actor 并不会阻塞。
ACTOR
只有打了 ACTOR 标签的函数才可以调用 wait()。
actor 是异步工作的基本单位,并可以被组合为更复杂的消息传递系统。通过组合actor,future 可以被链到一起来实现一个future的值依赖于前一个future的输出。
actor 被声明为返回 Future,如果返回值只是用来发送信号,不携带值,可以使用 Future void 作为返回值。
每个 actor 会被预处理为一个 C++11 类。
state
state 关键字用来修饰一个变量,代表该变量可以在一个actor内被多个 wait() 语句可见。
PromiseStream/FutureStream
当一些组件希望处理异步消息流而不单单是一个消息时,就可以使用 PromiseStream/FutureStream 关键字。
waitNext()
waitNext 与 wait 相似,被设计用来处理 FutureStream。 waitNext 会阻塞 actor,等到 FutureStream 的下一个值。如果在 FutureStream 中已经有一个值,就会无延迟的立即执行返回。
choose .. when
choose..when 语句可以让一个actor同时等待多个future,可以理解为 golang 的 select 等待多个 chan 。
示例
计数服务
下面一个关于计数服务的示例,可以看到 serveCountingServerInterface 函数被打上了 ACTOR 标签来标识这个函数是一个 actor,可以使用 wait/waitNext 关键字。int count 被标记为了 state,是因为它会被多个 actor 使用,包括serveCountingServerInterface actor以及对端 actor。然后,serveCountingServerInterface 是一个死循环,通过 choose..when 和 waitNext 关键字来监听 csi.addCount/csi.subtractCount/csi.getCount 三个 FutureStream
ACTOR void serveCountingServerInterface(
CountingServerInterface csi) {
state int count = 0;
while (1) {
choose {
when (int x = waitNext(csi.addCount.getFuture())){
count += x;
}
when (int x = waitNext(csi.subtractCount.getFuture())){
count -= x;
}
when (Promise<int> r = waitNext(csi.getCount.getFuture())){
r.send( count ); // goes to client
}
}
}
}
Coordination
从 fdb 代码中截取一段代码,位于 fdbserver/Coordination.actor.cpp,如下:
// 注:ACTOR 声明该函数为一个 actor,返回为 Future<Void>
ACTOR Future<Void> localGenerationReg( GenerationRegInterface interf, OnDemandStore* pstore ) {
// 注:state 声明该变量会被多个 wait/waitNext 使用
state GenerationRegVal v;
state OnDemandStore& store = *pstore;
// SOMEDAY: concurrent access to different keys?
// 注:loop 为宏定义的 while(true),使用 choose .. when 语法来监听 interf.read/interf.write 两个 stream
loop choose {
when ( GenerationRegReadRequest _req = waitNext( interf.read.getFuture() ) ) {
TraceEvent("GenerationRegReadRequest").detail("From", _req.reply.getEndpoint().address).detail("K", printable(_req.key));
state GenerationRegReadRequest req = _req;
Optional<Value> rawV = wait( store->readValue( req.key ) );
v = rawV.present() ? BinaryReader::fromStringRef<GenerationRegVal>( rawV.get(), IncludeVersion() ) : GenerationRegVal();
TraceEvent("GenerationRegReadReply").detail("RVSize", rawV.present() ? rawV.get().size() : -1).detail("VWG", v.writeGen.generation);
if (v.readGen < req.gen) {
v.readGen = req.gen;
store->set( KeyValueRef( req.key, BinaryWriter::toValue(v, IncludeVersion()) ) );
Void _ = wait(store->commit());
}
req.reply.send( GenerationRegReadReply( v.val, v.writeGen, v.readGen ) ); // 注 req.reply 是一个 promise
}
when ( GenerationRegWriteRequest _wrq = waitNext( interf.write.getFuture() ) ) {
state GenerationRegWriteRequest wrq = _wrq;
Optional<Value> rawV = wait( store->readValue( wrq.kv.key ) );
v = rawV.present() ? BinaryReader::fromStringRef<GenerationRegVal>( rawV.get(), IncludeVersion() ) : GenerationRegVal();
if (v.readGen <= wrq.gen && v.writeGen < wrq.gen) {
v.writeGen = wrq.gen;
v.val = wrq.kv.value;
store->set( KeyValueRef( wrq.kv.key, BinaryWriter::toValue(v, IncludeVersion()) ) );
Void _ = wait(store->commit());
TraceEvent("GenerationRegWrote").detail("From", wrq.reply.getEndpoint().address).detail("Key", printable(wrq.kv.key))
.detail("reqGen", wrq.gen.generation).detail("Returning", v.writeGen.generation);
wrq.reply.send( v.writeGen );
} else {
TraceEvent("GenerationRegWriteFail").detail("From", wrq.reply.getEndpoint().address).detail("Key", printable(wrq.kv.key))
.detail("reqGen", wrq.gen.generation).detail("readGen", v.readGen.generation).detail("writeGen", v.writeGen.generation);
wrq.reply.send( std::max( v.readGen, v.writeGen ) );
}
}
}
};
网友评论