前言
Promise/Future是一种异步编程机制,第一次见是在openwhisk中,由scala语言提供的语言特性,加上对scala本身就不熟悉,可给我恶心坏了。后来发现原来C++中也有Promise的实现,功能上比较简单的。
而本文中的Promise,并非C++标准中的std::promise
,而是Pistache为了进行异步编程,对 Promises/A+ 标准的实现,在功能上类似与JavaScript的Promise,关于JS的Promise,可以参考:Using Promises - JavaScript | MDN
本文的题目和之前写的不同,缺了“源码分析”四个字,因为的确是不好分析,Pistache实现这一部分用了将近1700行的代码,而且几乎都是复杂的模板类,对于我这样的C++小白,可读性太差了。好容易才从功能上摸的差不多,所以源码实现就不分析了,只求搞清楚怎么用,怎么个异步就行了。
定义
为了使用Promise,我们需要定一个Promise对象:
Async::Promise<int> p([success](Async::Resolver& resolve,
Async::Rejection& reject) {
doSomeAsync(success, resolve, reject);
printf("2\n");
});
或者
Async::Promise<int> p1([success](Async::Deferred<int> deferred) {
doSomeAsync(success, deferred);
printf("2\n");
});
根据我对源码的分析,两者其实并没有太的区别,Async::Deferred仅仅就是把Async::Resolver和Async::Rejection封装到了一个类里面。
可以看到,这是一个模板类:template <typename T> Async::Promise
而模板的类型,就是Promise给我们的“保证”的值的类型。异步操作有没有完成的标志就是,是否填充了此值。而填充的方法就是使用Async::Resolver提供的接口。
定义Promise对象时,需要传入一个处理函数,此函数将执行异步操作,函数的参数是Async::Resolver和Async::Rejection,这是Promise提供的进行数据填充或者发生异常时调用的接口。
异步处理
我们用一个函数在 表示 异步操作,他们分别对应了两类接口定义:
void doSomeAsync(bool success, Async::Resolver& resolver, Async::Rejection& rejection)
{
if (success)
resolver(3);
else
rejection("3");
}
void doSomeAsync(const bool success, Async::Deferred<int>& deferred)
{
if (success)
deferred.resolve(3);
else
deferred.reject("3");
}
成功时我们使用resolver(3)
进行数据填充,当发生异常时,使用 rejection("3")
发出异常信息,注意,在这里rejection接口,其实是调用了std::make_exception_ptr()进行了异常的封装,这部分可以参考我写的《学习 C++ 的异常处理》
回调函数
我们使用then()函数添加回调:
- 如果添加回调时,数据已经填充或者异常,那么将直接执行回调
- 填充数据或者产生异常时,如果存在回调,那么将直接执行回调
then()函数使用如下,需要传入两个函数,第一个用于处理数据成功填充时的回调,第二个处理发生异常时的回调,他们的参数分别是resolver填充的数据或者rejection抛出的异常,注意,异常使用了exception_ptr来进行传递。
p.then(
[](int a) {
printf("%d\n", a);
},
[](std::exception_ptr& eptr) {
try
{
std::rethrow_exception(eptr);
}
catch (const char* e)
{
printf("%s\n", e);
}
});
执行顺序
下面是完整的代码,结果将按数字顺序(1->2->3->4)输出,这是为啥呢?
- 函数main()开始执行,打印 “1”
- 创建Promise对象,进入到异步处理流程,首先调用了doSomeAsync
- 显然doSomeAsync()并不是异步操作,因此会直接执行resolver(3);
- 此时并没有设置回调,因为then()还没开始执行呢,但是数据填充已经完成
- doSomeAsync返回后,继续执行,打印 “2”
- Promise对象p创建完毕
- 添加回调函数then(),由于此时数据已经填充,那么将直接执行回调,打印 “3”
- then() 执行结束,继续执行打印 “4”
那我们假设doSomeAsync()就是异步执行的,顺序会是怎么样的呢?(假设异步执行需要一定的时间才会填充数据):
- 函数main()开始执行,打印 “1”
- 创建Promise对象,进入到异步处理流程,首先调用了doSomeAsync
- 因为doSomeAsync()是异步的,因此立刻返回
- 线程继续执行,打印 “2”
- Promise对象p创建完毕
- 添加回调函数then(),因为数据没有填充,因此回调函数不会执行
- then() 执行结束,继续执行打印 “4”
- doSomeAsync() 经过一段时间的运行,终于填充了数据,此时回调函数已经设置,因此调用回调函数,打印 "3"
顺序为:1->2->4->3
完整代码:
void doSomeAsync(bool success, Async::Resolver& resolver, Async::Rejection& rejection)
{
if (success)
resolver(3);
else
rejection("3");
}
void doSomeAsync(const bool success, Async::Deferred<int> &deferred)
{
if (success)
deferred.resolve(3);
else
deferred.reject("3");
}
int main()
{
printf("1\n");
bool success = true;
Async::Promise<int> p([success](Async::Resolver& resolve,
Async::Rejection& reject) {
doSomeAsync(success, resolve, reject);
printf("2\n");
});
Async::Promise<int> p1([success](Async::Deferred<int> deferred) {
doSomeAsync(success, deferred);
});
p.then(
[](int a) {
printf("%d\n", a);
},
[](std::exception_ptr& eptr) {
try
{
std::rethrow_exception(eptr);
}
catch (const char* e)
{
printf("%s\n", e);
}
});
printf("4");
}
实现异步
那如何实现异步的呢,我们可以使用多线程,或者是基于epoll和eventfd,后者就是Pistache实现异步写的方法,我们在Pistache源码分析 —— Server的初始化和请求处理中的peerQueue中提到过。
1、多线程
最简单的方法就是使用多线程,我们修改doSomeAsync()如下:
void doSomeAsync(bool success, Async::Resolver& resolver, Async::Rejection& rejection)
{
if (success)
std::thread([&resolver]() {
sleep(1);
resolver(3);
}).detach();
else
std::thread ([&rejection]() {
sleep(1);
rejection("3");
}).detach();
}
网友评论