异步grpc
有三个参与者:client、server、grpc runtime
client和server通信:server把response放在completion queue中,client从中取出。
基本工作流:
- 在 RPC 调用上绑定一个 CompletionQueue
- 做一些事情如读取或者写入,以唯一的 void * 标签表示
- 调用 CompletionQueue::Next 去等待操作结束。如果标签出现,表示对应的操作已经完成。
异步grpc client
先看同步grpc client:发送请求后,client阻塞等待server返回结果;
异步client:发送完grpc请求,不等待server的response,继续执行,server执行操作后将返回结果放在一个completion queue中,client根据标识从completion queue取到返回的结果。
-
构造rqc request
// Data we are sending to the server. HelloRequest request; request.set_name(user); // Container for the data we expect from the server. HelloReply reply; // Context for the client. It could be used to convey extra information to // the server and/or tweak certain RPC behaviors. ClientContext context; // The producer-consumer queue we use to communicate asynchronously with the // gRPC runtime. CompletionQueue cq; // Storage for the status of the RPC upon completion. Status status;
-
把rpc request绑定到一个completion queue
// stub_->AsyncSayHello() performs the RPC call, returning an instance we // store in "rpc". Because we are using the asynchronous API, we need to // hold on to the "rpc" instance in order to get updates on the ongoing RPC. std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc( stub_->AsyncSayHello(&context, request, &cq));
-
用一个唯一的tag,寻求reply和最终状态
// Request that, upon completion of the RPC, "reply" be updated with the // server's response; "status" with the indication of whether the operation // was successful. Tag the request with the integer 1. // 准备一个reply收server response,status表示rpc请求的执行状态,tag告诉server rpc请求的标识 rpc->Finish(&reply, &status, (void*)1);
-
真正发现rpc请求执行完。
等待完成队列返回下一个标签。当标签被传入对应的 Finish() 调用时,回答和状态就可以被返回了。void* got_tag; bool ok = false; // Block until the next result is available in the completion queue "cq". // The return value of Next should always be checked. This return value // tells us whether there is any kind of event or the cq_ is shutting down. // 一直等待返回一个result GPR_ASSERT(cq.Next(&got_tag, &ok)); // Verify that the result from "cq" corresponds, by its tag, our previous // request. GPR_ASSERT(got_tag == (void*)1); // ... and that the request was completed successfully. Note that "ok" // corresponds solely to the request for updates introduced by Finish(). // ok只是说response已经更新到reply中 GPR_ASSERT(ok); // Act upon the status of the actual RPC. // status反应的是rpc实际的执行结果 if (status.ok()) { return reply.message(); } else { return "RPC failed"; }
异步grpc server
server发起一个带有标识tag的rpc请求,等待completion queue返回这个tag。
网友评论