有点郁闷的是,上一篇博文写了好久,本来是已经写好了的,但只要在read下面添加内容,就导致chrome挂了,试了很多次了,不知道为啥,但那个还是写完,后期会补在原来博客的下面。
由于后台程序是多线程模型,其中有个模块是多个线程处理rpc请求,从队列中取任务,处理并作响应,下面好些用法与C++11相关[工作中没有用过,只是知道些,有时间看看相关源码实现]。先来看线程初始化:
456 for (int i = 0; i < rpcThreads; i++) {
457 std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
458 rpc_worker.detach();
459 }
其中使用了C++11中的thread,detach的作用是“Detaches the thread represented by the object from the calling thread, allowing them to execute independently from each other.”
创建rpcThreads个线程,每个线程以HTTPWorkQueueRun回调函数,workQueue为参数初始化:
358 static void HTTPWorkQueueRun(WorkQueue<HTTPClosure>* queue)
359 {
360 RenameThread("bitcoin-httpworker");
361 queue->Run();
362 }
179 static WorkQueue<HTTPClosure>* workQueue = 0;
看看HTTPClosure
请求类型的声明:
117 class HTTPClosure
118 {
119 public:
120 virtual void operator()() = 0;
121 virtual ~HTTPClosure() {}
122 };
42 class HTTPWorkItem : public HTTPClosure
43 {
44 public:
45 HTTPWorkItem(std::unique_ptr<HTTPRequest> _req, const std::string &_path, const HTTPRequestH andler& _func):
46 req(std::move(_req)), path(_path), func(_func)
47 {
48 }
49 void operator()()
50 {
51 func(req.get(), path);
52 }
53
54 std::unique_ptr<HTTPRequest> req;
55
56 private:
57 std::string path;
58 HTTPRequestHandler func;
59 };
其中:
36 typedef std::function<bool(HTTPRequest* req, const std::string &)> HTTPRequestHandler;
53 class HTTPRequest
54 {
55 private:
56 struct evhttp_request* req;
57 bool replySent;
59 public:
60 HTTPRequest(struct evhttp_request* req);
61 ~HTTPRequest();
73 std::string GetURI();
81 RequestMethod GetRequestMethod();
82 }
上面HTTPRequest
是一个完整的rpc请求,有url,有method等其他信息,有些接口没放上来。evhttp_request
成员类型是libevent中定义的,这里不放代码了,比如请求内容为{"method":"getblock","params":"0x000000000019d6689c085ae1658 31e934ff763ae46a2a6c172b3f1b60a8ce26f"}
,那么GetURI
返回"/"
,
GetRequestMethod
返回POST
。unique_ptr
是属于智能指针中的一类,没有引用计数功能,它持有对象的独有权--两个unique_ptr
不能指向一个对象,不能进行复制操作只能进行移动操作。
move
的功能是将一个左值强制转化为右值引用,继而通过右值引用使用该值,以用于移动语义,具体怎么实现的还待仔细研究:-),这儿先放着。WorkQueue
的具体实现如下:
64 template <typename WorkItem>
65 class WorkQueue
66 {
67 private:
68 /** Mutex protects entire object */
69 std::mutex cs;
70 std::condition_variable cond;
71 std::deque<std::unique_ptr<WorkItem>> queue;
72 bool running;
73 size_t maxDepth;
94 public:
107 bool Enqueue(WorkItem* item)
108 {
109 std::unique_lock<std::mutex> lock(cs);
110 if (queue.size() >= maxDepth) {
111 return false;
112 }
113 queue.emplace_back(std::unique_ptr<WorkItem>(item));
114 cond.notify_one();
115 return true;
116 }
118 void Run()
119 {
121 while (running) {
122 std::unique_ptr<WorkItem> i;
123 {
124 std::unique_lock<std::mutex> lock(cs);
125 while (running && queue.empty())
126 cond.wait(lock);
127 if (!running)
128 break;
129 i = std::move(queue.front());
130 queue.pop_front();
131 }
132 (*i)();
133 }
134 }
135 //more code...
136 }
以上是工作队列的主要代码,放请求任务,从中取任务处理,即调用HTTPWorkItem::operator()
,其中使用了stl deque,互斥锁,条件变量。
其中也使用了在C++11中支持的emplace_back成员函数,省去了临时对象的构造与析构成本,对于大的类对象效果性能更优。
至于这里为什么先move队头元素再pop_front分两步?因为这里的deque实现了异常安全强保证[当操作因异常而终止,程序的状态应保持不变。],有时拷贝构造函数也会抛出异常,导致队列状态改变,详细知识可以自行搜索。
124 std::unique_lock<std::mutex> lock(cs);
125 while (running && queue.empty())
126 cond.wait(lock);
这么写是防止假唤醒,不能改为if,这里是先获得锁,如果条件不满足,则释放锁,并等待,等收到信号后,在wait返回前再获得锁,...,因为这是个语句块,lock超出作用域范围后自动释放锁了。
然后注册回调函数,举两个例子:
602 static const struct {
603 const char* prefix;
604 bool (*handler)(HTTPRequest* req, const std::string& strReq);
605 } uri_prefixes[] = {
606 {"/rest/tx/", rest_tx},
607 };
619 RegisterHTTPHandler(uri_prefixes[0].prefix, false, uri_prefixes[0].handler);
234 RegisterHTTPHandler("/", true, HTTPReq_JSONRPC);
653 void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPRequestHandler &h andler)
654 {
655 LogPrint("http", "Registering HTTP handler for %s (exactmatch %d)\n", prefix, exactMatch);
656 pathHandlers.push_back(HTTPPathHandler(prefix, exactMatch, handler));
657 }
181 std::vector<HTTPPathHandler> pathHandlers;
158 struct HTTPPathHandler
159 {
160 HTTPPathHandler() {}
161 HTTPPathHandler(std::string _prefix, bool _exactMatch,HTTPRequestHandler _handler):
162 prefix(_prefix), exactMatch(_exactMatch), handler(_handler)
163 {
164 }
165 std::string prefix;
166 bool exactMatch;
167 HTTPRequestHandler handler;
168 };
当libevent收到数据时,底层会回调http_request_cb函数:
249 static void http_request_cb(struct evhttp_request* req, void* arg)
250 {
251 std::unique_ptr<HTTPRequest> hreq(new HTTPRequest(req));
252 //more code ...
269 std::string strURI = hreq->GetURI();
270 std::string path;
271 std::vector<HTTPPathHandler>::const_iterator i = pathHandlers.begin();
272 std::vector<HTTPPathHandler>::const_iterator iend = pathHandlers.end();
273 for (; i != iend; ++i) {
274 bool match = false;
275 if (i->exactMatch)
276 match = (strURI == i->prefix);
277 else
278 match = (strURI.substr(0, i->prefix.size()) == i->prefix);
279 if (match) {
280 path = strURI.substr(i->prefix.size());
281 break;
282 }
283 }
286 if (i != iend) {
287 std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
288 assert(workQueue);
289 if (workQueue->Enqueue(item.get()))
290 item.release(); /* if true, queue took ownership */
291 else {
//more code...
294 }
295 } else {
296 hreq->WriteReply(HTTP_NOTFOUND);
297 }
298 }
以上就是根据url参数查找有没有相应的处理接口,并形成一个task塞进队列中去[其中path参数后来调用时没用到]。
147 static bool HTTPReq_JSONRPC(HTTPRequest* req, const std::string &)
148 {
175 JSONRequest jreq;
176 try {
177 // Parse request
178 UniValue valRequest;
179 if (!valRequest.read(req->ReadBody()))
180 throw JSONRPCError(RPC_PARSE_ERROR, "Parse error");
182 std::string strReply;
183 // singleton request
184 if (valRequest.isObject()) {
185 jreq.parse(valRequest);
187 UniValue result = tableRPC.execute(jreq.strMethod, jreq.params);
188 }
190 }
191 //more code...
199 }
上面HTTPReq_JSONRPC接口处理以jsonrpc形式的请求,省去了url形式的查找接口。
以上就是bitcoin core多线程处理rpc请求的主要功能了,省去了一些异常处理代码,详细代码可以git该项目到本地学习下。
网友评论