美文网首页
Pebble分布式框架浅析

Pebble分布式框架浅析

作者: fooboo | 来源:发表于2019-05-26 17:15 被阅读0次

因为早些时候分析过Pebble的部分实现Pebble协程库实现Pebble网络通信实现(上),后来看到关注的公众平台有三篇原创,之前仅仅是收藏并没有过多的时间去阅读。通读下来还是挺有意思的,所以在周末花点时间又去分析下相关的设计实现。题外话,2018年初的时候,问过框架的作者,该项目不再维护和更新,且用在项目上的情况不多,可能会有坑,仅用于学习。

这三篇链接分别是:
教你写游戏服务器框架(一)
教你写游戏服务器框架(二)
教你写游戏服务器框架(三)

因为该框架是基本用于游戏的,因为之前的工作经历虽然接触过相关的框架,但还有些区别。包括开源的skynet框架,用在游戏上的情况较多些,也是有不同的设计,多学习和比较,吸取经验。

协程相关的和网络相关的就不多说,可以参考之前的分析,这一篇通过example下的hellow demo来简单分析下Pebble工作过程,其他细节会捡一些重要的来说明,其中包括重要的类声明和成员变量/接口。另外,该框架是单线程的,所以看不到使用锁的地方。

在pebble_client.h中创建PebbleClient实例,并初始化,部分声明如下:

79 class PebbleClient {
 84 public:
 88     int32_t Init();
 96     template<class RPC_CLIENT>
 97     RPC_CLIENT* NewRpcClientByAddress(const std::string& service_address, ProtocolType protocol_type = kPEBBLE_RPC_BINARY);
179 private:
180     Options            m_options;
181     CoroutineSchedule* m_coroutine_schedule;
182     Naming*            m_naming_array[kNAMING_BUTT];
183     IProcessor*        m_processor_array[kPROTOCOL_TYPE_BUTT];
184     IEventHandler*     m_rpc_event_handler;
186     Timer*             m_timer;//定时器
187     uint32_t           m_stat_timer_ms; // 资源使用采样定时器,供统计用
188     SessionMgr*        m_session_mgr;//会话管理器
190     cxx::unordered_map<int64_t, IProcessor*> m_processor_map;
191     cxx::unordered_map<std::string, Router*> m_router_map;
192     cxx::unordered_map<Router*, std::vector<int64_t> > m_router_handle_map;
193 };

其中,构造和析构要做的事情比较简单,分别是初始化各指针为空和delete操作,不再这里贴代码。初始化操作部分实现如下:

 90 int32_t PebbleClient::Init() {
103     int32_t ret = InitTimer();//定时管理实例,统计作用
104     CHECK_RETURN(ret);
105 
106     ret = InitCoSchedule();//创建协程调度实例,协程处理
107     CHECK_RETURN(ret);
108 
112     ret = Message::Init();//初始化消息通信[参考Pebble网络通信实现(上)]
113     CHECK_RETURN(ret);
114 
115     signal(SIGPIPE, SIG_IGN);//防止收到该信号默认终止进程
117     return 0;
118 }

354 int32_t PebbleClient::InitTimer() {//初始化定时器
355     if (!m_timer) {
356         m_timer = new SequenceTimer();//实现比较简单,就不贴代码咯
357     }
358     
359     TimeoutCallback on_stat_timeout = cxx::bind(&PebbleClient::OnStatTimeout, this);//bind统计资源功能
360     int64_t ret = m_timer->StartTimer(m_stat_timer_ms, on_stat_timeout);//设置定时器回调函数
361     //more code...
367 }

其中网络相关的实现在之前文章中分析过,由于作者实现的时候就注释了性能不好,只是用于测试,实际线上的应该使用类似tbus之类的组件。

接着创建client rpc stub实例:

 48     example::HelloWorldClient* hello_client =
 49         client.NewRpcClientByAddress<example::HelloWorldClient>("tcp://127.0.0.1:8888");

197 template<class RPC_CLIENT>
198 RPC_CLIENT* PebbleClient::NewRpcClientByAddress(const std::string& service_address,
199     ProtocolType protocol_type) {
200     PebbleRpc* pebble_rpc = GetPebbleRpc(protocol_type);
201     if (pebble_rpc == NULL) {
203         return NULL;
204     }
205     int64_t handle = Connect(service_address);
206     if (handle < 0) {
209         return NULL;
210     }
211     if (Attach(handle, pebble_rpc) != 0) {
214         Close(handle);
215         return NULL;
216     }
217     RPC_CLIENT* client = new RPC_CLIENT(pebble_rpc);
218     client->SetHandle(handle);
219     return client;
220 }

