美文网首页
Bitcoin Core中的多线程模型

Bitcoin Core中的多线程模型

作者: fooboo | 来源:发表于2016-12-08 21:50 被阅读273次

    有点郁闷的是,上一篇博文写了好久,本来是已经写好了的,但只要在read下面添加内容,就导致chrome挂了,试了很多次了,不知道为啥,但那个还是写完,后期会补在原来博客的下面。


    由于后台程序是多线程模型,其中有个模块是多个线程处理rpc请求,从队列中取任务,处理并作响应,下面好些用法与C++11相关[工作中没有用过,只是知道些,有时间看看相关源码实现]。先来看线程初始化:

    456     for (int i = 0; i < rpcThreads; i++) {
    457         std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
    458         rpc_worker.detach();
    459     }
    

    其中使用了C++11中的thread,detach的作用是“Detaches the thread represented by the object from the calling thread, allowing them to execute independently from each other.”
    创建rpcThreads个线程,每个线程以HTTPWorkQueueRun回调函数,workQueue为参数初始化:

    358 static void HTTPWorkQueueRun(WorkQueue<HTTPClosure>* queue)
    359 {
    360     RenameThread("bitcoin-httpworker");
    361     queue->Run();
    362 }
    179 static WorkQueue<HTTPClosure>* workQueue = 0;
    

    看看HTTPClosure请求类型的声明:

    117 class HTTPClosure
    118 {
    119 public:
    120     virtual void operator()() = 0;
    121     virtual ~HTTPClosure() {}
    122 };
    
     42 class HTTPWorkItem : public HTTPClosure
     43 {
     44 public:
     45     HTTPWorkItem(std::unique_ptr<HTTPRequest> _req, const std::string &_path, const HTTPRequestH    andler& _func):
     46         req(std::move(_req)), path(_path), func(_func)
     47     {
     48     }
     49     void operator()()
     50     {
     51         func(req.get(), path);
     52     }
     53 
     54     std::unique_ptr<HTTPRequest> req;
     55 
     56 private:
     57     std::string path;
     58     HTTPRequestHandler func;
     59 };
    

    其中:

     36 typedef std::function<bool(HTTPRequest* req, const std::string &)>  HTTPRequestHandler;
    
     53 class HTTPRequest
     54 {
     55 private:
     56     struct evhttp_request* req;
     57     bool replySent;
     59 public:
     60     HTTPRequest(struct evhttp_request* req);
     61     ~HTTPRequest();
     73     std::string GetURI();
     81     RequestMethod GetRequestMethod();
     82 }
    

    上面HTTPRequest是一个完整的rpc请求,有url,有method等其他信息,有些接口没放上来。evhttp_request成员类型是libevent中定义的,这里不放代码了,比如请求内容为{"method":"getblock","params":"0x000000000019d6689c085ae1658 31e934ff763ae46a2a6c172b3f1b60a8ce26f"},那么GetURI返回"/"
    GetRequestMethod返回POSTunique_ptr是属于智能指针中的一类,没有引用计数功能,它持有对象的独有权--两个unique_ptr不能指向一个对象,不能进行复制操作只能进行移动操作。
    move的功能是将一个左值强制转化为右值引用,继而通过右值引用使用该值,以用于移动语义,具体怎么实现的还待仔细研究:-),这儿先放着。WorkQueue的具体实现如下:

    64 template <typename WorkItem>
    65 class WorkQueue
    66 {
    67 private:
    68     /** Mutex protects entire object */
    69     std::mutex cs;
    70     std::condition_variable cond;
    71     std::deque<std::unique_ptr<WorkItem>> queue;
    72     bool running;
    73     size_t maxDepth;
    94 public:
    107    bool Enqueue(WorkItem* item)
    108    {
    109        std::unique_lock<std::mutex> lock(cs);
    110        if (queue.size() >= maxDepth) {
    111            return false;
    112        }
    113        queue.emplace_back(std::unique_ptr<WorkItem>(item));
    114        cond.notify_one();
    115        return true;
    116    }
    118    void Run()
    119    {
    121        while (running) {
    122            std::unique_ptr<WorkItem> i;
    123            {
    124                std::unique_lock<std::mutex> lock(cs);
    125                while (running && queue.empty())
    126                    cond.wait(lock);
    127                if (!running)
    128                    break;
    129                i = std::move(queue.front());
    130                queue.pop_front();
    131            }
    132            (*i)();
    133        }
    134    }
    135    //more code...
    136 }
    

    以上是工作队列的主要代码,放请求任务,从中取任务处理,即调用HTTPWorkItem::operator(),其中使用了stl deque,互斥锁,条件变量。
    其中也使用了在C++11中支持的emplace_back成员函数,省去了临时对象的构造与析构成本,对于大的类对象效果性能更优。
    至于这里为什么先move队头元素再pop_front分两步?因为这里的deque实现了异常安全强保证[当操作因异常而终止,程序的状态应保持不变。],有时拷贝构造函数也会抛出异常,导致队列状态改变,详细知识可以自行搜索。

    124 std::unique_lock<std::mutex> lock(cs);
    125 while (running && queue.empty())
    126     cond.wait(lock);
    

    这么写是防止假唤醒,不能改为if,这里是先获得锁,如果条件不满足,则释放锁,并等待,等收到信号后,在wait返回前再获得锁,...,因为这是个语句块,lock超出作用域范围后自动释放锁了。
    然后注册回调函数,举两个例子:

    602 static const struct {
    603     const char* prefix;
    604     bool (*handler)(HTTPRequest* req, const std::string& strReq);
    605 } uri_prefixes[] = {
    606       {"/rest/tx/", rest_tx},
    607 };
    
    619 RegisterHTTPHandler(uri_prefixes[0].prefix, false, uri_prefixes[0].handler);
    234 RegisterHTTPHandler("/", true, HTTPReq_JSONRPC);
    
    653 void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPRequestHandler &h    andler)
    654 {   
    655     LogPrint("http", "Registering HTTP handler for %s (exactmatch %d)\n", prefix, exactMatch);
    656     pathHandlers.push_back(HTTPPathHandler(prefix, exactMatch, handler));
    657 }
    
    181 std::vector<HTTPPathHandler> pathHandlers;
    
    158 struct HTTPPathHandler
    159 {
    160     HTTPPathHandler() {}
    161     HTTPPathHandler(std::string _prefix, bool _exactMatch,HTTPRequestHandler _handler):
    162         prefix(_prefix), exactMatch(_exactMatch), handler(_handler)
    163     {
    164     }
    165     std::string prefix;
    166     bool exactMatch;
    167     HTTPRequestHandler handler;
    168 };
    

    当libevent收到数据时,底层会回调http_request_cb函数:

    249 static void http_request_cb(struct evhttp_request* req, void* arg)
    250 {
    251     std::unique_ptr<HTTPRequest> hreq(new HTTPRequest(req));
    252     //more code ...
    269     std::string strURI = hreq->GetURI();
    270     std::string path;
    271     std::vector<HTTPPathHandler>::const_iterator i = pathHandlers.begin();
    272     std::vector<HTTPPathHandler>::const_iterator iend = pathHandlers.end();
    273     for (; i != iend; ++i) {
    274         bool match = false;
    275         if (i->exactMatch)
    276             match = (strURI == i->prefix);
    277         else
    278             match = (strURI.substr(0, i->prefix.size()) == i->prefix);
    279         if (match) {
    280             path = strURI.substr(i->prefix.size());
    281             break;
    282         }
    283     }
    286     if (i != iend) {
    287         std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
    288         assert(workQueue);
    289         if (workQueue->Enqueue(item.get()))
    290             item.release(); /* if true, queue took ownership */
    291         else {
                    //more code...
    294         }
    295     } else {
    296         hreq->WriteReply(HTTP_NOTFOUND);
    297     }
    298 }
    

    以上就是根据url参数查找有没有相应的处理接口,并形成一个task塞进队列中去[其中path参数后来调用时没用到]。

    147 static bool HTTPReq_JSONRPC(HTTPRequest* req, const std::string &)
    148 {
    175     JSONRequest jreq;
    176     try {
    177         // Parse request
    178         UniValue valRequest;
    179         if (!valRequest.read(req->ReadBody()))
    180             throw JSONRPCError(RPC_PARSE_ERROR, "Parse error");
    182         std::string strReply;
    183         // singleton request
    184         if (valRequest.isObject()) {
    185             jreq.parse(valRequest);
    187             UniValue result = tableRPC.execute(jreq.strMethod, jreq.params);
    188         }
    190     }
    191     //more code...
    199 }
    

    上面HTTPReq_JSONRPC接口处理以jsonrpc形式的请求,省去了url形式的查找接口。

    以上就是bitcoin core多线程处理rpc请求的主要功能了,省去了一些异常处理代码,详细代码可以git该项目到本地学习下。

    相关文章

      网友评论

          本文标题:Bitcoin Core中的多线程模型

          本文链接:https://www.haomeiwen.com/subject/haljmttx.html