因为早些时候分析过Pebble的部分实现Pebble协程库实现和Pebble网络通信实现(上),后来看到关注的公众平台有三篇原创,之前仅仅是收藏并没有过多的时间去阅读。通读下来还是挺有意思的,所以在周末花点时间又去分析下相关的设计实现。题外话,2018年初的时候,问过框架的作者,该项目不再维护和更新,且用在项目上的情况不多,可能会有坑,仅用于学习。
这三篇链接分别是:
教你写游戏服务器框架(一)
教你写游戏服务器框架(二)
教你写游戏服务器框架(三)
因为该框架是基本用于游戏的,因为之前的工作经历虽然接触过相关的框架,但还有些区别。包括开源的skynet框架,用在游戏上的情况较多些,也是有不同的设计,多学习和比较,吸取经验。
协程相关的和网络相关的就不多说,可以参考之前的分析,这一篇通过example下的hellow demo来简单分析下Pebble工作过程,其他细节会捡一些重要的来说明,其中包括重要的类声明和成员变量/接口。另外,该框架是单线程的,所以看不到使用锁的地方。
在pebble_client.h中创建PebbleClient
实例,并初始化,部分声明如下:
79 class PebbleClient {
84 public:
88 int32_t Init();
96 template<class RPC_CLIENT>
97 RPC_CLIENT* NewRpcClientByAddress(const std::string& service_address, ProtocolType protocol_type = kPEBBLE_RPC_BINARY);
179 private:
180 Options m_options;
181 CoroutineSchedule* m_coroutine_schedule;
182 Naming* m_naming_array[kNAMING_BUTT];
183 IProcessor* m_processor_array[kPROTOCOL_TYPE_BUTT];
184 IEventHandler* m_rpc_event_handler;
186 Timer* m_timer;//定时器
187 uint32_t m_stat_timer_ms; // 资源使用采样定时器,供统计用
188 SessionMgr* m_session_mgr;//会话管理器
190 cxx::unordered_map<int64_t, IProcessor*> m_processor_map;
191 cxx::unordered_map<std::string, Router*> m_router_map;
192 cxx::unordered_map<Router*, std::vector<int64_t> > m_router_handle_map;
193 };
其中,构造和析构要做的事情比较简单,分别是初始化各指针为空和delete操作,不再这里贴代码。初始化操作部分实现如下:
90 int32_t PebbleClient::Init() {
103 int32_t ret = InitTimer();//定时管理实例,统计作用
104 CHECK_RETURN(ret);
105
106 ret = InitCoSchedule();//创建协程调度实例,协程处理
107 CHECK_RETURN(ret);
108
112 ret = Message::Init();//初始化消息通信[参考Pebble网络通信实现(上)]
113 CHECK_RETURN(ret);
114
115 signal(SIGPIPE, SIG_IGN);//防止收到该信号默认终止进程
117 return 0;
118 }
354 int32_t PebbleClient::InitTimer() {//初始化定时器
355 if (!m_timer) {
356 m_timer = new SequenceTimer();//实现比较简单,就不贴代码咯
357 }
358
359 TimeoutCallback on_stat_timeout = cxx::bind(&PebbleClient::OnStatTimeout, this);//bind统计资源功能
360 int64_t ret = m_timer->StartTimer(m_stat_timer_ms, on_stat_timeout);//设置定时器回调函数
361 //more code...
367 }
其中网络相关的实现在之前文章中分析过,由于作者实现的时候就注释了性能不好,只是用于测试,实际线上的应该使用类似tbus之类的组件。
接着创建client rpc stub实例:
48 example::HelloWorldClient* hello_client =
49 client.NewRpcClientByAddress<example::HelloWorldClient>("tcp://127.0.0.1:8888");
197 template<class RPC_CLIENT>
198 RPC_CLIENT* PebbleClient::NewRpcClientByAddress(const std::string& service_address,
199 ProtocolType protocol_type) {
200 PebbleRpc* pebble_rpc = GetPebbleRpc(protocol_type);
201 if (pebble_rpc == NULL) {
203 return NULL;
204 }
205 int64_t handle = Connect(service_address);
206 if (handle < 0) {
209 return NULL;
210 }
211 if (Attach(handle, pebble_rpc) != 0) {
214 Close(handle);
215 return NULL;
216 }
217 RPC_CLIENT* client = new RPC_CLIENT(pebble_rpc);
218 client->SetHandle(handle);
219 return client;
220 }
默认使用kPEBBLE_RPC_BINARY
(thrift binary编码协议),分析过程中以他来说明。
153 PebbleRpc* PebbleClient::GetPebbleRpc(ProtocolType protocol_type) {
154 if (protocol_type < kPEBBLE_RPC_BINARY || protocol_type > kPEBBLE_RPC_PROTOBUF) {
156 return NULL;
157 }
158
159 if (m_processor_array[protocol_type] != NULL) {
160 return dynamic_cast<PebbleRpc*>(m_processor_array[protocol_type]);
161 }
162
163 if (!m_rpc_event_handler) {
164 m_rpc_event_handler = new RpcEventHandler();
165 static_cast<RpcEventHandler*>(m_rpc_event_handler)->Init(m_stat_manager);
166 }
187 PebbleRpc* rpc_instance = new PebbleRpc(kCODE_BINARY, m_coroutine_schedule);
188 rpc_instance->SetSendFunction(Message::Send, Message::SendV);
189 rpc_instance->SetEventHandler(m_rpc_event_handler);
190 m_processor_array[kPEBBLE_RPC_BINARY] = rpc_instance;
191
192 return rpc_instance;
193 }
以上会根据协议查找对应的PebbleRpc
实例,继承关系如下:
class IProcessor
<--class IRpc
<--class PebbleRpc
,而class IEventHandler
如注释描述那般:“IEventHandler是对Processor通用处理的抽象,和Processor配套使用,一般建议Processor本身处理和协议、业务强相关的逻辑,针对消息的一些通用处理由EventHandler完成。”。我看用到的地方不多,且跟stat统计有关系,可能不是那么关键(因为有定时任务回调统计相关的接口),后面再看到其他使用它的时候,再详细介绍下。
PebbleRpc
构造实例的过程中会根据code_type
实例化encode/decode
对象,部分代码如下:
27 PebbleRpc::PebbleRpc(CodeType code_type, CoroutineSchedule* coroutine_schedule) {
28 m_rpc_util = new RpcUtil(this, coroutine_schedule);
29 m_rpc_plugin = NULL;
30 m_code_type = code_type;
31 switch (m_code_type) {
32 case kCODE_BINARY:
33 case kCODE_JSON:
34 m_rpc_plugin = new ThriftRpcPlugin(this);
35 break;
36 //more code...
41 }
48 }
23 class RpcPlugin {
24 public:
25 virtual ~RpcPlugin() {}
27 virtual int32_t HeadEncode(const RpcHead& rpc_head, uint8_t* buff, uint32_t buff_len) = 0;
29 virtual int32_t HeadDecode(const uint8_t* buff, uint32_t buff_len, RpcHead* rpc_head) = 0;
30 };
31
32 class ThriftRpcPlugin : public RpcPlugin {
33 public:
34 ThriftRpcPlugin(PebbleRpc* pebble_rpc) : m_pebble_rpc(pebble_rpc) {}
35 virtual ~ThriftRpcPlugin() {}
36
37 virtual int32_t HeadEncode(const RpcHead& rpc_head, uint8_t* buff, uint32_t buff_len);
39 virtual int32_t HeadDecode(const uint8_t* buff, uint32_t buff_len, RpcHead* rpc_head);
41 private:
42 PebbleRpc* m_pebble_rpc;
43 };
后面会根据具体的例子来说明这些组件的作用和具体的实现。接着进行PebbleClient::Connect
连接到server,返回回来的是个uint64_t handle
,由NetIO::AllocNetAddr
分配,表示一个连接,屏蔽底层网络细节,这些网络的实现细节可以参考之前的文章。
接着把handle跟rpc对象绑定:
132 int32_t PebbleClient::Attach(int64_t handle, IProcessor* processor) {
133 if (NULL == processor) {
135 return -1;
136 }
137
138 m_processor_map[handle] = processor;
139 return 0;
140 }
接着一个大循环:
56 do {
57 client.Update();
58
59 // 异步调用
60 cxx::function<void(int, const std::string&)> cb =
61 cxx::bind(cb_hello, cxx::placeholders::_1, cxx::placeholders::_2);
62 hello_client->hello("hello world!", cb);
63
64 usleep(10000);
65 } while (true);
240 int32_t PebbleClient::Update() {
241 int32_t num = 0;
242
245 for (uint32_t i = 0; i < m_options._max_msg_num_per_loop; ++i) {
246 if (ProcessMessage() <= 0) {
247 break;
248 }
249 num++;
250 }
251
252 for (int32_t i = 0; i < kNAMING_BUTT; ++i) {
253 if (m_naming_array[i]) {
254 num += m_naming_array[i]->Update();
255 }
256 }
257
258 for (int32_t i = 0; i < kPROTOCOL_TYPE_BUTT; ++i) {
259 if (m_processor_array[i]) {
260 m_processor_array[i]->Update();
261 }
262 }
263
264 if (m_timer) {
265 num += m_timer->Update();
266 }
267
268 if (m_session_mgr) {
269 num += m_session_mgr->CheckTimeout();
270 }
271
277 return num;
278 }
280 int32_t PebbleClient::ProcessMessage() {
281 int64_t handle = -1;
282 int32_t event = 0;
283 int32_t ret = Message::Poll(&handle, &event, 0);//epoll
284 if (ret != 0) {
285 return 0;
286 }
287
288 const uint8_t* msg = NULL;
289 uint32_t msg_len = 0;
290 ret = Message::Peek(handle, &msg, &msg_len, &m_last_msg_info);//查看是否完整的包connection->PeekMsg(msg, msg_len)
291 do {
299 //more code...
300 cxx::unordered_map<int64_t, IProcessor*>::iterator it = m_processor_map.find(m_last_msg_info._self_handle);//根据handle找到对应的"IProcessor处理器"
301 if (m_processor_map.end() == it) {
302 Message::Pop(handle);//弹出handle对应连接的消息
304 break;
305 }
306
307 it->second->OnMessage(m_last_msg_info._remote_handle, msg, msg_len, &m_last_msg_info, 0);//具体的消息处理
308 Message::Pop(handle);
309 } while (0);
310
311 return 1;
312 }
75 int32_t IRpc::OnMessage(int64_t handle, const uint8_t* msg,
76 uint32_t msg_len, const MsgExternInfo* msg_info, uint32_t is_overload) {
77 //more code...
83 RpcHead head;
84
85 int32_t head_len = HeadDecode(msg, msg_len, &head);
86 //more code..
96 if (msg_info) {
97 head.m_arrived_ms = msg_info->_msg_arrived_ms;
98 head.m_dst = (IProcessor*)(msg_info->_src);
99 }
100 const uint8_t* data = msg + head_len;
101 uint32_t data_len = msg_len - head_len;
102
103 int32_t ret = kRPC_UNKNOWN_TYPE;
104 switch (head.m_message_type) {
105 case kRPC_CALL:
106 if (is_overload != 0) {
107 ret = ResponseException(handle, kRPC_SYSTEM_OVERLOAD_BASE - is_overload, head);
108 RequestProcComplete(head.m_function_name, kRPC_SYSTEM_OVERLOAD_BASE - is_overload,
109 head.m_arrived_ms > 0 ? TimeUtility::GetCurrentMS() - head.m_arrived_ms : 0);
110 break;
111 }
112 case kRPC_ONEWAY:
113 ret = ProcessRequest(handle, head, data, data_len);
114 break;
115
116 case kRPC_REPLY:
117 case kRPC_EXCEPTION:
118 ret = ProcessResponse(head, data, data_len);
119 break;
120
121 default:
123 break;
124 }
125
126 return ret;
127 }
183 int32_t PebbleRpc::HeadDecode(const uint8_t* buff, uint32_t buff_len, RpcHead* rpc_head) {
184 if (m_rpc_plugin) {
185 int len = m_rpc_plugin->HeadDecode(buff, buff_len, rpc_head);
186 if (m_code_type != kCODE_JSON) {
187 return len;
188 }
189
190 // json解码头部后再解出','
191 int read_len = 0;
192 if (len > 0 && len < static_cast<int>(buff_len)) {
193 read_len = dr::protocol::readElemSeparator(buff + len, buff_len - len);
194 }
195 //more code...
205 }
以上是decode头部信息,根据不同的code_type
来选择HeadDecode
方法,这里是kCODE_BINARY
,即ThriftRpcPlugin
,具体不再分析。然后根据head.m_message_type
来判断是什么操作。这种设计,就是当server(后面再分析)从网络底层收到原始字节流后,不做处理,交给上层由不同的协议去处理。进入ProcessRequest
后:
262 int32_t PebbleRpc::ProcessRequest(int64_t handle, const RpcHead& rpc_head,
263 const uint8_t* buff, uint32_t buff_len) {
264 return m_rpc_util->ProcessRequest(handle, rpc_head, buff, buff_len);
265 }
182 const uint8_t* buff, uint32_t buff_len) {
183 if (!m_coroutine_schedule) {
184 return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
185 }
186
187 if (m_coroutine_schedule->CurrentTaskId() != INVALID_CO_ID) {
188 return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
189 }
190
191 CommonCoroutineTask* task = m_coroutine_schedule->NewTask<CommonCoroutineTask>();
192 cxx::function<void(void)> run = cxx::bind(&RpcUtil::ProcessRequestInCoroutine, this,
193 handle, rpc_head, buff, buff_len);
194 task->Init(run);
195 task->Start();
196
197 return kRPC_SUCCESS;
198 }
以上判断跟协程相关的,创建task交给协程处理,最终还是调用ProcessRequestImp
这里直接以ProcessRequestImp
分析:
330 int32_t IRpc::ProcessRequestImp(int64_t handle, const RpcHead& rpc_head,
331 const uint8_t* buff, uint32_t buff_len) {
332
333 cxx::unordered_map<std::string, OnRpcRequest>::iterator it =
334 m_service_map.find(rpc_head.m_function_name);
335 if (m_service_map.end() == it) {
337 ResponseException(handle, kRPC_UNSUPPORT_FUNCTION_NAME, rpc_head);
338 RequestProcComplete(rpc_head.m_function_name, kRPC_UNSUPPORT_FUNCTION_NAME,
339 rpc_head.m_arrived_ms > 0 ? TimeUtility::GetCurrentMS() - rpc_head.m_arrived_ms : 0);
340 return kRPC_UNSUPPORT_FUNCTION_NAME;
341 }
342
343 if (kRPC_ONEWAY == rpc_head.m_message_type) {
344 cxx::function<int32_t(int32_t, const uint8_t*, uint32_t)> rsp; // NOLINT
345 int32_t ret = (it->second)(buff, buff_len, rsp);
346 RequestProcComplete(rpc_head.m_function_name, ret,
347 rpc_head.m_arrived_ms > 0 ? TimeUtility::GetCurrentMS() - rpc_head.m_arrived_ms : 0);
348 return ret;
349 }
351 // 请求处理也保持会话,方便扩展
352 cxx::shared_ptr<RpcSession> session(new RpcSession());
353 session->m_session_id = GenSessionId();
354 session->m_handle = handle;
355 session->m_rpc_head = rpc_head;
356 session->m_server_side = true;
357
358 TimeoutCallback cb = cxx::bind(&IRpc::OnTimeout, this, session->m_session_id);
359 session->m_timerid = m_timer->StartTimer(m_proc_req_timeout_ms, cb);
360 session->m_start_time = rpc_head.m_arrived_ms > 0 ? rpc_head.m_arrived_ms : TimeUtility::GetCurrentMS();
361
362 m_session_map[session->m_session_id] = session;
363
364 cxx::function<int32_t(int32_t, const uint8_t*, uint32_t)> rsp = cxx::bind( // NOLINT
365 &IRpc::SendResponse, this, session->m_session_id,
366 cxx::placeholders::_1, cxx::placeholders::_2, cxx::placeholders::_3);
367
368 return (it->second)(buff, buff_len, rsp);
369 }
当server收到rpc请求时,对function_name
进行判断,其实这里使用cmd整数也行,在初始化的时候建立映射关系。没有找到对应处理函数就返回异常并统计ResponseException
。然后再进行(it->second)(buff, buff_len, rsp)
并统计。这里会判断消息是否是kRPC_ONEWAY
单向消息,如果不是则要保持会话关系,并设置超时回调函数:
25 struct RpcSession {
26 private:
27 RpcSession(const RpcSession& rhs) {
28 m_session_id = rhs.m_session_id;
29 m_handle = rhs.m_handle;
30 m_timerid = rhs.m_timerid;
31 m_start_time = rhs.m_start_time;
32 m_server_side = rhs.m_server_side;
33 }
34 public:
43 uint64_t m_session_id;
44 int64_t m_handle;
45 int64_t m_timerid;
46 int64_t m_start_time;
47 RpcHead m_rpc_head;
48 bool m_server_side;
49 OnRpcResponse m_rsp;
50 };
到这儿,并没有真正处理消息,前面的逻辑都是根据head信息来作些检查和处理,并在可能的情况下提前返回。假如没超时,在回调完用户函数并回调rsp
时:
238 int32_t IRpc::SendResponse(uint64_t session_id, int32_t ret,
239 const uint8_t* buff, uint32_t buff_len) {
240
241 cxx::unordered_map< uint64_t, cxx::shared_ptr<RpcSession> >::iterator it =
242 m_session_map.find(session_id);
243 if (m_session_map.end() == it) {//会话找不到了
245 return kRPC_SESSION_NOT_FOUND;
246 }
247
248 m_timer->StopTimer(it->second->m_timerid);//停止定时器
249
250 int32_t result = kRPC_SUCCESS;
251 int32_t error_code = kRPC_SUCCESS;
252 if (kRPC_SUCCESS == ret) {
253 // 业务处理成功,构造响应消息返回
254 it->second->m_rpc_head.m_message_type = kRPC_REPLY;
255 result = SendMessage(it->second->m_handle, it->second->m_rpc_head, buff, buff_len);
256 error_code = result;
257 } else {
258 // 业务处理失败,构造异常消息携带错误信息返回
259 result = ResponseException(it->second->m_handle, ret, it->second->m_rpc_head, buff, buff_len);
260 error_code = ret;
261 }
262 RequestProcComplete(it->second->m_rpc_head.m_function_name,
263 error_code, TimeUtility::GetCurrentMS() - it->second->m_start_time);
264
265 m_session_map.erase(it);
266
267 return result;
268 }
上面的逻辑还是比较简单的,当客户端收到response后,进入ProcessResponse
:
371 int32_t IRpc::ProcessResponse(const RpcHead& rpc_head,
372 const uint8_t* buff, uint32_t buff_len) {
373
374 cxx::unordered_map< uint64_t, cxx::shared_ptr<RpcSession> >::iterator it =
375 m_session_map.find(rpc_head.m_session_id);
376 if (m_session_map.end() == it) {
379 return kRPC_SESSION_NOT_FOUND;
380 }
381
382 cxx::shared_ptr<RpcSession> session = it->second;
384 m_timer->StopTimer(session->m_timerid);
385
386 int ret = kRPC_SUCCESS;
387 const uint8_t* real_buff = buff;
388 uint32_t real_buff_len = buff_len;
389
390 RpcException exception;
391 if (kRPC_EXCEPTION == rpc_head.m_message_type) {
392 int32_t len = ExceptionDecode(buff, buff_len, &exception);
393 //more code...处理异常消息
403 }
405 if (session->m_rsp) {
406 ret = session->m_rsp(ret, real_buff, real_buff_len);//回调
407 }
408
409 //more code...统计相关
413 m_session_map.erase(rpc_head.m_session_id);
415 return ret;
416 }
下面一部分代码是跟协程发送和接收相关:
31 int32_t RpcUtil::SendRequestSync(int64_t handle,
32 const RpcHead& rpc_head,
33 const uint8_t* buff,
34 uint32_t buff_len,
35 const OnRpcResponse& on_rsp,
36 int32_t timeout_ms) {
37 if (!m_coroutine_schedule) {
39 return kRPC_UTIL_CO_SCHEDULE_IS_NULL;
40 }
41
42 if (m_coroutine_schedule->CurrentTaskId() == INVALID_CO_ID) {
44 return kRPC_UTIL_NOT_IN_COROUTINE;
45 }
46
47 int32_t ret = kRPC_SUCCESS;
49 SendRequestInCoroutine(handle, rpc_head, buff, buff_len, on_rsp, timeout_ms, &ret);
51 return ret;
52 }
54 void RpcUtil::SendRequestInCoroutine(int64_t handle,
55 const RpcHead& rpc_head,
56 const uint8_t* buff,
57 uint32_t buff_len,
58 const OnRpcResponse& on_rsp,
59 uint32_t timeout_ms,
60 int32_t* ret) {
61 int64_t co_id = m_coroutine_schedule->CurrentTaskId();
62 OnRpcResponse rsp = cxx::bind(&RpcUtil::OnResponse, this,
63 cxx::placeholders::_1, cxx::placeholders::_2, cxx::placeholders::_3, co_id);
64 *ret = m_rpc->SendRequest(handle, rpc_head, buff, buff_len, rsp, timeout_ms);
65 if (*ret != kRPC_SUCCESS) {
66 return;
67 }
68
69 m_coroutine_schedule->Yield(); //切出协程
71 *ret = on_rsp(m_result._ret, m_result._buff, m_result._buff_len);//resume回来后接着执行
72 }
153 int32_t RpcUtil::OnResponse(int32_t ret,
154 const uint8_t* buff,
155 uint32_t buff_len,
156 int64_t co_id) {
157 m_result._ret = ret;
158 m_result._buff = buff;
159 m_result._buff_len = buff_len;
160
161 m_coroutine_schedule->Resume(co_id);
162 return kRPC_SUCCESS;
163 }
168 int32_t IRpc::SendRequest(int64_t handle,
169 const RpcHead& rpc_head,
170 const uint8_t* buff,
171 uint32_t buff_len,
172 const OnRpcResponse& on_rsp,
173 int32_t timeout_ms) {
174 // buff允许为空,长度非0时做非空检查
175 if (buff_len != 0 && NULL == buff) {
177 return kRPC_INVALID_PARAM;
178 }
179
180 // 发送请求
181 int32_t ret = SendMessage(handle, rpc_head, buff, buff_len);
182 if (ret != kRPC_SUCCESS) {
183 ResponseProcComplete(rpc_head.m_function_name, kRPC_SEND_FAILED, 0);
184 return ret;
185 }
186
187 // ONEWAY请求
188 if (!on_rsp) {
189 ResponseProcComplete(rpc_head.m_function_name, kRPC_SUCCESS, 0);
190 return kRPC_SUCCESS;
191 }
193 // 保持会话
194 cxx::shared_ptr<RpcSession> session(new RpcSession());
195 session->m_session_id = rpc_head.m_session_id;
196 session->m_handle = handle;
197 session->m_rsp = on_rsp;
198 session->m_rpc_head = rpc_head;
199 session->m_server_side = false;
200 TimeoutCallback cb = cxx::bind(&IRpc::OnTimeout, this, session->m_session_id);
201
202 if (timeout_ms <= 0) {
203 timeout_ms = 10 * 1000;
204 }
205 session->m_timerid = m_timer->StartTimer(timeout_ms, cb);
206 session->m_start_time = TimeUtility::GetCurrentMS();
207
208 m_session_map[session->m_session_id] = session;
210 return kRPC_SUCCESS;
211 }
剩下的部分留着下一次分析。
因为由之前的文章和代码可知,底层网络实现是每次只接收一个完整的消息,直到上层消费掉,这样会有一些问题。由于网络部分和业务部分是在一个进程中,虽然是多协程,很大可能会有性能问题。又比如redis也是单线程的,话说性能也挺好的。再比如skynet框架,底层多线程,业务层多lua虚拟机多协程,虽然在push/pull消息时会有一定的锁,粒度也很小,且获取消息后会把消息队列从全局队列中移走。发生竞争时只在全局一级队列的时候,一般性能在发生竞争时处于自旋并同步cacheline时。可是我总觉得,把网络进程和业务进程分开,独立部署会好些,也起到缓存作用,互不影响。
2019.6.2 看了下剩下的代码,后面可能再写一篇关于zookeeper的实现分析,这块我不是很清楚需要多看看。Pebble中其他的组件实现比较简单,都是些基本知识,不再单独分析。
网友评论