默认使用kPEBBLE_RPC_BINARY(thrift binary编码协议),分析过程中以他来说明。

153 PebbleRpc* PebbleClient::GetPebbleRpc(ProtocolType protocol_type) {
154     if (protocol_type < kPEBBLE_RPC_BINARY || protocol_type > kPEBBLE_RPC_PROTOBUF) {
156         return NULL;
157     }
158 
159     if (m_processor_array[protocol_type] != NULL) {
160         return dynamic_cast<PebbleRpc*>(m_processor_array[protocol_type]);
161     }
162 
163     if (!m_rpc_event_handler) {
164         m_rpc_event_handler = new RpcEventHandler();
165         static_cast<RpcEventHandler*>(m_rpc_event_handler)->Init(m_stat_manager);
166     }
187     PebbleRpc* rpc_instance = new PebbleRpc(kCODE_BINARY, m_coroutine_schedule);
188     rpc_instance->SetSendFunction(Message::Send, Message::SendV);
189     rpc_instance->SetEventHandler(m_rpc_event_handler);
190     m_processor_array[kPEBBLE_RPC_BINARY] = rpc_instance;
191 
192     return rpc_instance;
193 }

以上会根据协议查找对应的PebbleRpc实例,继承关系如下:
class IProcessor<--class IRpc<--class PebbleRpc,而class IEventHandler如注释描述那般:“IEventHandler是对Processor通用处理的抽象,和Processor配套使用,一般建议Processor本身处理和协议、业务强相关的逻辑,针对消息的一些通用处理由EventHandler完成。”。我看用到的地方不多,且跟stat统计有关系,可能不是那么关键(因为有定时任务回调统计相关的接口),后面再看到其他使用它的时候,再详细介绍下。

PebbleRpc构造实例的过程中会根据code_type实例化encode/decode对象,部分代码如下:

 27 PebbleRpc::PebbleRpc(CodeType code_type, CoroutineSchedule* coroutine_schedule) {
 28     m_rpc_util = new RpcUtil(this, coroutine_schedule); 
 29     m_rpc_plugin = NULL;
 30     m_code_type = code_type;
 31     switch (m_code_type) {
 32         case kCODE_BINARY:
 33         case kCODE_JSON:
 34             m_rpc_plugin = new ThriftRpcPlugin(this);
 35             break;
 36             //more code...
 41     }
 48 }

 23 class RpcPlugin {
 24 public:
 25     virtual ~RpcPlugin() {}
 27     virtual int32_t HeadEncode(const RpcHead& rpc_head, uint8_t* buff, uint32_t buff_len) = 0;
 29     virtual int32_t HeadDecode(const uint8_t* buff, uint32_t buff_len, RpcHead* rpc_head) = 0;
 30 };
 31 
 32 class ThriftRpcPlugin : public RpcPlugin {
 33 public:
 34     ThriftRpcPlugin(PebbleRpc* pebble_rpc) : m_pebble_rpc(pebble_rpc) {}
 35     virtual ~ThriftRpcPlugin() {}
 36 
 37     virtual int32_t HeadEncode(const RpcHead& rpc_head, uint8_t* buff, uint32_t buff_len);
 39     virtual int32_t HeadDecode(const uint8_t* buff, uint32_t buff_len, RpcHead* rpc_head);
 41 private:
 42     PebbleRpc* m_pebble_rpc;
 43 };

后面会根据具体的例子来说明这些组件的作用和具体的实现。接着进行PebbleClient::Connect连接到server,返回回来的是个uint64_t handle,由NetIO::AllocNetAddr分配,表示一个连接,屏蔽底层网络细节,这些网络的实现细节可以参考之前的文章。

接着把handle跟rpc对象绑定:

132 int32_t PebbleClient::Attach(int64_t handle, IProcessor* processor) {
133     if (NULL == processor) {
135         return -1;     
136     }
137     
138     m_processor_map[handle] = processor;
139     return 0;
140 }

