美文网首页BOOST
boost Strands Internal

boost Strands Internal

作者: Brent姜 | 来源:发表于2017-09-29 00:25 被阅读17次

    我对boost.asio的使用不深入,所以总是过了一段时间就会对它的理解上又变得模糊起来。
    还是先回头看看boost.asio的一些基础知识。

    ASIO攻破!!!这篇文章有一个蛮不错的比喻:

    其实io_service就像是劳工中介所,而一个线程就是一个劳工,调用post的模块相当于富人们,他们去中介所委托任务,而劳工们就听候中介所的调遣去执行这些任务,任务的内容就写在富人们给你的handler上,也就是函数指针,指针指向具体实现就是任务的实质内容。其实在整个过程中,富人们都不知道是哪个劳工帮他们做的工作,只知道是中介所负责完成这些就可以了。这使得逻辑上的耦合降到了最低。不过这样的比喻也有个不恰当的地方,如果硬要这样比喻的话,我只能说:其实劳工里面也有很多富人的o! 。很多劳工在完成任务的过程中自己也托给中介所一些任务,然后这些任务很可能还是自己去完成。这也难怪,运行代码的总是这些线程,那么调用post的肯定也会有这些线程了,不过不管怎么说,如此循环往复可以解决问题就行。

    多说一句,允许有多个劳工,也就允许有多个中介所。换句话说,你可以实现1+n模式(让一个中介所为多个劳工服务),也可以实现为n*(1+1)模式(一个中介所只为一个劳工服务,有多个这样的搭配),当然还可以实现为n+m模式(多个中介所共同为多个劳工服务)。

    对于n*(1+1)模式,其实相当于对那一部分数据实现了单线程处理模型,类似于一个spsc队列。这里暂不讨论。

    对于1+n模式和n+m模式,情况会复杂一些。
    boost asio io_service与 strand 分析 一文的例子拿来用,他采用了n+m模式(2+3)来验证strand的作用。这种模式也就是官方文档中说的线程池模型:

    Multiple threads may call the run()
    function to set up a pool of threads from which the io_service
    may execute handlers. All threads that are waiting in the pool are equivalent and the io_service
    may choose any one of them to invoke a handler.

    首先是使用了strand的版本。

    #include <iostream>
    #include <boost/shared_ptr.hpp>
    #include <boost/asio.hpp>
    #include <iostream>
    #include <boost/bind.hpp>
    #include <boost/thread/thread.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/date_time/posix_time/posix_time.hpp>
    using namespace std;
    using namespace boost;
    using namespace asio;
    
    typedef boost::asio::io_service ioType;
    typedef boost::asio::strand strandType;
    ioType m_service;
    strandType m_strand(m_service);
    boost::mutex m_mutex;
    
    
    void print( int fatherID)
    {
    //  boost::mutex::scoped_lock lock(m_mutex);
      static int count = 0;
      cout<<"fatherID "<<fatherID<<" "<<endl;
      sleep(1);
      cout<<"count "<<count++<<endl;
    }
    
    void ioRun1()
    {
      while(1)
        {
          m_service.run();
        }
    }
    //
    void ioRun2()
    {
      while(1)
        {
          m_service.run();
        }
    }
    
    void print1()
    {
      m_strand.dispatch(bind(print,1));
    //  cout<<"over"<<endl;
    }
    
    void print2()
    {
      m_strand.post(bind(print,2));
    }
    
    void print3()
    {
      m_strand.post(bind(print,3));
    }
    
    int main()
    {
      boost::thread t0(ioRun1);
      boost::thread t(ioRun2);
      boost::thread t1(print1);
      boost::thread t2(print2);
      boost::thread t3(print3);
      cout<<"111111"<<endl;
      t1.join();
      t2.join();
      t3.join();
      t0.join();
      t.join();
      cout<<"ads"<<endl;
      return 0;
    }
    

    最终输出结果:

       fatherID 1 
     count 0
     fatherID 2 
     count 1
     fatherID 3 
       count 2
    

    说明这是线程安全的!

    但是 而 io_service 不能保证:更改程序:

    void print1()
    {
      m_service.dispatch(bind(print,1));
    //  cout<<"over"<<endl;
    }
    
    void print2()
    {
      m_service.post(bind(print,2));
    }
    
    void print3()
    {
      m_service.post(bind(print,3));
    }
    

    输出结果:

    fatherID 3 
    fatherID 2 
    count 0
    fatherID 1 
    count 1
    count 2
    

    很显然,这里存在并发的问题。所以strand就是为了解决这样的问题而出现的。

    How strands guarantee correct execution of pending events in boost.asio一文的这个回答说:

    The strand manages all handlers posted to it in a FIFO queue. When the queue is empty and a handler is posted to the strand, then the strand will post an internal handle to the io_service. Within the internal handler, a handler will be dequeued from the strand's FIFO queue, executed, and then if the queue is not empty, the internal handler posts itself back to the io_service.

    翻译过来,就是说strand使用一个FIFO队列来管理handlers(也就是函数对象)。

    asio::async_write and strand的问题是,
    asio::async_write(m_socket, asio::buffer(buf, bytes),
    custom_alloc(m_strand.wrap(custom_alloc(_OnSend))));

    Does this code guarantee that all asynchronous operation handlers(calls to async_write_some) inside async_write are called through strand? (or it's just for my_handler?)

    Tanner Sansbury的回答很细致:
    With the following code:

    asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));
    

    For this composed operation, all calls to stream.async_write_some() will be invoked within m_strand if all of the following conditions are true:

    • The initiating async_write(...) call is running within m_strand():

       assert(m_strand.running_in_this_thread());
       asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));
      
    • The return type from custom_alloc is either:

    • the exact type returned from strand::wrap()

            template <typename Handler> 
            Handler custom_alloc(Handler) { ... }
    
            template <class Handler>
            class custom_handler
            {
            public:
              custom_handler(Handler handler)
                : handler_(handler)
              {}
    
              template <class... Args>
              void operator()(Args&&... args)
              {
                handler_(std::forward<Args>(args)...);
              }
    
              template <typename Function>
              friend void asio_handler_invoke(
                Function intermediate_handler,
                custom_handler* my_handler)
              {
                // Support chaining custom strategies incase the wrapped handler
                // has a custom strategy of its own.
                using boost::asio::asio_handler_invoke;
                asio_handler_invoke(intermediate_handler, &my_handler->handler_);
              }
    
            private:
              Handler handler_;
            };
    
            template <typename Handler>
            custom_handler<Handler> custom_alloc(Handler handler)
            {
              return {handler};
            }
    

    See this answer for more details on strands, and this answer for details on asio_handler_invoke.

    相关文章

      网友评论

        本文标题:boost Strands Internal

        本文链接:https://www.haomeiwen.com/subject/kfxbextx.html