比特币实现一个核心客户端。也就是人们经常分析的bitcoind程序。bitcoind程序实现的是一个http服务器。另外bitcoin-cli实现的是一个http客户端。二者之间的传输数据使用json-rpc
编译好程序之后。执行一下 bitcoind-cli gettransaction 9ca8f969bd3ef5ec2a8685660fdbf7a8bd365524c2e1fc66c309acbae2c14ae3
然后在命令行就能看待这个交易信息:
{
"amount" : 0.05000000,
"confirmations" : 0,
"txid":"9ca8f969bd3ef5ec2a8685660fdbf7a8bd365524c2e1fc66c309acbae2c14ae3",
"time" : 1392660908,
"timereceived" : 1392660908,
"details" : [
{
"account" : "",
"address":"1hvzSofGwT8cjb8JU7nBsCSfEVQX5u9CL",
"category" : "receive",
"amount" : 0.05000000
}
]
}
json-rpc的请求非常简单,其格式如下:
{
"jsonrpc" : 2.0,
"method" : "getinfo",
"params" : [""],
"id" : 1
}
jsonrpc:json-rpc的版本;
method:rpc调用的方法名;
params:方法传入的参数,没有参数传入nullptr;
id:调用的标识符,可以为字符串,也可以为nullptr,但是不建议使用nullptr,因为容易引起混乱。
2.2 响应
{
"jsonrpc" : 2.0,
"result" : "info",
"error" : null,
"id" : 1
}
jsonrpc:json-rpc版本;
result:rpc调用的返回值,调用成功时不能为nullptr,调用失败必须为nullptr;
error:调用错误时用,无错误为nullptr,有错误时返回错误对象,参见下一节;
id:调用标识符,与调用方传入的保持一致。
josn-rpc
是一个基于josn的跨语言rpc协议。具有目前java c++都支持。
Json -rpc的请求非常简单。
{
"jsonrpc" : 2.0,
"method" : "getinfo",
"params" : [""],
"id" : 1
}
jsonrpc:json-rpc的版本;
method:rpc调用的方法名;
params:方法传入的参数,没有参数传入nullptr;
id:调用的标识符,可以为字符串,也可以为nullptr,但是不建议使用nullptr,因为容易引起混乱。
{
"jsonrpc" : 2.0,
"result" : "info",
"error" : null,
"id" : 1
}
jsonrpc:json-rpc版本;
result:rpc调用的返回值,调用成功时不能为nullptr,调用失败必须为nullptr;
error:调用错误时用,无错误为nullptr,有错误时返回错误对象,参见下一节;
id:调用标识符,与调用方传入的保持一致。
libevent实现的http服务器。
io复用:通过select poll 或者epoll等系统api 实现io复用
多线程或者多进程:
多线程和多进程可以解决大量的并发请求。无论多线程还是多进程,都存在问题,多进程不适合短连接。多线程
将IO复用和多线程结合起来,这是目前解决大并发的常用方案。最常见的套路就是主线程里监听某个端口以及接受的描述符,当有读写事件产生时,将事件交给工作线程去处理。
下面使用一个libevent http服务器
1,首先创建套接字病在指定的端口上监听:
int sock_fd = ::socket(AF_INET, SOCK_STREAM, 0);
if( sock_fd == -1 )
return -1;
evutil_make_listen_socket_reuseable(sock_fd);
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(port);
if( ::bind(sock_fd, (SA*)&sin, sizeof(sin)) < 0 )
goto error;
if( ::listen(sock_fd, listen_num) < 0)
goto error;
evutil_make_socket_nonblocking(listener);
(2) 创建一个监听客户连接请求的事件:
struct event* ev_listen = event_new(base, sock_fd, EV_READ | EV_PERSIST,
accept_cb, base);
event_add(ev_listen, NULL);
event_base_dispatch(base);
当监听套接字有新连接时,事件将被触发,从而执行回调accept_cb:
void accept_cb(int fd, short events, void* arg)
{
evutil_socket_t sockfd;
struct sockaddr_in client;
socklen_t len = sizeof(client);
sockfd = ::accept(fd, (struct sockaddr*)&client, &len );
evutil_make_socket_nonblocking(sockfd);
printf("accept a client %d\n", sockfd);
struct event_base* base = (event_base*)arg;
bufferevent* bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, socket_read_cb, NULL, event_cb, arg);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
}
创建一个bufferevent事件,将accept以后的已连接套接字与之关联,这样当套接字上有数据到来时,就会触发bufferevent事件,从而执行socket_read_cb回调:
void socket_read_cb(bufferevent* bev, void* arg)
{
char msg[4096];
size_t len = bufferevent_read(bev, msg, sizeof(msg));
msg[len] = '\0';
char reply_msg[4096] = "recvieced msg:";
strcat(reply_msg + strlen(reply_msg), msg);
bufferevent_write(bev, reply_msg, strlen(reply_msg));
}
然后就能从bufferevent中读取到客户数据。
(1) 创建事件集和evhttp事件:
struct event_base *event_base_new(void);
struct evhttp *evhttp_new(struct event_base *base);
(2) 绑定地址和端口
int evhttp_bind_socket(struct evhttp *http, const char *address, ev_uint16_t port);
(3) 设置回调来处理http请求
void evhttp_set_gencb(struct evhttp *http, void (*cb)(struct evhttp_request *, void *), void *arg);
(4) 进入事件循环
int event_base_dispatch(struct event_base *);
在下一节我们结合比特币的源码,来看看比特币中是如使用上面这些api实现http服务的,当然比特币的http服务封装的更为复杂一些。
josn-rpc的初始化也是在Bitcoind的初始步骤 在Init.cpp的appinitmain函数里。
吃
RegisterAllCoreRPCCommands(tableRPC);
g_wallet_init_interface.RegisterRPC(tableRPC);
/* Start the RPC server already. It will be started in "warmup" mode
* and not really process calls already (but it will signify connections
* that the server is there and will be ready later). Warmup mode will
* be disabled when initialisation is finished.
*/
第一行 RegisterAllCoreRPCCommands(tableRPC);
就是注册rpc命令
static inline void RegisterAllCoreRPCCommands(CRPCTable &t)
{
RegisterBlockchainRPCCommands(t);
RegisterNetRPCCommands(t);
RegisterMiscRPCCommands(t);
RegisterMiningRPCCommands(t);
RegisterRawTransactionRPCCommands(t);
}
可以看到这里分类注册了rpc命令
,操作区块链的、网络相关的、挖矿相关的以及比特币交易相关的RPC命令一应俱全。这里不妨列出来,这样读者对通过客户端能做些什么事情有个大概印象:
(1) 区块链相关的rpc,位于blockchain.cpp中:
static const CRPCCommand commands[] =
{ // category name actor (function) argNames
// --------------------- ------------------------ ----------------------- ----------
{ "blockchain", "getblockchaininfo", &getblockchaininfo, {} },
{ "blockchain", "getchaintxstats", &getchaintxstats, {"nblocks", "blockhash"} },
{ "blockchain", "getblockstats", &getblockstats, {"hash_or_height", "stats"} },
{ "blockchain", "getbestblockhash", &getbestblockhash, {} },
{ "blockchain", "getblockcount", &getblockcount, {} },
{ "blockchain", "getblock", &getblock, {"blockhash","verbosity|verbose"} },
{ "blockchain", "getblockhash", &getblockhash, {"height"} },
{ "blockchain", "getblockheader", &getblockheader, {"blockhash","verbose"} },
{ "blockchain", "getchaintips", &getchaintips, {} },
{ "blockchain", "getdifficulty", &getdifficulty, {} },
{ "blockchain", "getmempoolancestors", &getmempoolancestors, {"txid","verbose"} },
{ "blockchain", "getmempooldescendants", &getmempooldescendants, {"txid","verbose"} },
{ "blockchain", "getmempoolentry", &getmempoolentry, {"txid"} },
{ "blockchain", "getmempoolinfo", &getmempoolinfo, {} },
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose"} },
{ "blockchain", "gettxout", &gettxout, {"txid","n","include_mempool"} },
{ "blockchain", "gettxoutsetinfo", &gettxoutsetinfo, {} },
{ "blockchain", "pruneblockchain", &pruneblockchain, {"height"} },
{ "blockchain", "savemempool", &savemempool, {} },
{ "blockchain", "verifychain", &verifychain, {"checklevel","nblocks"} },
{ "blockchain", "preciousblock", &preciousblock, {"blockhash"} },
/* Not shown in help */
{ "hidden", "invalidateblock", &invalidateblock, {"blockhash"} },
{ "hidden", "reconsiderblock", &reconsiderblock, {"blockhash"} },
{ "hidden", "waitfornewblock", &waitfornewblock, {"timeout"} },
{ "hidden", "waitforblock", &waitforblock, {"blockhash","timeout"} },
{ "hidden", "waitforblockheight", &waitforblockheight, {"height","timeout"} },
{ "hidden", "syncwithvalidationinterfacequeue", &syncwithvalidationinterfacequeue, {} },
};
所有的RPC命令以及对应的回调函数指针都封装在了CRPCCommand中,按分类、rpc方法名,回调函数,参数名封装。基本上通过方法名就能猜出其作用。
(2) 网络相关的rpc,位于net.cpp中:
static const CRPCCommand commands[] =
{ // category name actor (function) argNames
// --------------------- ------------------------ ----------------------- ----------
{ "network", "getconnectioncount", &getconnectioncount, {} },
{ "network", "ping", &ping, {} },
{ "network", "getpeerinfo", &getpeerinfo, {} },
{ "network", "addnode", &addnode, {"node","command"} },
{ "network", "disconnectnode", &disconnectnode, {"address", "nodeid"} },
{ "network", "getaddednodeinfo", &getaddednodeinfo, {"node"} },
{ "network", "getnettotals", &getnettotals, {} },
{ "network", "getnetworkinfo", &getnetworkinfo, {} },
{ "network", "setban", &setban, {"subnet", "command", "bantime", "absolute"} },
{ "network", "listbanned", &listbanned, {} },
{ "network", "clearbanned", &clearbanned, {} },
{ "network", "setnetworkactive", &setnetworkactive, {"state"} },
(3) 挖矿相关的rpc,位于mining.cpp中:
static const CRPCCommand commands[] =
{ // category name actor (function) argNames
// --------------------- ------------------------ ----------------------- ----------
{ "mining", "getnetworkhashps", &getnetworkhashps, {"nblocks","height"} },
{ "mining", "getmininginfo", &getmininginfo, {} },
{ "mining", "prioritisetransaction", &prioritisetransaction, {"txid","dummy","fee_delta"} },
{ "mining", "getblocktemplate", &getblocktemplate, {"template_request"} },
{ "mining", "submitblock", &submitblock, {"hexdata","dummy"} },
{ "generating", "generatetoaddress", &generatetoaddress, {"nblocks","address","maxtries"} },
{ "hidden", "estimatefee", &estimatefee, {} },
{ "util", "estimatesmartfee", &estimatesmartfee, {"conf_target", "estimate_mode"} },
{ "hidden", "estimaterawfee", &estimaterawfee, {"conf_target", "threshold"} },
};
(4) 比特币交易相关rpc,位于rawtransaction.cpp中:
static const CRPCCommand commands[] =
{ // category name actor (function) argNames
// --------------------- ------------------------ ----------------------- ----------
{ "rawtransactions", "getrawtransaction", &getrawtransaction, {"txid","verbose","blockhash"} },
{ "rawtransactions", "createrawtransaction", &createrawtransaction, {"inputs","outputs","locktime","replaceable"} },
{ "rawtransactions", "decoderawtransaction", &decoderawtransaction, {"hexstring","iswitness"} },
{ "rawtransactions", "decodescript", &decodescript, {"hexstring"} },
{ "rawtransactions", "sendrawtransaction", &sendrawtransaction, {"hexstring","allowhighfees"} },
{ "rawtransactions", "combinerawtransaction", &combinerawtransaction, {"txs"} },
{ "rawtransactions", "signrawtransaction", &signrawtransaction, {"hexstring","prevtxs","privkeys","sighashtype"} }, /* uses wallet if enabled */
{ "rawtransactions", "signrawtransactionwithkey", &signrawtransactionwithkey, {"hexstring","privkeys","prevtxs","sighashtype"} },
{ "rawtransactions", "testmempoolaccept", &testmempoolaccept, {"rawtxs","allowhighfees"} },
{ "blockchain", "gettxoutproof", &gettxoutproof, {"txids", "blockhash"} },
{ "blockchain", "verifytxoutproof", &verifytxoutproof, {"proof"} },
};
当注册完以后,如果用户启用了-server选项,将会调用AppInitServers创建Http服务器。
AppInitServers实现如下:
static bool AppInitServers()
{
RPCServer::OnStarted(&OnRPCStarted);
RPCServer::OnStopped(&OnRPCStopped);
if (!InitHTTPServer())
return false;
if (!StartRPC())
return false;
if (!StartHTTPRPC())
return false;
if (gArgs.GetBoolArg("-rest", DEFAULT_REST_ENABLE) && !StartREST())
return false;
if (!StartHTTPServer())
return false;
return true;
}
这里按步骤一步一步的来。首先是调用InitHTTPServer,使用libevent api来建立http服务器,这里截取主要代码来看看,位于httpserver.cpp文件:
raii_event_base base_ctr = obtain_event_base();
/* Create a new evhttp object to handle requests. */
raii_evhttp http_ctr = obtain_evhttp(base_ctr.get());
struct evhttp* http = http_ctr.get();
if (!http) {
LogPrintf("couldn't create evhttp. Exiting.\n");
return false;
}
evhttp_set_timeout(http, gArgs.GetArg("-rpcservertimeout", DEFAULT_HTTP_SERVER_TIMEOUT));
evhttp_set_max_headers_size(http, MAX_HEADERS_SIZE);
evhttp_set_max_body_size(http, MAX_SIZE);
evhttp_set_gencb(http, http_request_cb, nullptr);
if (!HTTPBindAddresses(http)) {
LogPrintf("Unable to bind any endpoint for RPC server\n");
return false;
}
LogPrint(BCLog::HTTP, "Initialized HTTP server\n");
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);
workQueue = new WorkQueue(workQueueDepth);
// transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release();
eventHTTP = http_ctr.release();
这里的套路和3.4节中用libevent建立http服务器的步骤基本一样,注意两点:
(1) 用evhttp_set_gencb设置了http请求的处理函数:http_request_cb;
(2) 创建了一个工作队列,队列里的元素类型HTTPClosure,这是一个函数对象接口类,重写了函数调用操作符,HttpWorkItem实现了此接口
http的请求处理:
当我们收到一个http请求后。使用函数http_request_cb回调,主要代码如下
// Find registered handler for prefix
std::string strURI = hreq->GetURI();
std::string path;
std::vector::const_iterator i = pathHandlers.begin();
std::vector::const_iterator iend = pathHandlers.end();
for (; i != iend; ++i) {
bool match = false;
if (i->exactMatch)
match = (strURI == i->prefix);
else
match = (strURI.substr(0, i->prefix.size()) == i->prefix);
if (match) {
path = strURI.substr(i->prefix.size());
break;
}
}
// Dispatch to worker thread
if (i != iend) {
std::unique_ptr item(new HTTPWorkItem(std::move(hreq), path, i->handler));
assert(workQueue);
if (workQueue->Enqueue(item.get()))
item.release(); /* if true, queue took ownership */
else {
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
item->req->WriteReply(HTTP_INTERNAL, "Work queue depth exceeded");
}
} else {
hreq->WriteReply(HTTP_NOTFOUND);
}
用一句句话来概括这个函数的作用就是:将请求的url的path部分与注册过的前缀进行匹配,并生成HttpWorkItem放入到工作队列中。目前注册了两个前缀:/和/wallet/,代码在StartHttpRPC中:
bool StartHTTPRPC()
{
LogPrint(BCLog::RPC, "Starting HTTP RPC server\n");
if (!InitRPCAuthentication())
return false;
RegisterHTTPHandler("/", true, HTTPReq_JSONRPC);
#ifdef ENABLE_WALLET
// ifdef can be removed once we switch to better endpoint support and API versioning
RegisterHTTPHandler("/wallet/", false, HTTPReq_JSONRPC);
#endif
assert(EventBase());
httpRPCTimerInterface = MakeUnique(EventBase());
RPCSetTimerInterface(httpRPCTimerInterface.get());
return true;
}
两个前缀/和/wallet/对应的回调处理函数均为HttpReq_JSONRPC。
之后调用StartHttpServer让工作队列运行起来:
bool StartHTTPServer()
{
LogPrint(BCLog::HTTP, "Starting HTTP server\n");
int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
std::packaged_task task(ThreadHTTP);
threadResult = task.get_future();
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);
for (int i = 0; i < rpcThreads; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);
}
return true;
}
最终会调用到工作队列的run方法:
void Run()
{
while (true) {
std::unique_ptr i;
{
std::unique_lock lock(cs);
while (running && queue.empty())
cond.wait(lock);
if (!running)
break;
i = std::move(queue.front());
queue.pop_front();
}
(*i)();
}
}
很简单,工作队列为空的时候线程阻塞等待,当收到http请求以后,解析请求并添加HttpWorkItem到队列中并唤醒线程,线程从队列头部取出一个item运行。最终将执行HttpReq_JSONRPC这个回调,这里会将JSONRPC中的rpc方法分发到服务端不同的方法中,来看看其处理:
(1) 请求合法性检查及认证
首先检查请求是否合法,http头部中的auchoization是否合法:
static bool HTTPReq_JSONRPC(HTTPRequest* req, const std::string &)
{
// JSONRPC handles only POST
if (req->GetRequestMethod() != HTTPRequest::POST) {
req->WriteReply(HTTP_BAD_METHOD, "JSONRPC server handles only POST requests");
return false;
}
// Check authorization
std::pair authHeader = req->GetHeader("authorization");
if (!authHeader.first) {
req->WriteHeader("WWW-Authenticate", WWW_AUTH_HEADER_DATA);
req->WriteReply(HTTP_UNAUTHORIZED);
return false;
}
JSONRPCRequest jreq;
jreq.peerAddr = req->GetPeer().ToString();
if (!RPCAuthorized(authHeader.second, jreq.authUser)) {
LogPrintf("ThreadRPCServer incorrect password attempt from %s\n", jreq.peerAddr);
/* Deter brute-forcing
If this results in a DoS the user really
shouldn't have their RPC port exposed. */
MilliSleep(250);
req->WriteHeader("WWW-Authenticate", WWW_AUTH_HEADER_DATA);
req->WriteReply(HTTP_UNAUTHORIZED);
return false;
}
可以看到,比特币的json rpc服务只支持POST。
(2) 读取http请求数据,将rpc请求分发到不同的函数
try {
// Parse request
UniValue valRequest;
if (!valRequest.read(req->ReadBody()))
throw JSONRPCError(RPC_PARSE_ERROR, "Parse error");
// Set the URI
jreq.URI = req->GetURI();
std::string strReply;
// singleton request
if (valRequest.isObject()) {
jreq.parse(valRequest);
UniValue result = tableRPC.execute(jreq);
// Send reply
strReply = JSONRPCReply(result, NullUniValue, jreq.id);
// array of requests
} else if (valRequest.isArray())
strReply = JSONRPCExecBatch(jreq, valRequest.get_array());
else
throw JSONRPCError(RPC_PARSE_ERROR, "Top-level object parse error");
req->WriteHeader("Content-Type", "application/json");
req->WriteReply(HTTP_OK, strReply);
如果收到的是单个json,则tableRPC.execute执行,否则如果收到的是以数组形式的批量rpc请求,则批量执行,批量执行最终也是走tableRPC.execute()来分发,execute()执行后的结果将写入到http响应包中:
UniValue CRPCTable::execute(const JSONRPCRequest &request) const
{
// Return immediately if in warmup
{
LOCK(cs_rpcWarmup);
if (fRPCInWarmup)
throw JSONRPCError(RPC_IN_WARMUP, rpcWarmupStatus);
}
// Find method
const CRPCCommand *pcmd = tableRPC[request.strMethod];
if (!pcmd)
throw JSONRPCError(RPC_METHOD_NOT_FOUND, "Method not found");
g_rpcSignals.PreCommand(*pcmd);
try
{
// Execute, convert arguments to array if necessary
if (request.params.isObject()) {
return pcmd->actor(transformNamedArguments(request, pcmd->argNames));
} else {
return pcmd->actor(request);
}
}
catch (const std::exception& e)
{
throw JSONRPCError(RPC_MISC_ERROR, e.what());
}
}
代码也比较容易理解,就是从根据json-rpc协议,从请求中读取method,然后根据method找到对应的CRPCCommand执行体,这些执行体就是4.2.1节中提到那几张分门别类的映射表。
至此,比特币的json-rpc服务端的脉络我们就梳理的差不多了,整体框架并不难理解,只是封装的略微复杂一点点。
网友评论