美文网首页
关于boost的async_与io_serivce.run的联系

关于boost的async_与io_serivce.run的联系

作者: 高斯_Cpper | 来源:发表于2019-02-18 11:55 被阅读0次

    前序:
    现在很多服务器都使用boost::asio作为异步socket通信,但很多人只会copy其中的代码,却不了解io_serivce与async_函数之间的联系,下面我们就来深入了解它们之间的关系,并怎么利用它们来实现高性能服务器。

    我们知道async_的函数所绑定的函数是异步的形式,它在系统完成后并不会立即回调(io_serivce.run没有调用),而是要等到io_serivce.run时才会调用,所以io_serivce.run是一个死循环函数。

    由于高性能服务器通常会有两种方式去实现
    一、多线程io收发数据,主线程处理逻辑(当前不考虑多进程的情况)
    二、多线程io收发数据,多线程处理逻辑(这种情况需要考虑到线程安全问题)

    如果让io_serivce实现多线程收发数据是关键,我们了解到io_serivce.run是处理系统io数据的回调,所以可以用不同的线程去调用run()函数,这样就实现了多线程io的回调,但问题在于客户端的连接处理也是在不同的线程,这样就会有线程安全问题了。
    -------------------------------------------------------------例子 ---------------------------------------------------------
    int main()
    {
    boost::asio::io_service io_svc;
    std::vector<std::thread> vecThread;
    for (int i = 0; i < 5; ++i)
    {
    vecThread.emplace_back(std::thread({ io_svc.run(); }));
    }
    return 0;
    }

    如果按上面的代码将一个io_svc 分配多个线程按并发处理,处理的对象是TCP 服务时。发现对同一个TCP 客户端的连接的处理会在多个线程中。

    如果是这样的话,那这一个客户端发送过来数据被分成多段后,在服务端会不会乱,而这样我是不是需要加锁控制,可如果加锁的话又应该怎么加呢?

    按时我的想法是,如果 是同一个话,它如果只在同一个线程中那么这个我就不用担心了,因为它会顺序到达,那么我收到的头部和数据部就不会被分到两个线程中去了。

    这主要问题是TCP 服务端使用异步的方式接收客户端的数据,首先收到固定的头部数据,然后解析头部得到数据部的长度,然后再次异步接收实际的数据。如果这两个步骤被分到了两个线程中去了。导致了这两个部分发生了并发操作,那就尴尬了。

    以上是摘子网友的一个疑问,确实是会产生接收到数据会到不同的线程中去,所以在使用async_函数要加以控制,能使它合理的回调。
    下面某个网友回复的做法

    方法:单个线程用于数据接收,放到一个队列,开多个消费线程用于数据处理

    这种方法是可以做到线程安全的问题,但问题在于io效率比较低,所以不推荐。

    比较好的做法,多线程收数据,这里采用收head再收body,async这里要绑定具体长度,当收到数据时,写入buffer时,要加锁,原因是处理事务的线程也要取buffer的数据,所以要锁。

    下面收数据的一个例子

    void NetSocket::ReadMsg(mutable_buffers_1& _buffers,uint32_t dataLen)
    {
        async_read(*this, _buffers, transfer_exactly(dataLen),
            boost::bind(&NetSocket::RecvMsg, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
        TimeoutStart();  // 处理长时间未收到数据的socket
    }
    
    void NetSocket::RecvMsg(const boost::system::error_code& ec, size_t bytes_transferred)
    {
        TimeoutCancel();  // 取消超时处理
        if (ec)
        {
            printf("[ERROR]:Recv error msg,%s\n", ec.message().c_str());
            assert(0);
            SetLocalClose(true);
            (m_pNotify)(*this);
            return;
        }
    
        if (m_eRecvStage == REVC_FSRS_HEAD)   //处理头
        {
            m_eRecvStage = REVC_FSRS_BODY;
            uint32_t nextSizeLen = 0;
            memcpy(&nextSizeLen, m_arrRecvBuffer, bytes_transferred);
            ReadMsg(m_cRecvBodyBuffer, nextSizeLen);
        }
        else   //处理body,这里收到的长度刚好为一个body
        {
            
            uint32_t nextSizeLen = 0;
            memcpy(&nextSizeLen, m_arrRecvBuffer, PACKAGE_HEADER_SIZE);
            assert(nextSizeLen == bytes_transferred);
    
            boost::mutex::scoped_lock lock(m_writeBufferMutex);
            DataBuffer* dataBuff = new DataBuffer((uint8_t*)(m_arrRecvBuffer + PACKAGE_HEADER_SIZE), bytes_transferred);
            if (recvQueueHeader_ == NULL && recvQueueLastest_ == NULL)
            {
                recvQueueHeader_ = dataBuff;
                recvQueueLastest_ = dataBuff;
            }
            else
            {
                recvQueueLastest_->next = dataBuff;
                recvQueueLastest_ = recvQueueLastest_->next;
            }
            lock.unlock();
            m_eRecvStage = REVC_FSRS_HEAD;
            ReadMsg(m_cRecvHeadBuffer, PACKAGE_HEADER_SIZE);
            (m_pNotify)(*this);
        }
    }
    ```cpp
    -------------------------------
    上面的代码解决了线程安全的问题,现在讲如何开启io_service多线程的代码
    1、在创建一个NetServer时,就预先创建一定数量的socket,如1000个
    2、同时绑定1000个socket的async_accept,这就相当于可以同时并发1000个连接过来
    3、创建8个线程去调用io_service.run()。
    
    

    相关文章

      网友评论

          本文标题:关于boost的async_与io_serivce.run的联系

          本文链接:https://www.haomeiwen.com/subject/xhooeqtx.html