接着一个大循环:

 56     do {
 57         client.Update();
 58     
 59         // 异步调用
 60         cxx::function<void(int, const std::string&)> cb =
 61             cxx::bind(cb_hello, cxx::placeholders::_1, cxx::placeholders::_2);
 62         hello_client->hello("hello world!", cb);
 63     
 64         usleep(10000);
 65     } while (true);

240 int32_t PebbleClient::Update() {
241     int32_t num = 0;
242     
245     for (uint32_t i = 0; i < m_options._max_msg_num_per_loop; ++i) {
246         if (ProcessMessage() <= 0) {
247             break;
248         }   
249         num++;
250     }   
251     
252     for (int32_t i = 0; i < kNAMING_BUTT; ++i) {
253         if (m_naming_array[i]) {
254             num += m_naming_array[i]->Update();
255         }   
256     }   
257     
258     for (int32_t i = 0; i < kPROTOCOL_TYPE_BUTT; ++i) {
259         if (m_processor_array[i]) {
260             m_processor_array[i]->Update();
261         }   
262     }   
263     
264     if (m_timer) {
265         num += m_timer->Update();
266     }   
267     
268     if (m_session_mgr) {
269         num += m_session_mgr->CheckTimeout();
270     }   
271     
277     return num;
278 }

280 int32_t PebbleClient::ProcessMessage() {
281     int64_t handle = -1;
282     int32_t event  = 0;
283     int32_t ret = Message::Poll(&handle, &event, 0);//epoll
284     if (ret != 0) {
285         return 0;
286     }
287                       
288     const uint8_t* msg = NULL;
289     uint32_t msg_len   = 0;
290     ret = Message::Peek(handle, &msg, &msg_len, &m_last_msg_info);//查看是否完整的包connection->PeekMsg(msg, msg_len)
291     do {
299         //more code...
300         cxx::unordered_map<int64_t, IProcessor*>::iterator it = m_processor_map.find(m_last_msg_info._self_handle);//根据handle找到对应的"IProcessor处理器"
301         if (m_processor_map.end() == it) {
302             Message::Pop(handle);//弹出handle对应连接的消息
304             break;
305         }
306     
307         it->second->OnMessage(m_last_msg_info._remote_handle, msg, msg_len, &m_last_msg_info, 0);//具体的消息处理
308         Message::Pop(handle);
309     } while (0);
310 
311     return 1;
312 }
 75 int32_t IRpc::OnMessage(int64_t handle, const uint8_t* msg,
 76     uint32_t msg_len, const MsgExternInfo* msg_info, uint32_t is_overload) {
 77     //more code...
 83     RpcHead head;
 84 
 85     int32_t head_len = HeadDecode(msg, msg_len, &head);
 86     //more code..
 96     if (msg_info) {
 97         head.m_arrived_ms = msg_info->_msg_arrived_ms;
 98         head.m_dst = (IProcessor*)(msg_info->_src);
 99     }
100     const uint8_t* data = msg + head_len;
101     uint32_t data_len   = msg_len - head_len;
102 
103     int32_t ret = kRPC_UNKNOWN_TYPE;
104     switch (head.m_message_type) {
105         case kRPC_CALL:
106             if (is_overload != 0) {
107                 ret = ResponseException(handle, kRPC_SYSTEM_OVERLOAD_BASE - is_overload, head);
108                 RequestProcComplete(head.m_function_name, kRPC_SYSTEM_OVERLOAD_BASE - is_overload,
109                     head.m_arrived_ms > 0 ? TimeUtility::GetCurrentMS() - head.m_arrived_ms : 0);
110                 break;
111             }
112         case kRPC_ONEWAY:
113             ret = ProcessRequest(handle, head, data, data_len);
114             break;
115 
116         case kRPC_REPLY:
117         case kRPC_EXCEPTION:
118             ret = ProcessResponse(head, data, data_len);
119             break;
120 
121         default:
123             break;
124     }
125 
126     return ret;
127 }

