今天记录下关于过载时,该如何应对,仅分享下自己平时工作中遇到的情况,其他业务场景的处理方案可能因架构不同有不一样,但问题表现都是一样的。这里不考虑因什么缓存大面积失效引起的问题,也不考虑单点等类似潜在问题,不考虑有上下游后台server的情况。仅是请求处理不过来,单台server的情况,导致连锁反映。
之前记录过线上问题:
单点问题
一次排查OOM的总结
谈谈缓存穿透雪崩和过载保护以及一致性等问题
关于skynet框架过载的几篇:
skynet 消息分发及服务调度的新设计
skynet 消息队列的新设计(接上文)
skynet 服务的过载保护
代理服务和过载保护
之前使用的开源skynet框架,底层是多线程,一个网络线程处理收发消息,并根据目的地址转发到对应服务的消息队列中,服务的消息队列由工作线程处理,消息队列是无长度限制的,这里就埋了隐患。
因为框架是没问题的,但使用方式不正确,会导致严重的问题,如标题说的过载,消息处理不过来,内存快速下降,导致可能的oom问题等,进而导致玩家在游戏中的体验卡,流失以及补偿,数据回档等这些。
框架本身性能不错,对于多线程处理消息,工作线程仅仅从全局一级队列,加锁获取到对应的服务消息队列,把它摘下来,这里仅几个指针的设置,临界区很小的,后面不再在这里参与全局竞争。在工作线程从队列中pop一条消息时,这里加锁并解锁,也设置几个指针,之后处理消息。如果此时有其他的服务(工作线程)给这个服务队列发消息,需要竞争一次,加锁,push消息并解锁,但基本是无竞争的,这样处理一条消息,一共两次加锁加一次可能额外的加锁(发消息给其他服务),源码如下:
58 struct message_queue *
59 skynet_globalmq_pop() {
60 struct global_queue *q = Q;
61
62 SPIN_LOCK(q)
63 struct message_queue *mq = q->head;
64 if(mq) {
65 q->head = mq->next;
66 if(q->head == NULL) {
67 assert(mq == q->tail);
68 q->tail = NULL;
69 }
70 mq->next = NULL;
71 }
72 SPIN_UNLOCK(q)
73
74 return mq;
75 }
从全局一级队列中获得一个服务的消息队列
137 int
138 skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
139 int ret = 1;
140 SPIN_LOCK(q)
141
142 if (q->head != q->tail) {
143 *message = q->queue[q->head++];
144 ret = 0;
145 int head = q->head;
146 int tail = q->tail;
147 int cap = q->cap;
148
149 if (head >= cap) {
150 q->head = head = 0;
151 }
152 int length = tail - head;
153 if (length < 0) {
154 length += cap;
155 }
156 while (length > q->overload_threshold) {
157 q->overload = length;
158 q->overload_threshold *= 2;
159 }
160 } else {
161 // reset overload_threshold when queue is empty
162 q->overload_threshold = MQ_OVERLOAD;
163 }
164
165 if (ret) {
166 q->in_global = 0;//空队列从全局队列中移走
167 }
168
169 SPIN_UNLOCK(q)
170
171 return ret;
172 }
从服务队列中pop一条消息并解锁。
189 void
190 skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
191 assert(message);
192 SPIN_LOCK(q)
193
194 q->queue[q->tail] = *message;
195 if (++ q->tail >= q->cap) {
196 q->tail = 0;
197 }
198
199 if (q->head == q->tail) {
200 expand_queue(q);//满队列尝试扩容
201 }
202
203 if (q->in_global == 0) {
204 q->in_global = MQ_IN_GLOBAL;
205 skynet_globalmq_push(q);//有消息再挂到全局队列上
206 }
207
208 SPIN_UNLOCK(q)
209 }
给服务队列push消息,即若多个工作线程同时处理一条消息队列可能会有非常短暂的竞争下,但基本是很少竞争的。
以上几次都是自旋锁,自旋锁的实现及优化之前也分析过,没竞争的时候非常快。
如果工作线程处理一条消息的时候,这里会回调callback函数,进而走到上层业务具体实现,后者如果实现有问题并且延迟过大,比如常见的for循环,时间复杂度高的查找算法等有问题。
处理一条消息过久的话,而该服务收到很多类似广播消息,可能导致队列扩容,源码中每次以2的倍数,并移动到新队列,即消费者速度远远落后于生产者,结构如下:
7 struct skynet_message {
8 uint32_t source;
9 int session;
10 void * data;
11 size_t sz;
12 };
虽然直接赋值,但这里并没有队列长度的限制,那么当达万级数量以上时,不管从cpu还是内存,以及延迟都有问题,客户端没有响应,造成可能多次的重传,这些重传的请求再次注定超时,最后走断线重连等处理。若上层业务对于修改数据的重传请求处理不正确,没有幂等也有问题。
这里并不像其他业务场景,当某台server没有响应时,可能会根据某种负载均衡算法到其他server。游戏中单服管理着同时在线几千的玩家,所有的请求都到这台server来,进而进入队列,处理不过来,造成队列消息越来越多,工作线程调度不到其他消息队列或者慢很多。因为处理callback时花的时间过长。
当然这些根据实现和设计有关,个人感觉消息队列中的消息能尽快完成,或者根据上层实现来选择更好的方案,若单次时间过久则直接yield让出cpu让工作线程处理其他消息。可能感觉一次这么操作没什么问题,但多次和量级上来后就有明显问题,而这需要压力测试和覆盖测试到的路径,否则等线上出问题就来不急。
因为线上处理过几次,其中一次很严重,其他几次因为后续的某些实现调整解决了,基本没有什么问题。严重的一次是框架设计存在可优化的地方。skynet底层不支持处理负载的问题,不支持消息超时丢弃,只管收到消息转发并回调上层的业务函数。当然,不建议直接修改底层源码,可以额外写个服务(可以多个,根据某个规则hash到不同的转发队列)去对收到的消息加个时间,然后如果消息在队列中过久则判断下,是否转发还是丢。这里只是加了中转一次,基本没啥消耗,但大概率能间接解决掉内存过多问题,可是本质上层业务逻辑实现问题并没有解决,也不可控。
这里贴一下消息的转发实现,一条消息在各服务的消息队列中转发,不涉及额外的分配内存及拷贝,还是原来的指针:
790 void
791 skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
792 context->cb = cb;
793 context->cb_ud = ud;
794 }
101 static int
102 forward_cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, c onst void * msg, size_t sz) {
103 _cb(context, ud, type, session, source, msg, sz);
104 // don't delete msg in forward mode.
105 return 1;
106 }
107
108 static int
109 lcallback(lua_State *L) {
110 struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
111 int forward = lua_toboolean(L, 2);
112 luaL_checktype(L,1,LUA_TFUNCTION);
113 lua_settop(L,1);
114 lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
115
116 lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
117 lua_State *gL = lua_tothread(L,-1);
118
119 if (forward) {
120 skynet_callback(context, gL, forward_cb);
121 } else {
122 skynet_callback(context, gL, _cb);
123 }
124
125 return 0;
126 }
//dispatch_message是lua层的消息分发函数,由_cb中调用
54 function skynet.forward_type(map, start_func)
55 c.callback(function(ptype, msg, sz, ...)
56 local prototype = map[ptype]
57 if prototype then
58 dispatch_message(prototype, msg, sz, ...)
59 else
60 local ok, err = pcall(dispatch_message, ptype, msg, sz, ...)
61 c.trash(msg, sz)
62 if not ok then
63 error(err)
64 end
65 end
66 end, true)
67 skynet.timeout(0, function()
68 skynet.init_service(start_func)
69 end)
70 end
55 static int
56 _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const vo id * msg, size_t sz) {
57 lua_State *L = ud;
58 int trace = 1;
59 int r;
60 int top = lua_gettop(L);
61 if (top == 0) {
62 lua_pushcfunction(L, traceback);
63 lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
64 } else {
65 assert(top == 2);
66 }
67 lua_pushvalue(L,2);
68
69 lua_pushinteger(L, type);
70 lua_pushlightuserdata(L, (void *)msg);
71 lua_pushinteger(L,sz);
72 lua_pushinteger(L, session);
73 lua_pushinteger(L, source);
74
75 r = lua_pcall(L, 5, 0 , trace);//dispatch_message
76
77 if (r == LUA_OK) {
78 return 0;
79 }
80 //error...
99 }
//dispatch_message由工作线程调用的c层的函数
260 static void
261 dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
262 assert(ctx->init);
263 CHECKCALLING_BEGIN(ctx)
264 pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
265 int type = msg->sz >> MESSAGE_TYPE_SHIFT;
266 size_t sz = msg->sz & MESSAGE_TYPE_MASK;
267 if (ctx->logfile) {
268 //
269 }
270 ++ctx->message_count;
271 int reserve_msg;
272 if (ctx->profile) {
273 //
277 } else {
278 reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
279 }
280 if (!reserve_msg) {
281 skynet_free(msg->data);//这里视cb返回值要不要释放消息内存
282 }
283 CHECKCALLING_END(ctx)
284 }
因为这些消息都是内部产生的,所以一定程度上是可以处理的,即给对应服务发一条消息后,返回给我它当前队列消息的长度,以此决定发的服务后续行为,当然这个长度是有一定的更新延迟,后来还有多少是否恢复一个合理的水平并不清楚。要从根本上解决队列中有过多铁定超时的消息,即不给它发才行,因为处理不过来,发了也白发,所以加个中转还是可行的。早些时间写的lua分析工具,能算出每条消息在队列中的时间,处理的时间,也可以根据这个来在上层判断要不要处理,当然这个做法也比较保守。总结下就是一种保证队列中消息数量合理,再多的消息肯定处理不过来,发过来也没用,另一种是可以一直收消息,但上层判断该消息在队列中的时间,是否是超时的,去决定该消息是否要丢。
理想情况下,处理消息很快返回,让工作线程去调度其他消息队列,但理想总是美好的,在实现需求和功能时,可以先思考下怎么实现和这么实现后可能引起的问题,尽量简单化,模块化,后期可以在压测时发现热点时,替换实现引起的改动少一些。
所以,如标题,其实针对不同的业务场景实现方法不同。主要考虑几个点:一是如何从源头避免当请求的服务过载时的反应,是重试还是如何?二是服务端在过载时如何优雅的拒绝后面的请求?三是过载时如何快速恢复正常服务?四是框架是否有可改进和优化的地方?五是如何检查到当前状态已经处于负载状态?六是...
在上面的连接中,云风大佬给的思路:
“我建议用一个简单的模式来解决这种过载之后的性能进一步恶化的问题:
对于这种热点服务,对于所有请求( 也只可以针对不能在 O(1) 时间处理的请求 )都在回应协议中增加一个 boolean ,表示服务繁忙。请求方必须处理这个繁忙标记。
服务提供方在收到每个请求时,先不急于处理,而是通过 skynet.mqlen 检查当前的消息队列长度,来判定是否繁忙。(一个小优化是,可以不用每个消息都取 mqlen 判定繁忙,而让繁忙标记在设置后维持连续几个消息。)如果繁忙,则直接扔掉请求,直接返回忙,直到消息队列被处理的差不多再做进一步处理。
而发送请求方则把 skynet.call 的调用改成一个循环。检查到忙,再接着重试,直到正确处理为止。做一个简单的封装后,可以看成和之前是一样的。”
个人理解:因为消息队列中的消息已经处理不过来,这里把消息投到对应的服务队列,等服务方处理该消息时再判断要不要处理,此时问题已经发生了来不及了。在出现负载时,不应该再堆积过多的请求,和可能造成铁定超时的情况,这里也会造成内存一定程度的下降。另一方面,超时再重传,这里没必要的,只会加剧对方的消息队列再多很多消息并加重负载。
网友评论