美文网首页
2018-06-30

2018-06-30

作者: nit小星星 | 来源:发表于2018-07-01 00:55 被阅读253次

    比特币实现一个核心客户端。也就是人们经常分析的bitcoind程序。bitcoind程序实现的是一个http服务器。另外bitcoin-cli实现的是一个http客户端。二者之间的传输数据使用json-rpc

    编译好程序之后。执行一下 bitcoind-cli gettransaction 9ca8f969bd3ef5ec2a8685660fdbf7a8bd365524c2e1fc66c309acbae2c14ae3

    然后在命令行就能看待这个交易信息:

    {

        "amount" : 0.05000000,

        "confirmations" : 0,

        "txid":"9ca8f969bd3ef5ec2a8685660fdbf7a8bd365524c2e1fc66c309acbae2c14ae3",

        "time" : 1392660908,

        "timereceived" : 1392660908,

        "details" : [

        {

        "account" : "",

        "address":"1hvzSofGwT8cjb8JU7nBsCSfEVQX5u9CL",

        "category" : "receive",

        "amount" : 0.05000000

        }

      ]

    }

    json-rpc的请求非常简单,其格式如下:

    {

        "jsonrpc" : 2.0,

        "method" : "getinfo",

        "params" : [""],

        "id" : 1

    }

        jsonrpc:json-rpc的版本;

    method:rpc调用的方法名;

    params:方法传入的参数,没有参数传入nullptr;

    id:调用的标识符,可以为字符串,也可以为nullptr,但是不建议使用nullptr,因为容易引起混乱。

    2.2 响应

    {

        "jsonrpc" : 2.0,

        "result" : "info",

        "error" : null,

        "id" : 1

    }

        jsonrpc:json-rpc版本;

    result:rpc调用的返回值,调用成功时不能为nullptr,调用失败必须为nullptr;

    error:调用错误时用,无错误为nullptr,有错误时返回错误对象,参见下一节;

        id:调用标识符,与调用方传入的保持一致。

    josn-rpc

    是一个基于josn的跨语言rpc协议。具有目前java c++都支持。

    Json -rpc的请求非常简单。

    {

        "jsonrpc" : 2.0,

        "method" : "getinfo",

        "params" : [""],

        "id" : 1

    }

        jsonrpc:json-rpc的版本;

    method:rpc调用的方法名;

    params:方法传入的参数,没有参数传入nullptr;

    id:调用的标识符,可以为字符串,也可以为nullptr,但是不建议使用nullptr,因为容易引起混乱。

    {

        "jsonrpc" : 2.0,

        "result" : "info",

        "error" : null,

        "id" : 1

    }

        jsonrpc:json-rpc版本;

    result:rpc调用的返回值,调用成功时不能为nullptr,调用失败必须为nullptr;

    error:调用错误时用,无错误为nullptr,有错误时返回错误对象,参见下一节;

        id:调用标识符,与调用方传入的保持一致。

    libevent实现的http服务器。

    io复用:通过select poll 或者epoll等系统api 实现io复用

    多线程或者多进程:

    多线程和多进程可以解决大量的并发请求。无论多线程还是多进程,都存在问题,多进程不适合短连接。多线程

     将IO复用和多线程结合起来,这是目前解决大并发的常用方案。最常见的套路就是主线程里监听某个端口以及接受的描述符,当有读写事件产生时,将事件交给工作线程去处理。

    下面使用一个libevent http服务器

    1,首先创建套接字病在指定的端口上监听:

    int sock_fd = ::socket(AF_INET, SOCK_STREAM, 0);

        if( sock_fd == -1 ) 

            return -1; 

        evutil_make_listen_socket_reuseable(sock_fd); 

        struct sockaddr_in sin; 

        sin.sin_family = AF_INET; 

        sin.sin_addr.s_addr = 0; 

        sin.sin_port = htons(port); 

        if( ::bind(sock_fd, (SA*)&sin, sizeof(sin)) < 0 ) 

            goto error; 

        if( ::listen(sock_fd, listen_num) < 0) 

            goto error; 

        evutil_make_socket_nonblocking(listener);

     (2) 创建一个监听客户连接请求的事件:

        struct event* ev_listen = event_new(base, sock_fd, EV_READ | EV_PERSIST, 

                                            accept_cb, base); 

        event_add(ev_listen, NULL); 

        event_base_dispatch(base);

        当监听套接字有新连接时,事件将被触发,从而执行回调accept_cb:

       void accept_cb(int fd, short events, void* arg) 

        { 

            evutil_socket_t sockfd; 

            struct sockaddr_in client; 

            socklen_t len = sizeof(client); 

            sockfd = ::accept(fd, (struct sockaddr*)&client, &len ); 

            evutil_make_socket_nonblocking(sockfd); 

            printf("accept a client %d\n", sockfd); 

            struct event_base* base = (event_base*)arg; 

            bufferevent* bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE); 

            bufferevent_setcb(bev, socket_read_cb, NULL, event_cb, arg); 

            bufferevent_enable(bev, EV_READ | EV_PERSIST); 

        }

        创建一个bufferevent事件,将accept以后的已连接套接字与之关联,这样当套接字上有数据到来时,就会触发bufferevent事件,从而执行socket_read_cb回调:

        void socket_read_cb(bufferevent* bev, void* arg) 

        { 

            char msg[4096]; 

            size_t len = bufferevent_read(bev, msg, sizeof(msg)); 

            msg[len] = '\0';   

            char reply_msg[4096] = "recvieced msg:"; 

            strcat(reply_msg + strlen(reply_msg), msg); 

            bufferevent_write(bev, reply_msg, strlen(reply_msg)); 

        }

        然后就能从bufferevent中读取到客户数据。

      (1) 创建事件集和evhttp事件:

    struct event_base *event_base_new(void);

    struct evhttp *evhttp_new(struct event_base *base);

        (2) 绑定地址和端口

    int evhttp_bind_socket(struct evhttp *http, const char *address, ev_uint16_t port);

        (3) 设置回调来处理http请求

    void evhttp_set_gencb(struct evhttp *http, void (*cb)(struct evhttp_request *, void *), void *arg);

        (4) 进入事件循环

    int event_base_dispatch(struct event_base *);

        在下一节我们结合比特币的源码,来看看比特币中是如使用上面这些api实现http服务的,当然比特币的http服务封装的更为复杂一些。

    josn-rpc的初始化也是在Bitcoind的初始步骤 在Init.cpp的appinitmain函数里。

    RegisterAllCoreRPCCommands(tableRPC);

        g_wallet_init_interface.RegisterRPC(tableRPC);

    /* Start the RPC server already. It will be started in "warmup" mode

        * and not really process calls already (but it will signify connections

        * that the server is there and will be ready later).  Warmup mode will

        * be disabled when initialisation is finished.

        */

    第一行 RegisterAllCoreRPCCommands(tableRPC);

    就是注册rpc命令

    static inline void RegisterAllCoreRPCCommands(CRPCTable &t)

        {

            RegisterBlockchainRPCCommands(t);

            RegisterNetRPCCommands(t);

            RegisterMiscRPCCommands(t);

            RegisterMiningRPCCommands(t);

            RegisterRawTransactionRPCCommands(t);

        }

    可以看到这里分类注册了rpc命令

    ,操作区块链的、网络相关的、挖矿相关的以及比特币交易相关的RPC命令一应俱全。这里不妨列出来,这样读者对通过客户端能做些什么事情有个大概印象:

     (1) 区块链相关的rpc,位于blockchain.cpp中:

    static const CRPCCommand commands[] =

    { //  category              name                      actor (function)        argNames

      //  --------------------- ------------------------  -----------------------  ----------

        { "blockchain",        "getblockchaininfo",      &getblockchaininfo,      {} },

        { "blockchain",        "getchaintxstats",        &getchaintxstats,        {"nblocks", "blockhash"} },

        { "blockchain",        "getblockstats",          &getblockstats,          {"hash_or_height", "stats"} },

        { "blockchain",        "getbestblockhash",      &getbestblockhash,      {} },

        { "blockchain",        "getblockcount",          &getblockcount,          {} },

        { "blockchain",        "getblock",              &getblock,              {"blockhash","verbosity|verbose"} },

        { "blockchain",        "getblockhash",          &getblockhash,          {"height"} },

        { "blockchain",        "getblockheader",        &getblockheader,        {"blockhash","verbose"} },

        { "blockchain",        "getchaintips",          &getchaintips,          {} },

        { "blockchain",        "getdifficulty",          &getdifficulty,          {} },

        { "blockchain",        "getmempoolancestors",    &getmempoolancestors,    {"txid","verbose"} },

        { "blockchain",        "getmempooldescendants",  &getmempooldescendants,  {"txid","verbose"} },

        { "blockchain",        "getmempoolentry",        &getmempoolentry,        {"txid"} },

        { "blockchain",        "getmempoolinfo",        &getmempoolinfo,        {} },

        { "blockchain",        "getrawmempool",          &getrawmempool,          {"verbose"} },

        { "blockchain",        "gettxout",              &gettxout,              {"txid","n","include_mempool"} },

        { "blockchain",        "gettxoutsetinfo",        &gettxoutsetinfo,        {} },

        { "blockchain",        "pruneblockchain",        &pruneblockchain,        {"height"} },

        { "blockchain",        "savemempool",            &savemempool,            {} },

        { "blockchain",        "verifychain",            &verifychain,            {"checklevel","nblocks"} },

        { "blockchain",        "preciousblock",          &preciousblock,          {"blockhash"} },

        /* Not shown in help */

        { "hidden",            "invalidateblock",        &invalidateblock,        {"blockhash"} },

        { "hidden",            "reconsiderblock",        &reconsiderblock,        {"blockhash"} },

        { "hidden",            "waitfornewblock",        &waitfornewblock,        {"timeout"} },

        { "hidden",            "waitforblock",          &waitforblock,          {"blockhash","timeout"} },

        { "hidden",            "waitforblockheight",    &waitforblockheight,    {"height","timeout"} },

        { "hidden",            "syncwithvalidationinterfacequeue", &syncwithvalidationinterfacequeue, {} },

    };

        所有的RPC命令以及对应的回调函数指针都封装在了CRPCCommand中,按分类、rpc方法名,回调函数,参数名封装。基本上通过方法名就能猜出其作用。

        (2) 网络相关的rpc,位于net.cpp中:

    static const CRPCCommand commands[] =

    { //  category              name                      actor (function)        argNames

      //  --------------------- ------------------------  -----------------------  ----------

        { "network",            "getconnectioncount",    &getconnectioncount,    {} },

        { "network",            "ping",                  &ping,                  {} },

        { "network",            "getpeerinfo",            &getpeerinfo,            {} },

        { "network",            "addnode",                &addnode,                {"node","command"} },

        { "network",            "disconnectnode",        &disconnectnode,        {"address", "nodeid"} },

        { "network",            "getaddednodeinfo",      &getaddednodeinfo,      {"node"} },

        { "network",            "getnettotals",          &getnettotals,          {} },

        { "network",            "getnetworkinfo",        &getnetworkinfo,        {} },

        { "network",            "setban",                &setban,                {"subnet", "command", "bantime", "absolute"} },

        { "network",            "listbanned",            &listbanned,            {} },

        { "network",            "clearbanned",            &clearbanned,            {} },

        { "network",            "setnetworkactive",      &setnetworkactive,      {"state"} },

        (3) 挖矿相关的rpc,位于mining.cpp中:

    static const CRPCCommand commands[] =

    { //  category              name                      actor (function)        argNames

      //  --------------------- ------------------------  -----------------------  ----------

        { "mining",            "getnetworkhashps",      &getnetworkhashps,      {"nblocks","height"} },

        { "mining",            "getmininginfo",          &getmininginfo,          {} },

        { "mining",            "prioritisetransaction",  &prioritisetransaction,  {"txid","dummy","fee_delta"} },

        { "mining",            "getblocktemplate",      &getblocktemplate,      {"template_request"} },

        { "mining",            "submitblock",            &submitblock,            {"hexdata","dummy"} },

        { "generating",        "generatetoaddress",      &generatetoaddress,      {"nblocks","address","maxtries"} },

        { "hidden",            "estimatefee",            &estimatefee,            {} },

        { "util",              "estimatesmartfee",      &estimatesmartfee,      {"conf_target", "estimate_mode"} },

        { "hidden",            "estimaterawfee",        &estimaterawfee,        {"conf_target", "threshold"} },

    };

        (4) 比特币交易相关rpc,位于rawtransaction.cpp中:

    static const CRPCCommand commands[] =

    { //  category              name                            actor (function)            argNames

      //  --------------------- ------------------------        -----------------------    ----------

        { "rawtransactions",    "getrawtransaction",            &getrawtransaction,        {"txid","verbose","blockhash"} },

        { "rawtransactions",    "createrawtransaction",        &createrawtransaction,      {"inputs","outputs","locktime","replaceable"} },

        { "rawtransactions",    "decoderawtransaction",        &decoderawtransaction,      {"hexstring","iswitness"} },

        { "rawtransactions",    "decodescript",                &decodescript,              {"hexstring"} },

        { "rawtransactions",    "sendrawtransaction",          &sendrawtransaction,        {"hexstring","allowhighfees"} },

        { "rawtransactions",    "combinerawtransaction",        &combinerawtransaction,    {"txs"} },

        { "rawtransactions",    "signrawtransaction",          &signrawtransaction,        {"hexstring","prevtxs","privkeys","sighashtype"} }, /* uses wallet if enabled */

        { "rawtransactions",    "signrawtransactionwithkey",    &signrawtransactionwithkey, {"hexstring","privkeys","prevtxs","sighashtype"} },

        { "rawtransactions",    "testmempoolaccept",            &testmempoolaccept,        {"rawtxs","allowhighfees"} },

        { "blockchain",        "gettxoutproof",                &gettxoutproof,            {"txids", "blockhash"} },

        { "blockchain",        "verifytxoutproof",            &verifytxoutproof,          {"proof"} },

    };

        当注册完以后,如果用户启用了-server选项,将会调用AppInitServers创建Http服务器。

    AppInitServers实现如下:

    static bool AppInitServers()

    {

        RPCServer::OnStarted(&OnRPCStarted);

        RPCServer::OnStopped(&OnRPCStopped);

        if (!InitHTTPServer())

            return false;

        if (!StartRPC())

            return false;

        if (!StartHTTPRPC())

            return false;

        if (gArgs.GetBoolArg("-rest", DEFAULT_REST_ENABLE) && !StartREST())

            return false;

        if (!StartHTTPServer())

            return false;

        return true;

    }

        这里按步骤一步一步的来。首先是调用InitHTTPServer,使用libevent api来建立http服务器,这里截取主要代码来看看,位于httpserver.cpp文件:

        raii_event_base base_ctr = obtain_event_base();

        /* Create a new evhttp object to handle requests. */

        raii_evhttp http_ctr = obtain_evhttp(base_ctr.get());

        struct evhttp* http = http_ctr.get();

        if (!http) {

            LogPrintf("couldn't create evhttp. Exiting.\n");

            return false;

        }

        evhttp_set_timeout(http, gArgs.GetArg("-rpcservertimeout", DEFAULT_HTTP_SERVER_TIMEOUT));

        evhttp_set_max_headers_size(http, MAX_HEADERS_SIZE);

        evhttp_set_max_body_size(http, MAX_SIZE);

        evhttp_set_gencb(http, http_request_cb, nullptr);

        if (!HTTPBindAddresses(http)) {

            LogPrintf("Unable to bind any endpoint for RPC server\n");

            return false;

        }

        LogPrint(BCLog::HTTP, "Initialized HTTP server\n");

        int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);

        LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);

        workQueue = new WorkQueue(workQueueDepth);

        // transfer ownership to eventBase/HTTP via .release()

        eventBase = base_ctr.release();

        eventHTTP = http_ctr.release();

        这里的套路和3.4节中用libevent建立http服务器的步骤基本一样,注意两点:

        (1) 用evhttp_set_gencb设置了http请求的处理函数:http_request_cb;

        (2) 创建了一个工作队列,队列里的元素类型HTTPClosure,这是一个函数对象接口类,重写了函数调用操作符,HttpWorkItem实现了此接口

    http的请求处理:

    当我们收到一个http请求后。使用函数http_request_cb回调,主要代码如下

       // Find registered handler for prefix

        std::string strURI = hreq->GetURI();

        std::string path;

        std::vector::const_iterator i = pathHandlers.begin();

        std::vector::const_iterator iend = pathHandlers.end();

        for (; i != iend; ++i) {

            bool match = false;

            if (i->exactMatch)

                match = (strURI == i->prefix);

            else

                match = (strURI.substr(0, i->prefix.size()) == i->prefix);

            if (match) {

                path = strURI.substr(i->prefix.size());

                break;

            }

        }

        // Dispatch to worker thread

        if (i != iend) {

            std::unique_ptr item(new HTTPWorkItem(std::move(hreq), path, i->handler));

            assert(workQueue);

            if (workQueue->Enqueue(item.get()))

                item.release(); /* if true, queue took ownership */

            else {

                LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");

                item->req->WriteReply(HTTP_INTERNAL, "Work queue depth exceeded");

            }

        } else {

            hreq->WriteReply(HTTP_NOTFOUND);

        }

        用一句句话来概括这个函数的作用就是:将请求的url的path部分与注册过的前缀进行匹配,并生成HttpWorkItem放入到工作队列中。目前注册了两个前缀:/和/wallet/,代码在StartHttpRPC中:

    bool StartHTTPRPC()

    {

        LogPrint(BCLog::RPC, "Starting HTTP RPC server\n");

        if (!InitRPCAuthentication())

            return false;

        RegisterHTTPHandler("/", true, HTTPReq_JSONRPC);

    #ifdef ENABLE_WALLET

        // ifdef can be removed once we switch to better endpoint support and API versioning

        RegisterHTTPHandler("/wallet/", false, HTTPReq_JSONRPC);

    #endif

        assert(EventBase());

        httpRPCTimerInterface = MakeUnique(EventBase());

        RPCSetTimerInterface(httpRPCTimerInterface.get());

        return true;

    }

        两个前缀/和/wallet/对应的回调处理函数均为HttpReq_JSONRPC。

        之后调用StartHttpServer让工作队列运行起来:

    bool StartHTTPServer()

    {

        LogPrint(BCLog::HTTP, "Starting HTTP server\n");

        int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);

        LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);

        std::packaged_task task(ThreadHTTP);

        threadResult = task.get_future();

        threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);

        for (int i = 0; i < rpcThreads; i++) {

            g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);

        }

        return true;

    }

        最终会调用到工作队列的run方法:

    void Run()

        {

            while (true) {

                std::unique_ptr i;

                {

                    std::unique_lock lock(cs);

                    while (running && queue.empty())

                        cond.wait(lock);

                    if (!running)

                        break;

                    i = std::move(queue.front());

                    queue.pop_front();

                }

                (*i)();

            }

        }

        很简单,工作队列为空的时候线程阻塞等待,当收到http请求以后,解析请求并添加HttpWorkItem到队列中并唤醒线程,线程从队列头部取出一个item运行。最终将执行HttpReq_JSONRPC这个回调,这里会将JSONRPC中的rpc方法分发到服务端不同的方法中,来看看其处理:

     (1) 请求合法性检查及认证

        首先检查请求是否合法,http头部中的auchoization是否合法:

    static bool HTTPReq_JSONRPC(HTTPRequest* req, const std::string &)

    {

        // JSONRPC handles only POST

        if (req->GetRequestMethod() != HTTPRequest::POST) {

            req->WriteReply(HTTP_BAD_METHOD, "JSONRPC server handles only POST requests");

            return false;

        }

        // Check authorization

        std::pair authHeader = req->GetHeader("authorization");

        if (!authHeader.first) {

            req->WriteHeader("WWW-Authenticate", WWW_AUTH_HEADER_DATA);

            req->WriteReply(HTTP_UNAUTHORIZED);

            return false;

        }

        JSONRPCRequest jreq;

        jreq.peerAddr = req->GetPeer().ToString();

        if (!RPCAuthorized(authHeader.second, jreq.authUser)) {

            LogPrintf("ThreadRPCServer incorrect password attempt from %s\n", jreq.peerAddr);

            /* Deter brute-forcing

              If this results in a DoS the user really

              shouldn't have their RPC port exposed. */

            MilliSleep(250);

            req->WriteHeader("WWW-Authenticate", WWW_AUTH_HEADER_DATA);

            req->WriteReply(HTTP_UNAUTHORIZED);

            return false;

        }

    可以看到,比特币的json rpc服务只支持POST。

        (2) 读取http请求数据,将rpc请求分发到不同的函数

        try {

            // Parse request

            UniValue valRequest;

            if (!valRequest.read(req->ReadBody()))

                throw JSONRPCError(RPC_PARSE_ERROR, "Parse error");

            // Set the URI

            jreq.URI = req->GetURI();

            std::string strReply;

            // singleton request

            if (valRequest.isObject()) {

                jreq.parse(valRequest);

                UniValue result = tableRPC.execute(jreq);

                // Send reply

                strReply = JSONRPCReply(result, NullUniValue, jreq.id);

            // array of requests

            } else if (valRequest.isArray())

                strReply = JSONRPCExecBatch(jreq, valRequest.get_array());

            else

                throw JSONRPCError(RPC_PARSE_ERROR, "Top-level object parse error");

            req->WriteHeader("Content-Type", "application/json");

            req->WriteReply(HTTP_OK, strReply);

        如果收到的是单个json,则tableRPC.execute执行,否则如果收到的是以数组形式的批量rpc请求,则批量执行,批量执行最终也是走tableRPC.execute()来分发,execute()执行后的结果将写入到http响应包中:

    UniValue CRPCTable::execute(const JSONRPCRequest &request) const

    {

        // Return immediately if in warmup

        {

            LOCK(cs_rpcWarmup);

            if (fRPCInWarmup)

                throw JSONRPCError(RPC_IN_WARMUP, rpcWarmupStatus);

        }

        // Find method

        const CRPCCommand *pcmd = tableRPC[request.strMethod];

        if (!pcmd)

            throw JSONRPCError(RPC_METHOD_NOT_FOUND, "Method not found");

        g_rpcSignals.PreCommand(*pcmd);

        try

        {

            // Execute, convert arguments to array if necessary

            if (request.params.isObject()) {

                return pcmd->actor(transformNamedArguments(request, pcmd->argNames));

            } else {

                return pcmd->actor(request);

            }

        }

        catch (const std::exception& e)

        {

            throw JSONRPCError(RPC_MISC_ERROR, e.what());

        }

    }

        代码也比较容易理解,就是从根据json-rpc协议,从请求中读取method,然后根据method找到对应的CRPCCommand执行体,这些执行体就是4.2.1节中提到那几张分门别类的映射表。

        至此,比特币的json-rpc服务端的脉络我们就梳理的差不多了,整体框架并不难理解,只是封装的略微复杂一点点。

    相关文章

      网友评论

          本文标题:2018-06-30

          本文链接:https://www.haomeiwen.com/subject/ymjxuftx.html