183 int32_t PebbleRpc::HeadDecode(const uint8_t* buff, uint32_t buff_len, RpcHead* rpc_head) {
184     if (m_rpc_plugin) {
185         int len = m_rpc_plugin->HeadDecode(buff, buff_len, rpc_head);
186         if (m_code_type != kCODE_JSON) {
187             return len;
188         }
189 
190         // json解码头部后再解出','
191         int read_len = 0;
192         if (len > 0 && len < static_cast<int>(buff_len)) {
193             read_len = dr::protocol::readElemSeparator(buff + len, buff_len - len);
194         }
195         //more code...
205 }

以上是decode头部信息,根据不同的code_type来选择HeadDecode方法,这里是kCODE_BINARY,即ThriftRpcPlugin,具体不再分析。然后根据head.m_message_type来判断是什么操作。这种设计,就是当server(后面再分析)从网络底层收到原始字节流后,不做处理,交给上层由不同的协议去处理。进入ProcessRequest后:

262 int32_t PebbleRpc::ProcessRequest(int64_t handle, const RpcHead& rpc_head,
263     const uint8_t* buff, uint32_t buff_len) {
264     return m_rpc_util->ProcessRequest(handle, rpc_head, buff, buff_len);
265 }

182     const uint8_t* buff, uint32_t buff_len) {
183     if (!m_coroutine_schedule) {
184         return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
185     }
186     
187     if (m_coroutine_schedule->CurrentTaskId() != INVALID_CO_ID) {
188         return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
189     }
190 
191     CommonCoroutineTask* task = m_coroutine_schedule->NewTask<CommonCoroutineTask>();
192     cxx::function<void(void)> run = cxx::bind(&RpcUtil::ProcessRequestInCoroutine, this,
193         handle, rpc_head, buff, buff_len);
194     task->Init(run);
195     task->Start();
196     
197     return kRPC_SUCCESS;
198 }

以上判断跟协程相关的,创建task交给协程处理,最终还是调用ProcessRequestImp这里直接以ProcessRequestImp分析:

330 int32_t IRpc::ProcessRequestImp(int64_t handle, const RpcHead& rpc_head,
331     const uint8_t* buff, uint32_t buff_len) {
332     
333     cxx::unordered_map<std::string, OnRpcRequest>::iterator it =
334         m_service_map.find(rpc_head.m_function_name);
335     if (m_service_map.end() == it) {
337         ResponseException(handle, kRPC_UNSUPPORT_FUNCTION_NAME, rpc_head);
338         RequestProcComplete(rpc_head.m_function_name, kRPC_UNSUPPORT_FUNCTION_NAME,
339             rpc_head.m_arrived_ms > 0 ? TimeUtility::GetCurrentMS() - rpc_head.m_arrived_ms : 0);
340         return kRPC_UNSUPPORT_FUNCTION_NAME;
341     }   
342     
343     if (kRPC_ONEWAY == rpc_head.m_message_type) {
344         cxx::function<int32_t(int32_t, const uint8_t*, uint32_t)> rsp; // NOLINT
345         int32_t ret = (it->second)(buff, buff_len, rsp);
346         RequestProcComplete(rpc_head.m_function_name, ret,
347             rpc_head.m_arrived_ms > 0 ? TimeUtility::GetCurrentMS() - rpc_head.m_arrived_ms : 0);
348         return ret;
349     }
351     // 请求处理也保持会话,方便扩展
352     cxx::shared_ptr<RpcSession> session(new RpcSession());
353     session->m_session_id  = GenSessionId();
354     session->m_handle      = handle;
355     session->m_rpc_head    = rpc_head;
356     session->m_server_side = true;
357     
358     TimeoutCallback cb     = cxx::bind(&IRpc::OnTimeout, this, session->m_session_id);
359     session->m_timerid     = m_timer->StartTimer(m_proc_req_timeout_ms, cb);
360     session->m_start_time  = rpc_head.m_arrived_ms > 0 ? rpc_head.m_arrived_ms : TimeUtility::GetCurrentMS();
361     
362     m_session_map[session->m_session_id] = session;
363     
364     cxx::function<int32_t(int32_t, const uint8_t*, uint32_t)> rsp = cxx::bind( // NOLINT
365         &IRpc::SendResponse, this, session->m_session_id,
366         cxx::placeholders::_1, cxx::placeholders::_2, cxx::placeholders::_3);
367         
368     return (it->second)(buff, buff_len, rsp);
369 }

