brpc RPC Flow Analysis (Part 1)

Author: fooboo | Published 2019-10-18 21:15

    Earlier posts in this brpc series:
    brpc mutex source code analysis
    brpc ResourcePool source code analysis
    brpc bthread_id source code analysis
    brpc bthread source code walkthrough (1)
    brpc bthread source code walkthrough (2)

    Posts on coroutine implementations:
    Coroutine library basics
    Phxrpc coroutine library implementation
    Pebble coroutine library implementation
    Libco coroutine library implementation

    This post walks through the client-side RPC flow, starting from the simple echo_c++ example and the main() in client.cpp, which is a good way to get familiar with how brpc::Channel is used and designed. It is closely tied to protobuf, which I used and analyzed before in the [Baidu File System (bfs) source analysis series], although my grasp of that part is still not very deep.

    The protocol definition used by the demo:

      6 message EchoRequest {
      7       required string message = 1;
      8 };
     10 message EchoResponse {
     11       required string message = 1;
     12 };
     14 service EchoService {
     15       rpc Echo(EchoRequest) returns (EchoResponse);
     16 };
    

    Part of the code generated from it looks like this:

     67 class EchoRequest : public ::google::protobuf::Message /* @@protoc_insertion_point(class_definition:example.EchoRequest) */ {
     68  public:
     69   EchoRequest();
     70   virtual ~EchoRequest();
     156   // required string message = 1;
    157   bool has_message() const;
    158   void clear_message();
    159   static const int kMessageFieldNumber = 1;
    160   const ::std::string& message() const;
    161   void set_message(const ::std::string& value);
    182 };
    
    185 class EchoResponse : public ::google::protobuf::Message /* @@protoc_insertion_point(class_definition:example.EchoResponse) */ {
    186  public:
    187   EchoResponse();
    188   virtual ~EchoResponse();
    274   // required string message = 1;
    275   bool has_message() const;
    276   void clear_message();
    277   static const int kMessageFieldNumber = 1;
    278   const ::std::string& message() const;
    279   void set_message(const ::std::string& value);
    300 };
    
    338 class EchoService_Stub : public EchoService {
    339  public:
    340   EchoService_Stub(::google::protobuf::RpcChannel* channel);
    343   ~EchoService_Stub();
    348 
    349   void Echo(::google::protobuf::RpcController* controller,
    350                        const ::example::EchoRequest* request,
    351                        ::example::EchoResponse* response,
    352                        ::google::protobuf::Closure* done);
    353  private:
    354   ::google::protobuf::RpcChannel* channel_;
    357 };
    

    Normal run output:


    Figure 1

    After killing the server, the client side shows:


    Figure 2

    Cluster test cases are skipped for now because they are not convenient to set up here; you can also view all kinds of runtime statistics through the built-in web pages at the corresponding ip/port.

    Part of the main() function in client.cpp:

     35 int main(int argc, char* argv[]) {
     37     GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
     38                      
     39     // A Channel represents a communication line to a Server. Notice that 
     40     // Channel is thread-safe and can be shared by all threads in your program.
     41     brpc::Channel channel;
     42     
     44     brpc::ChannelOptions options;
     45     //more code...
     49     if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
     51         return -1;
     52     }
     56     //more code...
     91 }
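    Before stepping into Init, here is a hedged sketch of the two common ways to bring up a Channel (the option values are illustrative, not what the demo uses):

    // Minimal sketch: single-server vs. cluster initialization.
    brpc::ChannelOptions options;
    options.protocol = "baidu_std";
    options.timeout_ms = 100;      // total deadline of one RPC in milliseconds
    options.max_retry = 3;

    brpc::Channel single, cluster;
    // 1) Single server: the channel keeps one SocketId in _server_id.
    if (single.Init("127.0.0.1:8000", &options) != 0) {
        LOG(ERROR) << "Fail to init single-server channel";
    }
    // 2) Naming service + load balancer: _server_id stays INVALID_SOCKET_ID and
    //    every call asks the load balancer (_lb) to pick a server.
    if (cluster.Init("file://server_list", "rr", &options) != 0) {
        LOG(ERROR) << "Fail to init cluster channel";
    }

    This article follows the second, cluster-style path. The related classes on the channel side: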
    
     36 class ChannelBase : public google::protobuf::RpcChannel/*non-copyable*/,
     37                     public Describable {
     38 public:
     44     virtual int CheckHealth() = 0;
     45 };
    
    149 class Channel : public ChannelBase {
    150     //more code...
    229     butil::intrusive_ptr<SharedLoadBalancer> _lb;
    232 };
    

    To analyze certain features while simplifying others, assume the RPC is issued against a cluster. The snippet above shows only part of the Channel class; the remaining members are interfaces for serializing and packing requests plus initialization, which are fairly straightforward.
    In cluster mode _server_id == INVALID_SOCKET_ID, and server selection goes through the load balancer _lb.

    320 int Channel::Init(const char* ns_url,
    321                   const char* lb_name,
    322                   const ChannelOptions* options) {
    323     //more code...
    337     if (InitChannelOptions(options) != 0) { return -1; }
    338     LoadBalancerWithNaming* lb = new (std::nothrow) LoadBalancerWithNaming;
    339     //more code...
    350     if (lb->Init(ns_url, lb_name, _options.ns_filter, &ns_opt) != 0) {
    352         delete lb;
    353         return -1;
    354     }
    355     _lb.reset(lb);
    356     return 0;
    357 }
    
    146 int Channel::InitChannelOptions(const ChannelOptions* options) {
    147     if (options) {  // Override default options if user provided one.
    148         _options = *options;
    149     }
    150     const Protocol* protocol = FindProtocol(_options.protocol);
    151     //check protocol
    155     _serialize_request = protocol->serialize_request;
    156     _pack_request = protocol->pack_request;
    157     _get_method_name = protocol->get_method_name;
    203     return 0;
    204 }
    

    The supported protocols are declared in options.proto and their implementations live under the policy/ directory; here consider protocol == PROTOCOL_BAIDU_STD. InitChannelOptions copies the function pointers of the chosen Protocol into the channel, and the Protocol itself is registered in global.cpp (a simplified sketch of that registry follows the registration excerpt below):

    383     // Protocols
    384     Protocol baidu_protocol = { ParseRpcMessage,
    385                                 SerializeRequestDefault, PackRpcRequest,
    386                                 ProcessRpcRequest, ProcessRpcResponse,
    387                                 VerifyRpcRequest, NULL, NULL,
    388                                 CONNECTION_TYPE_ALL, "baidu_std" };
    389     if (RegisterProtocol(PROTOCOL_BAIDU_STD, baidu_protocol) != 0) {
    390         exit(1);
    391     }
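    As a simplified sketch of the registry idea (hypothetical table size and layout; brpc's real code differs in details), the protocols are plain structs of function pointers kept in a global table indexed by ProtocolType, filled by RegisterProtocol() at startup and looked up by FindProtocol() from InitChannelOptions():

    // Hypothetical, simplified registry; the real one also handles concurrency.
    static const int MAX_PROTOCOL_SIZE = 128;
    static Protocol g_protocols[MAX_PROTOCOL_SIZE];
    static bool g_registered[MAX_PROTOCOL_SIZE];

    int RegisterProtocol(ProtocolType type, const Protocol& protocol) {
        if (type < 0 || type >= MAX_PROTOCOL_SIZE || g_registered[type]) {
            return -1;          // out of range or registered twice
        }
        g_protocols[type] = protocol;
        g_registered[type] = true;
        return 0;
    }

    const Protocol* FindProtocol(ProtocolType type) {
        if (type < 0 || type >= MAX_PROTOCOL_SIZE || !g_registered[type]) {
            return NULL;
        }
        return &g_protocols[type];
    }

    Back to Channel::Init: the load-balancing side involves the following classes.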
    
     30 class LoadBalancerWithNaming : public SharedLoadBalancer,
     31                                public NamingServiceWatcher {
     32 public:
     44     //more code...
     45 private:
     46     butil::intrusive_ptr<NamingServiceThread> _nsthread_ptr;
     47 };
    
    113 // A intrusively shareable load balancer created from name.
    114 class SharedLoadBalancer : public SharedObject, public NonConstDescribable {
    115 public:
    116     //more code...
    175     LoadBalancer* _lb;
    180 };
    
     34 // Inherit this class to observer NamingService changes.
     38 class NamingServiceWatcher {
     39 public:
     40     virtual ~NamingServiceWatcher() {}
     41     virtual void OnAddedServers(const std::vector<ServerId>& servers) = 0;
     42     virtual void OnRemovedServers(const std::vector<ServerId>& servers) = 0;
     43 };
    
     56 // A dedicated thread to map a name to ServerIds
     57 class NamingServiceThread : public SharedObject, public Describable {
    123 };
    

    Some of the less important components are not described here; it is enough to understand the underlying mechanism and the call path.

     31 int LoadBalancerWithNaming::Init(const char* ns_url, const char* lb_name,
     32                                  const NamingServiceFilter* filter,
     33                                  const GetNamingServiceThreadOptions* options) {
     34     if (SharedLoadBalancer::Init(lb_name) != 0) { 
     35         return -1;
     36     }
     37     if (GetNamingServiceThread(&_nsthread_ptr, ns_url, options) != 0) {
     39         return -1;
     40     }
     41     if (_nsthread_ptr->AddWatcher(this, filter) != 0) {
     43         return -1;
     44     }
     45     return 0;
     46 }
    

    The code above news a load balancer instance by its name, parses the protocol (e.g. bns) out of the url to create the corresponding naming service, and then Starts it:

    411 int GetNamingServiceThread(
    412     butil::intrusive_ptr<NamingServiceThread>* nsthread_out,
    413     const char* url,
    414     const GetNamingServiceThreadOptions* options) {
    415     //more code...
    459         if (ptr == NULL) {
    460             NamingServiceThread* thr = new (std::nothrow) NamingServiceThread;
    461             //more code...
    468             new_thread = true;
    469         }
    470     }
    471     if (new_thread) {
    472         if (nsthread->Start(source_ns->New(), key.protocol, key.service_name, options) != 0) {
    474             std::unique_lock<pthread_mutex_t> mu(g_nsthread_map_mutex);
    475             g_nsthread_map->erase(key);
    476             return -1;
    477         }
    478     } else {
    479         if (nsthread->WaitForFirstBatchOfServers() != 0) {
    480             return -1;
    481         }
    482     }
    483     nsthread_out->swap(nsthread);
    484     return 0;
    485 }
    
     45 // Mapping a name to ServerNodes.
     46 class NamingService : public Describable, public Destroyable {
     54     virtual int RunNamingService(const char* service_name,
     55                                  NamingServiceActions* actions) = 0;
     62     virtual bool RunNamingServiceReturnsQuickly() { return false; }
     66     virtual NamingService* New() const = 0;
     68 protected:
     69     virtual ~NamingService() {}
     70 };
    
     28 class PeriodicNamingService : public NamingService {
     29 protected:
     30     virtual int GetServers(const char *service_name,
     31                            std::vector<ServerNode>* servers) = 0;
     32   
     33     int RunNamingService(const char* service_name,
     34                          NamingServiceActions* actions);
     35 };
    
     28 // Acquire server list from Baidu-Naming-Service, aka BNS
     29 class BaiduNamingService : public PeriodicNamingService {
     30 public:
     31     // You can specify port by appending port selector:
     32     // e.g.: bns://DPOP-inner-API-inner-API.jpaas.hosts:main
     33     //                                                 ^^^^^
     34     int GetServers(const char *service_name,
     35                    std::vector<ServerNode>* servers);
     36     
     37     void Describe(std::ostream& os, const DescribeOptions&) const;
     39     NamingService* New() const;
     41     void Destroy();
     42 }; 
    
    275 int NamingServiceThread::Start(NamingService* naming_service,
    276                                const std::string& protocol,
    277                                const std::string& service_name,
    278                                const GetNamingServiceThreadOptions* opt_in) {
    279     if (naming_service == NULL) {
    281         return -1;
    282     }
    283     _ns = naming_service;
    284     _protocol = protocol;
    285     _service_name = service_name;
    286     if (opt_in) {
    287         _options = *opt_in;
    288     }       
    289     _last_sockets.clear();
    290     if (_ns->RunNamingServiceReturnsQuickly()) {
    291         RunThis(this);
    292     } else {
    293         int rc = bthread_start_urgent(&_tid, NULL, RunThis, this);
    294         if (rc) {
    296             return -1;
    297         }
    298     }   
    299     return WaitForFirstBatchOfServers(); 
    300 }
    

    I will not paste much more of the naming-service startup code. The rough flow is NamingServiceThread::Start -> NamingServiceThread::RunThis -> NamingServiceThread::Run -> PeriodicNamingService::RunNamingService, and RunNamingService in turn calls the virtual GetServers of the concrete naming service, e.g. for bns (a sketch of that periodic loop follows the GetServers excerpt below):

     28 int BaiduNamingService::GetServers(const char *service_name,
     29                                    std::vector<ServerNode>* servers) {
     30     servers->clear();
     31     BnsInput input;
     32     input.set_service_name(service_name);
     33     BnsOutput output;
     34     const int rc = webfoot::get_instance_by_service(input, &output);
     35     if (rc != webfoot::WEBFOOT_RET_SUCCESS) {
     36         //more code...
     44     }
     45     const int instance_number = output.instance_size();
     46     if (instance_number == 0) {
     48         return 0;
     49     }
     50     for (int i = 0; i < instance_number; i++) {
     51         const BnsInstance& instance = output.instance(i);
     52         if (instance.status() == 0) {
     53             butil::ip_t ip;
     54             if (butil::str2ip(instance.host_ip().c_str(), &ip) != 0) {
     56                 continue;
     57             }
     58             servers->push_back(ServerNode(ip, instance.port(), instance.tag()));
     59         }
     60     }
     61     return 0;
     62 }
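    PeriodicNamingService::RunNamingService itself is not pasted above. As a rough sketch of the idea (not verbatim brpc code; the 5-second interval and the error handling are simplifications), it is a loop running in a bthread that keeps refreshing the server list and handing it over to the watcher side:

    int PeriodicNamingService::RunNamingService(const char* service_name,
                                                NamingServiceActions* actions) {
        std::vector<ServerNode> servers;
        for (;;) {
            servers.clear();
            if (GetServers(service_name, &servers) == 0) {
                // Hand over the full list; NamingServiceThread diffs it against
                // _last_sockets and notifies watchers such as LoadBalancerWithNaming
                // through OnAddedServers/OnRemovedServers.
                actions->ResetServers(servers);
            }
            // Sleep inside the bthread so the worker thread is not blocked.
            if (bthread_usleep(5 * 1000 * 1000L) < 0) {
                return -1;      // interrupted, e.g. the naming thread is stopping
            }
        }
    }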
    

    I could not find the implementation behind the webfoot::get_instance_by_service call above; judging from the code, it presumably resolves a service name into a list of usable ip addresses. FileNamingService, which has the same structure as BaiduNamingService, instead loads the list of available ips from a local file and watches the file for changes, reloading it when it is modified (see the sketch below).
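    A minimal sketch of that file-based idea (a hypothetical helper, not brpc's actual FileNamingService): each "ip:port" line of the file becomes one server entry; the real implementation additionally re-reads the file when it changes:

    #include <cstdlib>
    #include <fstream>
    #include <string>
    #include <utility>
    #include <vector>

    static int LoadServerList(const std::string& path,
                              std::vector<std::pair<std::string, int> >* servers) {
        std::ifstream in(path.c_str());
        if (!in) {
            return -1;                       // file missing or unreadable
        }
        servers->clear();
        std::string line;
        while (std::getline(in, line)) {
            const size_t colon = line.find(':');
            if (colon == std::string::npos) {
                continue;                    // skip malformed lines
            }
            servers->push_back(std::make_pair(line.substr(0, colon),
                                              atoi(line.c_str() + colon + 1)));
        }
        return 0;
    }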
    Borrowing a figure from the naming service documentation:

    Figure 4

    After the Channel has been initialized, the demo continues with:

     54     // Normally, you should not call a Channel directly, but instead construct
     55     // a stub Service wrapping it. stub can be shared by all threads as well.
     56     example::EchoService_Stub stub(&channel);
     58     // Send a request and wait for the response every 1 second.
     59     int log_id = 0;
     60     while (!brpc::IsAskedToQuit()) {
     61         // We will receive response synchronously, safe to put variables
     62         // on stack.
     63         example::EchoRequest request;
     64         example::EchoResponse response;
     65         brpc::Controller cntl;
     66 
     67         request.set_message("hello world");
     68 
     69         cntl.set_log_id(log_id ++);  // set by user
     70         // Set attachment which is wired to network directly instead of 
     71         // being serialized into protobuf messages.
     72         cntl.request_attachment().append(FLAGS_attachment);
     73 
     74         // Because `done'(last parameter) is NULL, this function waits until
     75         // the response comes back or error occurs(including timedout).
     76         stub.Echo(&cntl, &request, &response, NULL);
     87     }
     90     return 0;
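    Because `done' (the last parameter) is NULL, the call above is synchronous. For contrast, a sketch of the asynchronous variant along the lines of brpc's asynchronous_echo_c++ example (the handler name is made up; Controller and response must live on the heap until the callback has run):

    // Needs <memory>; brpc::NewCallback comes from brpc's callback header.
    static void HandleEchoResponse(brpc::Controller* cntl,
                                   example::EchoResponse* response) {
        // Make sure both heap objects are deleted when the callback finishes.
        std::unique_ptr<brpc::Controller> cntl_guard(cntl);
        std::unique_ptr<example::EchoResponse> response_guard(response);
        if (cntl->Failed()) {
            LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
            return;
        }
        LOG(INFO) << "Received response: " << response->message();
    }

    // At the call site: stub.Echo returns immediately instead of Join()-ing the
    // call id; `done' is run in another bthread when the RPC finishes.
    example::EchoRequest request;
    request.set_message("hello world");
    example::EchoResponse* response = new example::EchoResponse();
    brpc::Controller* cntl = new brpc::Controller();
    google::protobuf::Closure* done = brpc::NewCallback(
        &HandleEchoResponse, cntl, response);
    stub.Echo(cntl, &request, response, done);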
    

    The role of Controller is described by its own comment:

    106 // A Controller mediates a single method call. The primary purpose of
    107 // the controller is to provide a way to manipulate settings per RPC-call 
    108 // and to find out about RPC-level errors.
    109 class Controller : public google::protobuf::RpcController/*non-copyable*/ {
    761 };
    

    The class is too large to paste here. Since this is a synchronous call the log_id is not of much use; it matters more with ParallelChannel. Next comes stub.Echo(&cntl, &request, &response, NULL):

    759 void EchoService_Stub::Echo(::google::protobuf::RpcController* controller,
    760                               const ::example::EchoRequest* request,
    761                               ::example::EchoResponse* response,
    762                               ::google::protobuf::Closure* done) {
    763   channel_->CallMethod(descriptor()->method(0),
    764                        controller, request, response, done);
    765 }
    
    369 void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, .../*same four parameters as in Echo above*/) {
    374     const int64_t start_send_real_us = butil::gettimeofday_us();
    375     Controller* cntl = static_cast<Controller*>(controller_base);
    376     cntl->OnRPCBegin(start_send_real_us);
    377     //more code...
    397     const CallId correlation_id = cntl->call_id();
    398     const int rc = bthread_id_lock_and_reset_range(
    399                     correlation_id, NULL, 2 + cntl->max_retry());
    400     //more code...
    456     cntl->_response = response;
    457     cntl->_done = done;
    458     cntl->_pack_request = _pack_request;
    459     cntl->_method = method;
    460     cntl->_auth = _options.auth;
    468     cntl->_lb = _lb;
    482     _serialize_request(&cntl->_request_buf, cntl, request);
    493     if (cntl->backup_request_ms() >= 0 &&
    494         (cntl->backup_request_ms() < cntl->timeout_ms() ||
    495          cntl->timeout_ms() < 0)) {
    496         // Setup timer for backup request. When it occurs, we'll setup a
    497         // timer of timeout_ms before sending backup request.
    498 
    499        // Send another request if RPC does not finish after so many milliseconds
    500        // The request will be sent to a different server by best effort.
    501         //more code...
    515     } else if (cntl->timeout_ms() >= 0) {
    516         // Setup timer for RPC timetout
    517 
    518         // _deadline_us is for truncating _connect_timeout_ms
    519         cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
    520         const int rc = bthread_timer_add(
    521             &cntl->_timeout_id,
    522             butil::microseconds_to_timespec(cntl->_deadline_us),
    523             HandleTimeout, (void*)correlation_id.value);
    524         if (BAIDU_UNLIKELY(rc != 0)) {
    526             return cntl->HandleSendFailed();
    527         }
    528     } else {
    529         cntl->_deadline_us = -1;
    530     }
    531 
    532     cntl->IssueRPC(start_send_real_us);
    533     if (done == NULL) {
    534         // MUST wait for response when sending synchronous RPC. It will
    535         // be woken up by callback when RPC finishes (succeeds or still
    536         // fails after retry)
    537         Join(correlation_id);
    538         if (cntl->_span) {
    539             cntl->SubmitSpan();
    540         }
    541         cntl->OnRPCEnd(butil::gettimeofday_us());
    542     }
    543 }
    

    The excerpt above shows the key parts of CallMethod. Since Channel inherits from google::protobuf::RpcChannel, CallMethod is the interface it has to override, and it mainly carries the per-call control logic.
    It records the RPC start time, applies the option parameters (code skipped), and creates a call_id (analyzed later; see bthread_id.md). Along the way it installs HandleTimeout, which corresponds to the overall timeout of RPCs on this Channel: a timeout in brpc is a deadline, so once it is exceeded the RPC ends, and there is no retry after a timeout:

    359 static void HandleTimeout(void* arg) {
    360     bthread_id_t correlation_id = { (uint64_t)arg };
    362     bthread_id_error(correlation_id, ERPCTIMEDOUT);
    363 }
    

    Some error conditions on the call id lead to HandleSocketFailed:

    1179 int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code,
    1180                                    const std::string& error_text) {
    1181     Controller* cntl = static_cast<Controller*>(data);
    1182     if (!cntl->is_used_by_rpc()) {
    1183         // Cannot destroy the call_id before RPC otherwise an async RPC
    1184         // using the controller cannot be joined and related resources may be
    1185         // destroyed before done->Run() running in another bthread.
    1186         // The error set will be detected in Channel::CallMethod and fail
    1187         // the RPC.
    1190         return bthread_id_unlock(id);
    1191     }
    1192     const int saved_error = cntl->ErrorCode();
    1193     if (error_code == ERPCTIMEDOUT) {
    1197     } else if (error_code == EBACKUPREQUEST) {
    1201     } else if (!error_text.empty()) {
    1203     } else {
    1204         //more code...
    1206     }
    1207     CompletionInfo info = { id, false };
    1208     cntl->OnVersionedRPCReturned(info, true, saved_error);
    1209     return 0;
    1210 }
    

    OnVersionedRPCReturned is analyzed further below. Back in CallMethod, lines 456~460 set some data members of the Controller* cntl, the request is serialized into _request_buf, and a timer is set up; only the timeout_ms case is considered here. When the timer fires (whether the callback runs immediately or is deferred is a scheduling detail), it runs the HandleTimeout shown above.


    That callback eventually leads back to the HandleSocketFailed above with error_code == ERPCTIMEDOUT. After the timer is armed, the RPC is actually issued via IssueRPC; for a synchronous call (done == NULL) the caller has to wait (detailed analysis skipped for now):

     529 void Join(CallId id) {  
     530     bthread_id_join(id);
     531 }
    
     967 void Controller::IssueRPC(int64_t start_realtime_us) {
     968     _current_call.begin_time_us = start_realtime_us;
     969     // Clear last error, Don't clear _error_text because we append to it.
     970     _error_code = 0;
     971    
     972     // Make versioned correlation_id.
     973     // call_id         : unversioned, mainly for ECANCELED and ERPCTIMEDOUT
     974     // call_id + 1     : first try.
     975     // call_id + 2     : retry 1
     976     // ...
     977     // call_id + N + 1 : retry N
     978     // All ids except call_id are versioned. Say if we've sent retry 1 and
     979     // a failed response of first try comes back, it will be ignored.
     980     const CallId cid = current_id();
     992     // Pick a target server for sending RPC
     993     _current_call.need_feedback = false;
     994     _current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
     996     if (SingleServer()) {
     997         //more code...
    1007     } else {
    1008         LoadBalancer::SelectIn sel_in =
    1009             { start_realtime_us, true,
    1010               has_request_code(), _request_code, _accessed };
    1011         LoadBalancer::SelectOut sel_out(&tmp_sock);
    1012         const int rc = _lb->SelectServer(sel_in, &sel_out);
    1013         if (rc != 0) {
    1014             //more code...
    1020         }
    1021         _current_call.need_feedback = sel_out.need_feedback;
    1022         _current_call.peer_id = tmp_sock->id();
    1027         _remote_side = tmp_sock->remote_side();
    1028     }
    
    1113     // Make request
    1114     butil::IOBuf packet;
    1115     SocketMessage* user_packet = NULL;
    1116     _pack_request(&packet, &user_packet, cid.value, _method, this,
    1117                   _request_buf, using_auth);//baidu_protocol:PackRpcRequest
    
    1142     Socket::WriteOptions wopt;
    1143     wopt.id_wait = cid;
    1144     wopt.abstime = pabstime;
    1145     wopt.pipelined_count = _pipelined_count;
    1146     wopt.with_auth = has_flag(FLAGS_REQUEST_WITH_AUTH);
    1147     wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
    1148     int rc;
    1149     size_t packet_size = 0;
    1150     if (user_packet_guard) {
    1151         //
    1155     } else {
    1156         packet_size = packet.size();
    1157         rc = _current_call.sending_sock->Write(&packet, &wopt);
    1158     }
    1175 }
    

    The code above is where the request is actually sent (some checks are omitted). Before writing, a few things happen: in the cluster case a target server is picked through the load balancer, a socket is chosen according to the connection type, then _pack_request packs the request and the data is Written to the socket.
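    The comment inside IssueRPC describes the versioned correlation id scheme: every attempt gets its own id, so a stale response from an earlier try is simply ignored. A tiny sketch of the arithmetic (a hypothetical helper mirroring what Controller::current_id() computes):

    struct MyCallId { uint64_t value; };   // stands in for bthread_id_t / CallId

    inline MyCallId versioned_id(MyCallId call_id, int nretry) {
        // call_id     : unversioned, mainly for ECANCELED / ERPCTIMEDOUT
        // call_id + 1 : first try, call_id + 2 : retry 1, ..., call_id + N + 1 : retry N
        MyCallId id = { call_id.value + 1 + (uint64_t)nretry };
        return id;
    }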

    When a response arrives, since PROTOCOL_BAIDU_STD is used here, it goes through ProcessRpcResponse:

    544 void ProcessRpcResponse(InputMessageBase* msg_base) {
    545     const int64_t start_parse_us = butil::cpuwide_time_us();
    546     DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    547     RpcMeta meta;
    548     if (!ParsePbFromIOBuf(&meta, msg->meta)) {
    550         return;
    551     }
    552 
    553     const bthread_id_t cid = { static_cast<uint64_t>(meta.correlation_id()) };
    555     Controller* cntl = NULL;
    556     const int rc = bthread_id_lock(cid, (void**)&cntl);
    557     if (rc != 0) {
    558         //more code...
    563         return;
    564     }
    620     accessor.OnResponse(cid, saved_error);
    621 }
    
     46     void OnResponse(CallId id, int saved_error) {
     47         const Controller::CompletionInfo info = { id, true };
     48         _cntl->OnVersionedRPCReturned(info, false, saved_error);
     49     }
    

    The code above parses the message and calls back into OnVersionedRPCReturned; some error-handling cases in between are skipped.

     542 void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
     543                                         bool new_bthread, int saved_error) {
     546     // TODO(gejun): Simplify call-ending code.
     547     // Intercept previous calls
     548     while (info.id != _correlation_id && info.id != current_id()) {
     549         //more code...
     564         return;
     565     }
     566 
     567     if ((!_error_code && _retry_policy == NULL) ||
     568         _current_call.nretry >= _max_retry) {
     570         goto END_OF_RPC;
     571     }
    
     572     if (_error_code == EBACKUPREQUEST) {
     607     } else if (_retry_policy ? _retry_policy->DoRetry(this)
     608                : DefaultRetryPolicy()->DoRetry(this)) {
     610         // The error must come from _current_call because:
     611         //  * we intercepted error from _unfinished_call in OnVersionedRPCReturned
     612         //  * ERPCTIMEDOUT/ECANCELED are not retrying error by default.
     613         CHECK_EQ(current_id(), info.id) << "error_code=" << _error_code;
     614         if (!SingleServer()) {
     615             if (_accessed == NULL) {
     616                 _accessed = ExcludedServers::Create(
     617                     std::min(_max_retry, RETRY_AVOIDANCE));
     618                 if (NULL == _accessed) {
     619                     SetFailed(ENOMEM, "Fail to create ExcludedServers");
     620                     goto END_OF_RPC;
     621                 }
     622             }
     623             _accessed->Add(_current_call.peer_id);
     624         }
     625         _current_call.OnComplete(this, _error_code, info.responded, false);
     626         ++_current_call.nretry;
     627         // Clear http responses before retrying, otherwise the response may
     628         // be mixed with older (and undefined) stuff. This is actually not
     629         // done before r32008.
     630         if (_http_response) {
     631             _http_response->Clear();
     632         }
     633         response_attachment().clear();
     634         return IssueRPC(butil::gettimeofday_us());
     635     }
     637 END_OF_RPC:
     638     if (new_bthread) {
     664         if ((FLAGS_usercode_in_pthread || _done != NULL/*Note[_done]*/) &&
     665             !has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
     666             bthread_id_about_to_destroy(info.id);
     667         }
     668         // No need to join this bthread since RPC caller won't wake up
     669         // (or user's done won't be called) until this bthread finishes
     670         bthread_t bt;
     671         bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
     672                                BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
     673         _tmp_completion_info = info;
     674         if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) {
     676             EndRPC(info);
     677         }
     678     } else {
     679         if (_done != NULL/*Note[_done]*/ &&
     680             !has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
     681             bthread_id_about_to_destroy(info.id);
     682         }
     683         EndRPC(info);
     684     }
     685 }
    

    In the code above, when the response of some attempt comes back, earlier in-flight attempts may have to be intercepted (that part is not pasted). It then checks whether retries are left and whether the error should be retried (DoRetry). When the request failed and we are in the cluster case, the next attempt avoids the server picked last time (_accessed), the current call is completed via OnComplete, the retry counter is incremented, and IssueRPC is called again. The brpc error codes that are retried by default are listed below (a sketch of plugging in a custom RetryPolicy follows the list):

    ETIMEDOUT: connection timed out
    EAGAIN: too many requests being sent concurrently; a soft limit that rarely triggers
    EFAILEDSOCKET: the TCP connection broke while the RPC was in progress
    ELIMIT: the number of concurrently handled requests exceeded ServerOptions.max_concurrency
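    A sketch of plugging in a custom retry policy (assuming the brpc::RetryPolicy interface from <brpc/retry_policy.h>; EMYBUSINESSERROR is a made-up application errno):

    #include <brpc/retry_policy.h>

    static const int EMYBUSINESSERROR = 10001;     // hypothetical application errno

    class MyRetryPolicy : public brpc::RetryPolicy {
    public:
        bool DoRetry(const brpc::Controller* cntl) const {
            if (cntl->ErrorCode() == EMYBUSINESSERROR) {
                return true;                       // retry this error as well
            }
            // Keep the default behavior (the error codes listed above).
            return brpc::DefaultRetryPolicy()->DoRetry(cntl);
        }
    };

    // Install it before Channel::Init; the policy object must outlive the channel:
    // static MyRetryPolicy g_my_retry_policy;
    // options.retry_policy = &g_my_retry_policy;

    Back to the source, the Call::OnComplete mentioned above: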
    
     709 void Controller::Call::OnComplete(
     710         Controller* c, int error_code/*note*/, bool responded, bool end_of_rpc) {
     711     //more code...
     793     if (need_feedback) {
     794         const LoadBalancer::CallInfo info =
     795             { begin_time_us, peer_id, error_code, c };
     796         c->_lb->Feedback(info);
     797     }
     798 
     799     // Release the `Socket' we used to send/receive data
     800     sending_sock.reset(NULL);
     801 }
    

    Most of the code in OnComplete deals with sending_sock and is not analyzed here. The part shown above ends one RPC attempt and finally decides whether the load balancer needs feedback to recompute its weights.

    While testing, I added log lines on a few key paths. For the normal case:

    E0921 14:57:40   775 /Users//opensource/brpc/src/brpc/channel.cpp:399] Bthread Rpc Call Init 4294967297
    E0921 14:57:40  9475 /Users//opensource/brpc/src/brpc/controller.cpp:544] OnVersionedRPCReturned step1 4294967298 0=== 4294967297 xxx 4294967298
    E0921 14:57:40  9475 /Users//opensource/brpc/src/brpc/controller.cpp:569] OnVersionedRPCReturned step3 4294967298
    I0921 14:57:40   775 /Users//opensource/brpc/example/echo_c++/client.cpp:78] Received response from 0.0.0.0:8000 to 127.0.0.1:52693: hello world (attached=) latency=2489us
    

    When the server sleeps for a few seconds instead of responding, the client prints:

    E0921 15:00:46   775 /Users//opensource/brpc/src/brpc/channel.cpp:399] Bthread Rpc Call Init 4294967297
    E0921 15:00:46 10499 /Users//opensource/brpc/src/brpc/channel.cpp:361] Bthread Rpc Call Timeout 4294967297
    E0921 15:00:46 10499 /Users//opensource/brpc/src/brpc/controller.cpp:1216] HandleSocketFailed 4294967297 0---1008 ~~~~ 
    E0921 15:00:46 10499 /Users//opensource/brpc/src/brpc/controller.cpp:544] OnVersionedRPCReturned step1 4294967297 0=== 4294967297 xxx 4294967298
    E0921 15:00:46 10499 /Users//opensource/brpc/src/brpc/controller.cpp:663] OnVersionedRPCReturned step6 4294967297
    W0921 15:00:46   775 /Users//opensource/brpc/example/echo_c++/client.cpp:84] [E1008]Reached timeout=100ms @0.0.0.0:8000
    

    Starting the client without starting the server at all:

    E0921 15:04:09   775 /Users//opensource/brpc/src/brpc/channel.cpp:399] Bthread Rpc Call Init 4294967297
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:1216] HandleSocketFailed 4294967298 0---61 ~~~~ Fail to connect Socket{id=0 addr=0.0.0.0:8000} (0x0x1121c8000): Connection refused
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:544] OnVersionedRPCReturned step1 4294967298 0=== 4294967297 xxx 4294967298
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:609] OnVersionedRPCReturned step5 4294967298
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:963] HandleSendFailed 4294967299 64
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:544] OnVersionedRPCReturned step1 4294967299 64=== 4294967297 xxx 4294967299
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:609] OnVersionedRPCReturned step5 4294967299
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:963] HandleSendFailed 4294967300 64
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:544] OnVersionedRPCReturned step1 4294967300 64=== 4294967297 xxx 4294967300
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:609] OnVersionedRPCReturned step5 4294967300
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:963] HandleSendFailed 4294967301 64
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:544] OnVersionedRPCReturned step1 4294967301 64=== 4294967297 xxx 4294967301
    E0921 15:04:09  5891 /Users//opensource/brpc/src/brpc/controller.cpp:569] OnVersionedRPCReturned step3 4294967301
    W0921 15:04:09   775 /Users//opensource/brpc/example/echo_c++/client.cpp:84] [E61]Fail to connect Socket{id=0 addr=0.0.0.0:8000} (0x0x1121c8000): Connection refused [R1][E64]Not connected to 0.0.0.0:8000 yet, server_id=0 [R2][E64]Not connected to 0.0.0.0:8000 yet, server_id=0 [R3][E64]Not connected to 0.0.0.0:8000 yet, server_id=0
    I0921 15:04:10  9475 /Users//opensource/brpc/src/brpc/socket.cpp:2204] Checking Socket{id=0 addr=0.0.0.0:8000} (0x1121c8000)
    

    Concurrent requests are not covered above; see the multi_threaded_echo_c++ example for that. As the docs put it, "Channel::CallMethod and stub access are thread-safe and can be used by all threads at the same time." A sketch of the pattern follows.
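    A simplified sketch of that pattern (multi_threaded_echo_c++ uses pthreads/bthreads and more options; std::thread and the fixed thread count here are simplifications):

    #include <thread>
    #include <vector>

    static void sender(example::EchoService_Stub* stub) {
        while (!brpc::IsAskedToQuit()) {
            example::EchoRequest request;
            example::EchoResponse response;
            brpc::Controller cntl;           // one Controller per call, per thread
            request.set_message("hello world");
            stub->Echo(&cntl, &request, &response, NULL);   // synchronous call
            if (cntl.Failed()) {
                LOG(WARNING) << cntl.ErrorText();
            }
        }
    }

    // channel and stub are shared by all sender threads.
    void run_senders(brpc::Channel* channel, int nthreads) {
        example::EchoService_Stub stub(channel);
        std::vector<std::thread> threads;
        for (int i = 0; i < nthreads; ++i) {
            threads.emplace_back(sender, &stub);
        }
        for (size_t i = 0; i < threads.size(); ++i) {
            threads[i].join();
        }
    }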

    The server-side implementation will be analyzed in a later post.

    References:
    Implementing an echo service with google protobuf RPC
    brpc timeout documentation
