Bitcoin Core中的多线程模型

Bitcoin Core中的多线程模型

作者: fooboo | 来源:发表于2016-12-08 21:50 被阅读273次



    // Start rpcThreads worker threads; each one runs HTTPWorkQueueRun on the shared workQueue.
    456     for (int i = 0; i < rpcThreads; i++) {
    457         std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
                    // Detach so the thread keeps running after this std::thread object
                    // goes out of scope at the end of the iteration; it is not joined here.
    458         rpc_worker.detach();
    459     }

    其中使用了C++11中的std::thread。detach的作用是“Detaches the thread represented by the object from the calling thread, allowing them to execute independently from each other.”,即工作线程与创建它的线程分离、各自独立运行,之后不需要(也不能)再对它调用join。

    // Entry point of each HTTP worker thread: calls RenameThread with
    // "bitcoin-httpworker", then runs the queue's Run() loop on this thread.
    358 static void HTTPWorkQueueRun(WorkQueue<HTTPClosure>* queue)
    359 {
    360     RenameThread("bitcoin-httpworker");
    361     queue->Run();
    362 }
    // File-local pointer to the HTTP work queue, initialised to null (0).
    // Where it is assigned is not shown in this excerpt.
    179 static WorkQueue<HTTPClosure>* workQueue = 0;


    // Abstract callable used as the work queue's item type: subclasses implement operator().
    117 class HTTPClosure
    118 {
    119 public:
    120     virtual void operator()() = 0;
                // Virtual destructor: WorkQueue<HTTPClosure> owns items through
                // std::unique_ptr<HTTPClosure>, so derived items are destroyed via this base.
    121     virtual ~HTTPClosure() {}
    122 };
     // One queued unit of HTTP work: owns the request and remembers which handler
     // and path to call it with. Invoking the item runs func(req, path).
     42 class HTTPWorkItem : public HTTPClosure
     43 {
     44 public:
                 // Takes ownership of _req; _path and _func are copied into the item.
     45     HTTPWorkItem(std::unique_ptr<HTTPRequest> _req, const std::string &_path, const HTTPRequestHandler& _func):
     46         req(std::move(_req)), path(_path), func(_func)
     47     {
     48     }
                 // Call the stored handler with the owned request and the path.
                 // The handler's bool return value is discarded.
     49     void operator()()
     50     {
     51         func(req.get(), path);
     52     }
                 // Public member: the request owned by this work item.
     54     std::unique_ptr<HTTPRequest> req;
     56 private:
     57     std::string path;
     58     HTTPRequestHandler func;
     59 };


     // Handler signature: receives the request plus a string argument and returns bool.
     // In http_request_cb the string passed along is the part of the URI after the matched prefix.
     36 typedef std::function<bool(HTTPRequest* req, const std::string &)>  HTTPRequestHandler;
     // Wrapper around a libevent evhttp_request pointer (excerpt: only part of the
     // interface is listed here, as the gaps in the line numbers show).
     53 class HTTPRequest
     54 {
     55 private:
     56     struct evhttp_request* req;
                 // NOTE(review): presumably records whether a reply has already been
                 // sent for this request; its use is not shown in this excerpt.
     57     bool replySent;
     59 public:
     60     HTTPRequest(struct evhttp_request* req);
     61     ~HTTPRequest();
                 // Returns the request URI as a string.
     73     std::string GetURI();
                 // Returns the request method as a RequestMethod value.
     81     RequestMethod GetRequestMethod();
     82 };

    上面的HTTPRequest代表一个完整的rpc请求,包含URI、method等信息,有些接口没放上来。evhttp_request成员类型是libevent中定义的,这里不放代码了。比如请求内容为{"method":"getblock","params":"0x000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"},那么GetURI返回"/"(JSON-RPC的方法名和参数放在请求体中,而不在URI里)。

    // Bounded work queue (excerpt): Enqueue() adds items and Run() is the consumer
    // loop that pops and invokes them. Queue access happens under the mutex cs,
    // and cond is used to wake a waiting consumer when an item arrives.
    64 template <typename WorkItem>
    65 class WorkQueue
    66 {
    67 private:
    68     /** Mutex protects entire object */
    69     std::mutex cs;
    70     std::condition_variable cond;
    71     std::deque<std::unique_ptr<WorkItem>> queue;
    72     bool running;
    73     size_t maxDepth;
    94 public:
               // Append item to the back of the queue and wake one waiting worker.
               // Returns true when the queue has taken ownership of item; returns false,
               // leaving item untouched, when the queue already holds maxDepth entries.
    107    bool Enqueue(WorkItem* item)
    108    {
    109        std::unique_lock<std::mutex> lock(cs);
    110        if (queue.size() >= maxDepth) {
    111            return false;
    112        }
    113        queue.emplace_back(std::unique_ptr<WorkItem>(item));
    114        cond.notify_one();
    115        return true;
    116    }
               // Consumer loop: block until an item is available, pop it, and invoke it.
               // Exits once running is false.
               // NOTE(review): the outer `while (running)` reads running without holding cs.
    118    void Run()
    119    {
    121        while (running) {
    122            std::unique_ptr<WorkItem> i;
    123            {
    124                std::unique_lock<std::mutex> lock(cs);
                       // Re-check the condition after every wakeup before proceeding.
    125                while (running && queue.empty())
    126                    cond.wait(lock);
    127                if (!running)
    128                    break;
    129                i = std::move(queue.front());
    130                queue.pop_front();
    131            }
                   // The lock's scope has ended, so the item runs without holding cs.
    132            (*i)();
    133        }
    134    }
    135    //more code...
    136 };

    以上是工作队列的主要代码:Enqueue放入请求任务,Run从队列中取出任务并处理,即调用HTTPWorkItem::operator()。其中使用了STL的deque、互斥锁和条件变量。

    // Wait loop repeated from WorkQueue::Run: take the lock, then sleep on cond while
    // the queue is still running and empty. std::condition_variable::wait releases the
    // lock while blocked and re-acquires it before returning; the surrounding while
    // re-tests the condition after each wakeup.
    124 std::unique_lock<std::mutex> lock(cs);
    125 while (running && queue.empty())
    126     cond.wait(lock);


    // Table pairing a URI prefix with the handler function for it.
    // Only the "/rest/tx/" entry is listed in this excerpt.
    602 static const struct {
    603     const char* prefix;
    604     bool (*handler)(HTTPRequest* req, const std::string& strReq);
    605 } uri_prefixes[] = {
    606       {"/rest/tx/", rest_tx},
    607 };
    // Register the first uri_prefixes entry with exactMatch = false (prefix comparison).
    619 RegisterHTTPHandler(uri_prefixes[0].prefix, false, uri_prefixes[0].handler);
    // Register HTTPReq_JSONRPC for "/" with exactMatch = true (whole-URI comparison).
    234 RegisterHTTPHandler("/", true, HTTPReq_JSONRPC);
    // Log the registration, then append a handler entry for prefix to pathHandlers.
    // exactMatch is stored with the entry; http_request_cb uses it to choose between
    // whole-URI comparison (true) and prefix comparison (false).
    653 void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPRequestHandler &handler)
    654 {
    655     LogPrint("http", "Registering HTTP handler for %s (exactmatch %d)\n", prefix, exactMatch);
    656     pathHandlers.push_back(HTTPPathHandler(prefix, exactMatch, handler));
    657 }
    // Registered handlers, in registration order (RegisterHTTPHandler appends with push_back).
    181 std::vector<HTTPPathHandler> pathHandlers;
    // One registered route: a URI prefix, whether it must match the whole URI, and the handler.
    158 struct HTTPPathHandler
    159 {
    160     HTTPPathHandler() {}
    161     HTTPPathHandler(std::string _prefix, bool _exactMatch,HTTPRequestHandler _handler):
    162         prefix(_prefix), exactMatch(_exactMatch), handler(_handler)
    163     {
    164     }
    165     std::string prefix;
                // true: URI must equal prefix; false: URI only has to start with prefix.
    166     bool exactMatch;
    167     HTTPRequestHandler handler;
    168 };


    // Callback that receives a libevent evhttp_request (its registration is not shown
    // in this excerpt). Wraps the request in an HTTPRequest, looks up the first
    // registered handler whose prefix matches the URI, and queues a work item for it;
    // replies HTTP_NOTFOUND when nothing matches.
    249 static void http_request_cb(struct evhttp_request* req, void* arg)
    250 {
    251     std::unique_ptr<HTTPRequest> hreq(new HTTPRequest(req));
    252     //more code ...
    269     std::string strURI = hreq->GetURI();
    270     std::string path;
    271     std::vector<HTTPPathHandler>::const_iterator i = pathHandlers.begin();
    272     std::vector<HTTPPathHandler>::const_iterator iend = pathHandlers.end();
                // Scan handlers in registration order and stop at the first match.
    273     for (; i != iend; ++i) {
    274         bool match = false;
    275         if (i->exactMatch)
    276             match = (strURI == i->prefix);
    277         else
    278             match = (strURI.substr(0, i->prefix.size()) == i->prefix);
    279         if (match) {
                        // path is whatever follows the matched prefix in the URI.
    280             path = strURI.substr(i->prefix.size());
    281             break;
    282         }
    283     }
                // i == iend means the loop finished without finding a handler.
    286     if (i != iend) {
    287         std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
    288         assert(workQueue);
                    // Pass a raw pointer; release our unique_ptr only if Enqueue accepted the
                    // item, otherwise item stays owned here.
    289         if (workQueue->Enqueue(item.get()))
    290             item.release(); /* if true, queue took ownership */
    291         else {
                    //more code...
    294         }
    295     } else {
    296         hreq->WriteReply(HTTP_NOTFOUND);
    297     }
    298 }


    // Handler registered for "/": reads the request body as JSON and, for a single
    // object request, executes the named method through tableRPC. The reply and
    // error-handling code is elided in this excerpt.
    147 static bool HTTPReq_JSONRPC(HTTPRequest* req, const std::string &)
    148 {
    175     JSONRequest jreq;
    176     try {
    177         // Parse request
    178         UniValue valRequest;
                    // A body that fails to parse is reported as RPC_PARSE_ERROR.
    179         if (!valRequest.read(req->ReadBody()))
    180             throw JSONRPCError(RPC_PARSE_ERROR, "Parse error");
    182         std::string strReply;
    183         // singleton request
    184         if (valRequest.isObject()) {
    185             jreq.parse(valRequest);
                        // Dispatch on the method name and params filled in by jreq.parse.
    187             UniValue result = tableRPC.execute(jreq.strMethod, jreq.params);
    188         }
    190     }
    191     //more code...
    199 }


    以上就是bitcoin core多线程处理rpc请求的主要流程了,省去了一些异常处理代码,详细代码可以用git把该项目clone到本地学习。



          本文标题:Bitcoin Core中的多线程模型