当server收到rpc请求时,对function_name进行判断,其实这里使用cmd整数也行,在初始化的时候建立映射关系。没有找到对应处理函数就返回异常并统计ResponseException。然后再进行(it->second)(buff, buff_len, rsp)并统计。这里会判断消息是否是kRPC_ONEWAY单向消息,如果不是则要保持会话关系,并设置超时回调函数:

 25 struct RpcSession {
 26 private:
 27     RpcSession(const RpcSession& rhs) {
 28         m_session_id  = rhs.m_session_id;
 29         m_handle      = rhs.m_handle;
 30         m_timerid     = rhs.m_timerid;
 31         m_start_time  = rhs.m_start_time;
 32         m_server_side = rhs.m_server_side;
 33     }
 34 public:
 43     uint64_t m_session_id;
 44     int64_t  m_handle;
 45     int64_t  m_timerid; 
 46     int64_t  m_start_time;
 47     RpcHead  m_rpc_head;
 48     bool     m_server_side;
 49     OnRpcResponse m_rsp;
 50 };

到这儿,并没有真正处理消息,前面的逻辑都是根据head信息来作些检查和处理,并在可能的情况下提前返回。假如没超时,在回调完用户函数并回调rsp时:

238 int32_t IRpc::SendResponse(uint64_t session_id, int32_t ret,
239     const uint8_t* buff, uint32_t buff_len) {
240     
241     cxx::unordered_map< uint64_t, cxx::shared_ptr<RpcSession> >::iterator it =
242         m_session_map.find(session_id);
243     if (m_session_map.end() == it) {//会话找不到了
245         return kRPC_SESSION_NOT_FOUND;
246     }   
247 
248     m_timer->StopTimer(it->second->m_timerid);//停止定时器
249 
250     int32_t result = kRPC_SUCCESS;
251     int32_t error_code = kRPC_SUCCESS;
252     if (kRPC_SUCCESS == ret) {
253         // 业务处理成功,构造响应消息返回
254         it->second->m_rpc_head.m_message_type = kRPC_REPLY;
255         result = SendMessage(it->second->m_handle, it->second->m_rpc_head, buff, buff_len);
256         error_code = result;
257     } else {
258         // 业务处理失败,构造异常消息携带错误信息返回
259         result = ResponseException(it->second->m_handle, ret, it->second->m_rpc_head, buff, buff_len);
260         error_code = ret;
261     }
262     RequestProcComplete(it->second->m_rpc_head.m_function_name,
263         error_code, TimeUtility::GetCurrentMS() - it->second->m_start_time);
264 
265     m_session_map.erase(it);
266 
267     return result;
268 }

上面的逻辑还是比较简单的,当客户端收到response后,进入ProcessResponse

371 int32_t IRpc::ProcessResponse(const RpcHead& rpc_head,
372     const uint8_t* buff, uint32_t buff_len) {
373 
374     cxx::unordered_map< uint64_t, cxx::shared_ptr<RpcSession> >::iterator it =
375         m_session_map.find(rpc_head.m_session_id);
376     if (m_session_map.end() == it) {
379         return kRPC_SESSION_NOT_FOUND;
380     }
381 
382     cxx::shared_ptr<RpcSession> session = it->second;
384     m_timer->StopTimer(session->m_timerid);
385 
386     int ret = kRPC_SUCCESS;
387     const uint8_t* real_buff = buff;
388     uint32_t real_buff_len = buff_len;
389 
390     RpcException exception;
391     if (kRPC_EXCEPTION == rpc_head.m_message_type) {
392         int32_t len = ExceptionDecode(buff, buff_len, &exception);
393         //more code...处理异常消息
403     }
405     if (session->m_rsp) {
406         ret = session->m_rsp(ret, real_buff, real_buff_len);//回调
407     }
408 
409     //more code...统计相关
413     m_session_map.erase(rpc_head.m_session_id);
415     return ret;
416 }

