美文网首页
Asio无栈协程

Asio无栈协程

作者: chnmagnus | 来源:发表于2021-02-09 17:08 被阅读0次

    对于Asio无栈协程做一个比较全面的导引,从使用、原理和实现三个方面进行叙述。

    官方文档:Asio Stackless Coroutine

    1 引入

    Asio Stackless Coroutine不同于一般依赖编译器实现的无栈协程,其实现方式很巧妙,完全依赖c++本身的函数机制和语言特性实现(Duff's Device / switch)。

    其初衷是 无缝结合asio的async api,同时屏蔽掉异步编程的复杂性,通过引入极少几个宏,提供一种简单的同步组织代码的方式。

    使用传统的异步+回调,逻辑代码会被迫拆分成多段,通过多次回调才能完成一次逻辑,比如要发起一次http访问,代码可能像这样:

    void DoHttpCall(string host) {
        // 解析域名,并注册下一步回调函数
        _resolver.async_resolve(host, "http", HandleResolve);
    }
    // 建立连接
    void HandleResolve(const asio::error_code& ec,
        const tcp::resolver::results_type& endpoints){
        if (!ec){
          asio::async_connect(_socket, endpoints, HandleConnect);
        }
    }
    // 发送请求
    void HandleConnect(const asio::error_code& ec){
        if (!ec){
            asio::async_write(_socket, _request, HandleWrite);
        }
    }
    // 读取响应并解析处理
    void HandleWrite(const asio::error_code& ec) {
        // 通过async_read系列函数读取响应并解析
        // 后面应该还会有 反复多次 HandleReadLine 逐行解析http头部
        // 然后是 HandleContent 读取http body 等等调用
        // 此处略
    }
    

    如果使用asio无栈协程,则可以把所有逻辑流畅得放在一个函数里,代码会像这样(略去了错误处理):

    void Do(std::shared_ptr<Context> client_ctx, asio::coroutine coro,
            asio::error_code ec, std::size_t length,
            asio::ip::tcp::resolver::results_type results) {
    
        auto gen_io_func = [&]() {
            return std::bind(&Do, client_ctx, coro, std::placeholders::_1,
                             std::placeholders::_2,
                             asio::ip::tcp::resolver::results_type());
        };
    
        reenter(coro) {
            // 解析域名
            yield {
                auto result_func =
                    std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
                              std::placeholders::_2);
                client_ctx->_resolver.async_resolve(client_ctx->_request._host,
                                                    client_ctx->_request._port,
                                                    result_func);
            }
            // 建立连接
            yield {
                auto conn_func =
                    std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
                              asio::ip::tcp::resolver::results_type());
                client_ctx->_socket.async_connect(*results, conn_func);
            }
            // 发送请求
            yield async_write(client_ctx->_socket, client_ctx->_request.ToBuffers(),
                              gen_io_func());
    
            client_ctx->_buffer.fill(0);
            // 读取并解析响应
            do {
                yield client_ctx->_socket.async_read_some(
                    asio::buffer(client_ctx->_buffer), gen_io_func());
                std::tie(client_ctx->_valid_reply, std::ignore) =
                    client_ctx->_reply_parser.Parse(
                        client_ctx->_reply, client_ctx->_buffer.data(),
                        client_ctx->_buffer.data() + length);
            } while (client_ctx->_valid_reply ==
                     ParseHelper::Result::indeterminate);
            // 处理响应
            // ...
        }
    }
    

    可以看到,使用了协程之后,所有的逻辑都被以同步的方式放在了一个函数中处理,其可维护性和可读性都有很大提升。
    完整代码见这里.

    2 使用

    本章结合官方文档对协程所涉及的语法进行描述。对于官方文档中已经提及的内容,不会再重复赘述,更多是一些补充和扩展。

    2.1 语法

    基于asio::coroutine实现协程,每一个协程必定包含一个coroutine实例,一个包含reenter、yield的逻辑函数,一般如下:

    class Session : asio::coroutine {
    public:
        void operator()(error_code ec, std::size_t length = 0) {
            // 这里的逻辑,每次重新进入协程都会执行
            // 比如,可以在这里放置一些通用的错误处理逻辑
            if (!err) {
                // error handle
                return ;
            }
    
            // 协程内的具体业务逻辑要放在reenter内
            reenter(this) {
                // 记录当前位置,发起async_accept调用,并切出协程
                yield _acceptor->async_accept(_socket, *this);
                // 如果有连接进来,触发回调,回到此处继续执行
    
                // 记录当前位置,发起async_read调用,并切出协程
                yield _socket->async_read_some(asio::buffer(*_buffer), *this);
                // 如果有数据可读,触发回调,回到此处继续执行
            }
    
            // 这里的逻辑,每次切出协程均会执行
        }
    };
    

    下面对使用协程所涉及的各个关键字进行描述。

    class coroutine

    每个协程都需要存储其本身的状态,为此,有了class coroutine。coroutine类可以拷贝构造、可以赋值,其数据成员只有一个int,消耗很少。

    class coroutine {
    public:
      coroutine() : value_(0) {}
      /// Returns true if the coroutine is the child of a fork.
      bool is_child() const { return value_ < 0; }
      /// Returns true if the coroutine is the parent of a fork.
      bool is_parent() const { return !is_child(); }
      /// Returns true if the coroutine has reached its terminal state.
      bool is_complete() const { return value_ == -1; }
    private:
      friend class detail::coroutine_ref;
      int value_;
    };
    

    coroutine一般有两种用法。一种是作为基类使用:

    class session : coroutine {
        void operator()(error err);
        ...
    };
    

    或者作为类的一个成员变量:

    class session {
        void operator()(error err);
        ...
        coroutine coro_;
    };
    

    具体的协程逻辑则在类的operator()运算符中实现。

    这里思路可以放开一点,可以直接抛弃class,直接将其作为函数的一个参数,结合std::bind进行传递(可以参见上一节的代码片段)。只要在协程的存在期间,能够维持coroutine对象的拷贝即可,具体的方式并不重要。

    void RunCoro(coroutine coro, error err);
    

    reenter

    reenter用来定义协程的主体,协程的主要逻辑要放在reenter限定的作用域内。

    协程采用继承coroutine方式实现时,你的代码会像这样:

    reenter (this) {
        // coroutine body
    }
    

    如果采用成员变量或者函数形式实现,则像这样:

    reenter (_coro) {
        // coroutine body
    }
    

    后文提及的关键字(yield、fork)只能在reenter限定的范围内使用;这样才能保证协程重新切回后,能够继续之前的位置执行。另外,要注意,reenter是基于switch实现,所以在协程体内,有着和switch一样的限制,要时刻留意临时变量的定义问题。

    yield

    yield关键字的用法,官方文档描述的很清晰,不在此赘述。

    • yield statement ;
    • yield { statements }
    • yield return expression ;
    • yield ;
    • yield break ;

    一个小的窍门是用yield {}来规避switch的局部变量限制:

    yield {
        auto conn_func =
            std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
                        asio::ip::tcp::resolver::results_type());
        client_ctx->_socket.async_connect(*results, conn_func);
    }
    

    要注意一个非常容易踩且比较隐晦的坑:

    class downloader : asio::coroutine {
        using socket_t = boost::asio::ip::tcp::socket;
        using resolver_t = boost::asio::ip::tcp::resolver;
    
        std::shared_ptr<socket_t> socket_;
        std::shared_ptr<resolver_t> resolver_;
    
        const std::string file_;
        const std::string host_;
        size_t fileSize_{};
    
    public:
        void operator()(error_code ec = error_code(), std::size_t length = 0,
                      const resolver_t::results_type &results = {}) {
    
            // Check if the last yield resulted in an error.
            if (ec &7 ec != eof) {
                throw std::runtime_error(ec.message());
            }
    
            // Jump to after the previous yield.
            reenter(this) {
                // 问题根源在这里
                yield {
                    resolver_t::query query{host_, "80"};
    
                    // Use bind to skip the length parameter not provided by async_resolve
                    auto result_func = std::bind(&downloader::operator(), this, _1, 0, _2);
    
                    resolver_->async_resolve(query, result_func);
                }
    
                // 这里会偶发段错误
                yield socket_->async_connect(*results, *this);
    
                // 略去其他逻辑
                //......
            }
        }
    
    

    上面这段代码,将asio::coroutine作为基类来使用,然后重写了operator()运算符,通过传递*this拷贝类本身来传递coroutine实例,一切看起来很正确?
    问题在于,这种基于基类的方式,和std::bind进行结合时,bind产生的可调用体中仅包含了this指针的拷贝,而不是类对象的拷贝。当类对象被释放后,这个this指针会变成野指针,在协程重新切回之后,下一次的async_connect(*results, *this),对一个野指针进行寻址访问,会发生什么,不必多说。

    完整的错误示范可以见stackoverflow这个问题的accepted answer

    要注意,这个回答是错的,是错的,是错的。

    fork

    fork关键字的用法,官方文档描述的很清晰,不在此赘述。

    • fork statement ;

    fork一般会用在服务端,比如accept到新的连接,启动一个子协程去处理。

    2.2 示例

    这里给出两个示例,基本涵盖了asio::coroutine的所有用法。

    基于asio::coroutine实现的http client
    基于asio::coroutine实现的http server

    3 原理

    3.1 Duff设备与switch case的本质

    Duff's Device

    在上世纪,计算机单核性能较低,人们经常使用一种被称为“循环展开”的方式优化代码,使其在多核处理器和流水线处理器上性能更好。

    简单来说,这样一段循环1000w次的代码:

    int result = 0;
    for (int i = 0; i < 10000000; ++i) {
        sum += i; 
    }
    

    可以修改成这样,只需要循环200w次即可:

    int result = 0;
    for (int i = 0; i < 2000000; i += 5) {
        result += i;
        result += i+1;
        result += i+2;
        result += i+3;
        result += i+4;
    }
    

    Duff在编写一个数组拷贝函数时(将from指针开始count个元素的内容拷贝到to数组),使用到了上述技术。但是,数组长度往往无法被循环展开的次数整除(count%8 != 0),这意味着除了一个while循环之外,还需要一个额外的switch来将n无法整除的部分单独执行。代码如下:

    void copy(char *to, char *from, int count) {
        int n = (count + 7) / 8;
        switch (count % 8) {
            case 0: *to = *from++;
            case 7: *to = *from++;
            case 6: *to = *from++;
            case 5: *to = *from++;
            case 4: *to = *from++;
            case 3: *to = *from++;
            case 2: *to = *from++;
            case 1: *to = *from++;
        }
        while (--n > 0) {
            *to = *from++;
            *to = *from++;
            *to = *from++;
            *to = *from++;
            *to = *from++;
            *to = *from++;
            *to = *from++;
            *to = *from++;
        }
    }
    

    Duff稍加思索,将switch和while结合到一起,写出了如下的代码。

    当函数开始执行时,会根据count%8算出的余数,跳转到case 0-7对应的代码执行,因为没有显式break,switch不会跳出,而是会继续执行跳转位置后方case的内容,这会完成无法整除部分的拷贝。直到执行到while语句,被循环捕获,进入while循环,这会完成整除部分的拷贝。

    void copy(char *to, char *from, int count) {
        int n = (count + 7) / 8;
        switch (count % 8) {
        case 0: do { *to = *from++;
        case 7:      *to = *from++;
        case 6:      *to = *from++;
        case 5:      *to = *from++;
        case 4:      *to = *from++;
        case 3:      *to = *from++;
        case 2:      *to = *from++;
        case 1:      *to = *from++;
                } while (--n > 0);
        }
    }
    

    switch case的本质

    Duff's Device向我们揭示了这样一个事实:switch case后的语句,并没有我们想象中诸如代码块or作用域之类的限制,case的角色本质上是一种特殊的标签,可以在代码的任何地方放置case,且不会影响其他代码的执行。swtich的角色则是限定switch case标签的作用域,并且维护somthing like jump table,用于case匹配时的跳转。
    如果你打开c标准Labeled statements一节,你会发现 标签、case、default 一同被归为标签语句。

    可以试试这个hello world,体会一下switch case的行为。

    #include <iostream>
    
    using std::cout;
    using std::endl;
    
    int main(int argc, char *argv[]) {
        // int a(0);
        int a(1);
        switch (a) case(0):for(;;) {
            cout<<"hello"<<endl;
            case (1):
            cout<<"world"<<endl;
            return 0;
        }
    }
    

    3.2 基于switch case实现协程

    asio::coroutine正是受Duff's Device的启发,基于switch case这一特性实现的,没有涉及任何汇编。

    协程的切换方式其实和函数很像,但其对比函数的区别是,协程的重要特性是可以返回并继续,当协程被重新切回后,应该能从其切走时的位置继续执行。
    如果要把一个函数改造为协程,一种朴素的想法是,给这个函数增加一个静态变量state,在所有函数要恢复执行的位置,放置一个label。并在函数切走之前,设置state的值标识切回后,函数继续执行的位置。比如这样:

    假定这是一个需要循环对某个server发起心跳探活的函数实现:

    void coro_func(error_code ec) {
        static int state = 0;
        switch(state) {
            case 0: goto LABEL0;
            case 1: goto LABEL1;
            case 2: goto LABEL2;
        }
    LABEL0:
        for(;;) {
            // 发起连接
            state = 1;
            asio::async_connect(socket, endpoints, &coro_func);
            return;
    LABEL1:
            // 写入数据
            state = 2;
            asio::async_write(socket, request, &coro_func);
            return;
    LABEL2:
            // 读取响应并处理,略
            // sleep一段时间,略
        }
    }
    

    这样的“协程”稍显简陋,主要的问题在于,所有的label都要手动设置,并时刻需要维护switch与label的一致性。
    现在是Duff's Device大展身手的时候了:

    void coro_func(error_code ec) {
        static int state = 0;
        switch(state) {
            case 0:
            for(;;) {
                // 发起连接
                state = 1;
                asio::async_connect(socket, endpoints, &coro_func);
                return;
            case 1:
                // 写入数据
                state = 2;
                asio::async_write(socket, request, &coro_func);
                return;
            case 2:
                // 读取响应并处理,略
                // sleep一段时间,略
            }
        }
    }
    

    现在只剩下两个问题:

    1. 将函数开头用于保存恢复位置的静态变量state用其他方式实现(多线程成精局部静态变量是非线程安全的),可以考虑使用类成员变量,或者函数参数等方式记录状态
    2. 现在我们仍然需要手动编写switch case,更好的方式是基于__LINE__封装一组宏,自动实现跳转恢复的逻辑,以保证代码的可读性和易用性

    来看看asio::coroutine是如何实现的。

    4 实现

    asio::coroutine代码非常少,全部代码不过100行,下面贴出代码,并在每个代码片段之前以及代码中的注释中给出解释。

    在看代码之前,你要理清几点:

    1. 基于Duff's Device的协程,其本质上是可以恢复并继续执行的函数,每一次切出,都是正常的函数return
    2. 协程的恢复(切回),都是正常的函数调用,都会从协程函数的开头开始执行,执行到reenter时,通过reenter宏定义的switch,基于你传入的class corourine对象中的value_跳转到具体的case继续执行
    3. 函数的恢复(切回)这一行为的触发,asio::coroutine没有定义,如果结合asio使用,切回的实现,是将协程函数注册为async_xxx系列函数的回调,然后在事件ready后,回调触发协程恢复,这可以让coroutine和异步接口无缝衔接

    class coroutine用于替代上一节中 静态变量 state 的作用,主要作用是通过value_成员记录协程恢复后继续执行的位置。
    除此之外,还通过一些特殊的取值,比如-1来标识协程退出等状态(其实这些完全可以单独搞一个bool,放在一起是为了节省内存使用和拷贝的消耗)。

    // 协程上下文
    class coroutine {
    public:
      /// Constructs a coroutine in its initial state.
      coroutine() : value_(0) {}
    
      // 用于在fork关键字后,区分当前是子协程还是父协程
      // 要注意的是,is_child只有在子协程第一次进入,没有执行过yield进行切出前有效
      // 一旦你进行了切出,子协程的value_也会被重置为正值
      // Returns true if the coroutine is the child of a fork.
      bool is_child() const { return value_ < 0; }
      // Returns true if the coroutine is the parent of a fork.
      bool is_parent() const { return !is_child(); }
    
      // value_如果等于-1 代表协程已经结束
      // 详见 ~coroutine_ref() 注释
      bool is_complete() const { return value_ == -1; }
    
    private:
      friend class detail::coroutine_ref;
      int value_;
    };
    

    class coroutine 对象的生命周期贯穿整个协程,而coroutine_ref在每次进入协程时创建,在每次切出协程后释放。
    同时协程中所有对class coroutine中value_的修改,都是通过coroutine_ref进行的。

    // 一个对于class coroutine的引用类
    class coroutine_ref {
    public:
      // 从class coroutine对象构造
      coroutine_ref(coroutine& c) : value_(c.value_), modified_(false) {}
      coroutine_ref(coroutine* c) : value_(c->value_), modified_(false) {}
    
      // 如果是通过yield临时切出协程,则一定会修改value_,即modified_一定为true
      // 只有通过return、抛出异常,或者函数正常执行到末尾退出协程函数,modified_才会是false
      // 即,当modifed_为false时,代表协程已经结束,此时将value_设为-1,表示协程已经complete
      ~coroutine_ref() { if (!modified_) value_ = -1; }
    
      // 如果对value_进行修改,同步将modified_设为true
      int& operator=(int v) { modified_ = true; return value_ = v; }
    
      operator int() const { return value_; }
    private:
      void operator=(const coroutine_ref&);
      int& value_;
      bool modified_;
    };
    

    reenter 是所有协程都要包含的一个宏,用于开始协程体的定义,定义switch,以及一些特殊的case。

    case 0后的逻辑在协程第一次进入时跳转执行。

    如果是协程重新切回,则reenter中的switch会将其跳转到切出前的位置继续执行。

    case -1后的逻辑,一方面,用于处理协程已经退出,但又被重新调度执行的情况;另一方面,可由其他位置的goto语句(比如yield)进入,执行协程切换时的退出逻辑。
    terminate_coroutine 标签后的代码 定义了终止协程要做的逻辑。
    bail_out_of_coroutine 标签后的代码 定义了退出协程要做的逻辑。

    // reenter的实现
    #define ASIO_CORO_REENTER(c) \
      switch (::asio::detail::coroutine_ref _coro_value = c) \
        case -1: if (_coro_value) \
        { \
          goto terminate_coroutine; \
          terminate_coroutine: \
          _coro_value = -1; \
          goto bail_out_of_coroutine; \
          bail_out_of_coroutine: \
          break; \
        } \
        else /* fall-through */ case 0:
    
    #ifndef reenter
    # define reenter(c) ASIO_CORO_REENTER(c)
    #endif
    
    

    yield 在需要临时切出当前协程,或者要终止协程时调用。

    for (_coro_value = (n);;) 此类写法起到赋值并延展作用域的作用。

    if (_coro_value == 0) 此判断永远为false,主动调用yield不会进入该分支,此分支判断中的代码,仅在协程切回,通过switch case jump进入执行。执行逻辑仅一个break,效果是跳出一开始的for作用域,直接继续执行yield语句后方的代码。

    else switch (_coro_value ? 0 : 1)后的代码比较复杂,主要是为了实现yield的多种功能,在看这里的代码之前,先回顾下yield的几种用法。yield共有五种用法,在这里,我们把它分成两类:

    • 第一类
      • yield statement ;
      • yield { statements }
      • yield return expression ;
      • yield ;
    • 第二类
      • yield break ;

    第一类,yield后跟一个用户自定义的语句或者空语句,此时yield的行为是:
    记录当前的执行位置 -> 执行用户定义的语句 -> 跳出reenter限定的作用域执行函数末尾的代码然后返回
    此后,协程如果被恢复执行,会接着yield下一行代码继续执行。

    第二类,yield接break,此时的yield的行为是:
    终止协程的运行(将value_设为-1) -> 跳出reenter限定的作用域执行函数末尾的代码然后返回
    此后,协程完全终止,不再可以被恢复执行。

    回到代码,else switch (_coro_value ? 0 : 1)中的_coro_value恒大于0,所以一定会跳到末尾的case 0执行,此时执行用户定义的语句。

    • 如果用户的语句是return,则临时切出协程,并返回一个值。
    • 如果用户的语句是break,则会跳出else for (;;) case 1:所在的for循环,进入for(;;) case -1:的循环,然后执行goto terminate_coroutine;,将value_设为0,标识协程终止,并跳出reenter作用域,执行函数末尾的逻辑,然后返回。
    • 如果用户的语句不是breakreturn,则执行完用户的语句后,会进入for (;;) case 1:所在for循环,然后执行goto bail_out_of_coroutine;,跳出reenter作用域,执行函数末尾的逻辑,然后返回。
    // yield的实现
    #define ASIO_CORO_YIELD_IMPL(n) \
      for (_coro_value = (n);;) \
        if (_coro_value == 0) \
        { \
          case (n): ; \
          break; \
        } \
        else \
          switch (_coro_value ? 0 : 1) \
            for (;;) \
              /* fall-through */ case -1: if (_coro_value) \
                goto terminate_coroutine; \
              else for (;;) \
                /* fall-through */ case 1: if (_coro_value) \
                  goto bail_out_of_coroutine; \
                else /* fall-through */ case 0:
    
    # define ASIO_CORO_YIELD ASIO_CORO_YIELD_IMPL(__LINE__)
    
    #ifndef yield
    # define yield ASIO_CORO_YIELD
    #endif
    

    fork 在创建子协程时调用。

    仔细看代码,你会发现,这里没有做任何与fork相关的操作,创建和启动子协程的操作是由用户编写的。

    这里做的仅仅是在执行用户提供的子协程代码之前,将coro_value改为负值;
    用户的子协程构造代码,会把 class coroutine 拷贝一份,拷贝构造出的class coroutine中的value
    即为负值。

    在用户子协程函数返回之后,再把_coro_value改回正值,并继续后面的代码执行。

    // fork的实现
    #define ASIO_CORO_FORK_IMPL(n) \
      for (_coro_value = -(n);; _coro_value = (n)) \
        if (_coro_value == (n)) \
        { \
          case -(n): ; \
          break; \
        } \
        else
    
    # define ASIO_CORO_FORK ASIO_CORO_FORK_IMPL(__LINE__)
    
    #ifndef fork
    # define fork ASIO_CORO_FORK
    #endif
    

    相关文章

      网友评论

          本文标题:Asio无栈协程

          本文链接:https://www.haomeiwen.com/subject/bsrcxltx.html