下面一部分代码是跟协程发送和接收相关:

 31 int32_t RpcUtil::SendRequestSync(int64_t handle,
 32                     const RpcHead& rpc_head,
 33                     const uint8_t* buff,
 34                     uint32_t buff_len,
 35                     const OnRpcResponse& on_rsp,
 36                     int32_t timeout_ms) {
 37     if (!m_coroutine_schedule) {
 39         return kRPC_UTIL_CO_SCHEDULE_IS_NULL;
 40     }
 41 
 42     if (m_coroutine_schedule->CurrentTaskId() == INVALID_CO_ID) {
 44         return kRPC_UTIL_NOT_IN_COROUTINE;
 45     }
 46 
 47     int32_t ret = kRPC_SUCCESS;
 49     SendRequestInCoroutine(handle, rpc_head, buff, buff_len, on_rsp, timeout_ms, &ret);
 51     return ret;
 52 }

 54 void RpcUtil::SendRequestInCoroutine(int64_t handle,
 55                     const RpcHead& rpc_head,
 56                     const uint8_t* buff,
 57                     uint32_t buff_len,
 58                     const OnRpcResponse& on_rsp,
 59                     uint32_t timeout_ms,
 60                     int32_t* ret) {
 61     int64_t co_id = m_coroutine_schedule->CurrentTaskId();
 62     OnRpcResponse rsp = cxx::bind(&RpcUtil::OnResponse, this,
 63         cxx::placeholders::_1, cxx::placeholders::_2, cxx::placeholders::_3, co_id);
 64     *ret = m_rpc->SendRequest(handle, rpc_head, buff, buff_len, rsp, timeout_ms);
 65     if (*ret != kRPC_SUCCESS) {
 66         return;
 67     }
 68 
 69     m_coroutine_schedule->Yield(); //切出协程
 71     *ret = on_rsp(m_result._ret, m_result._buff, m_result._buff_len);//resume回来后接着执行
 72 }

153 int32_t RpcUtil::OnResponse(int32_t ret,
154                             const uint8_t* buff,
155                             uint32_t buff_len,
156                             int64_t co_id) {
157     m_result._ret      = ret;
158     m_result._buff     = buff;
159     m_result._buff_len = buff_len;
160 
161     m_coroutine_schedule->Resume(co_id);
162     return kRPC_SUCCESS;
163 }

168 int32_t IRpc::SendRequest(int64_t handle,
169                     const RpcHead& rpc_head,
170                     const uint8_t* buff,
171                     uint32_t buff_len,
172                     const OnRpcResponse& on_rsp,
173                     int32_t timeout_ms) {
174     // buff允许为空,长度非0时做非空检查
175     if (buff_len != 0 && NULL == buff) {
177         return kRPC_INVALID_PARAM;
178     }
179 
180     // 发送请求
181     int32_t ret = SendMessage(handle, rpc_head, buff, buff_len);
182     if (ret != kRPC_SUCCESS) {
183         ResponseProcComplete(rpc_head.m_function_name, kRPC_SEND_FAILED, 0);
184         return ret;
185     }
186     
187     // ONEWAY请求
188     if (!on_rsp) {
189         ResponseProcComplete(rpc_head.m_function_name, kRPC_SUCCESS, 0);
190         return kRPC_SUCCESS;
191     }
193     // 保持会话
194     cxx::shared_ptr<RpcSession> session(new RpcSession());
195     session->m_session_id  = rpc_head.m_session_id;
196     session->m_handle      = handle;
197     session->m_rsp         = on_rsp;
198     session->m_rpc_head    = rpc_head;
199     session->m_server_side = false;
200     TimeoutCallback cb     = cxx::bind(&IRpc::OnTimeout, this, session->m_session_id);
201     
202     if (timeout_ms <= 0) {
203         timeout_ms = 10 * 1000;
204     }   
205     session->m_timerid     = m_timer->StartTimer(timeout_ms, cb);
206     session->m_start_time  = TimeUtility::GetCurrentMS();
207     
208     m_session_map[session->m_session_id] = session;
210     return kRPC_SUCCESS;
211 }

剩下的部分留着下一次分析。

因为由之前的文章和代码可知,底层网络实现是每次只接收一个完整的消息,直到上层消费掉,这样会有一些问题。由于网络部分和业务部分是在一个进程中,虽然是多协程,很大可能会有性能问题。又比如redis也是单线程的,话说性能也挺好的。再比如skynet框架,底层多线程,业务层多lua虚拟机多协程,虽然在push/pull消息时会有一定的锁,粒度也很小,且获取消息后会把消息队列从全局队列中移走。发生竞争时只在全局一级队列的时候,一般性能在发生竞争时处于自旋并同步cacheline时。可是我总觉得,把网络进程和业务进程分开,独立部署会好些,也起到缓存作用,互不影响。

2019.6.2 看了下剩下的代码,后面可能再写一篇关于zookeeper的实现分析,这块我不是很清楚需要多看看。Pebble中其他的组件实现比较简单,都是些基本知识,不再单独分析。

参考
腾讯互娱开源分布式开发框架 Pebble

相关文章

网友评论

      本文标题:Pebble分布式框架浅析

      本文链接:https://www.haomeiwen.com/subject/htbazqtx.html