为啥需要线程池
- 普通的erlang进程,可以并行很多,但是不意味着无限。需要对进程进行管理。
- 特殊的erlang进程。在运行时比较消耗资源/时间。典型的,数据库连接池。建立数据库的连接就比较耗时,or查询请求等待,也可能比较耗时。
开源线程池模块 poolboy
大致思路:先建立一大批的,连接好了数据库的的服务进程。这些预先启动的进程,就构成了一个服务线程池,来一个请求,就取出来一个check_out出闲置的服务进程直接使用,用完就check_in放回去线程池。
代码封装了一个 transaction/3
实现 一个整体的过程
transaction(Pool, Fun, Timeout) ->
Worker = poolboy:checkout(Pool, true, Timeout),
try
Fun(Worker)
after
ok = poolboy:checkin(Pool, Worker)
end.
实现思路
- 使用gen_server 保证多线程
- link supversior 来管理
- mornitor 来监控管理任务
- 出错自动处理
- check_out 出错 cancel_waiting;
- worker 出错,自动check_in
关键函数
State结构
- supervisor :: pid():supversior pid
- workers :: [pid()]:工作进程
- waiting :: pid_queue(): 等待队列
- monitors :: ets:tid():监听ets
- size = 5 :: non_neg_integer():worker 数量限制
- overflow = 0 :: non_neg_integer(),
- max_overflow = 10 :: non_neg_integer():overflow 限制
- strategy = lifo :: lifo | fifo: lifo:last in first out | fifo:first in first out
- overflow_check_period :: non_neg_integer(), %milliseconds
- overflow_ttl = 0 :: non_neg_integer() %microseconds
child_spec
- 启动poolboy gen_server,可以传名字进来
init
- 等待队列,queue:new()
- 创建 mornitors ets
- child_spec 传入的 PoolArgs处理
- {worker_module,Mod} 创建 poolboy_sup
- {size, Size} 数量
- 其他必要or默认参数,初始化处理
- prepopulate(Size, Sup) 处理size,
new_worker/1
启动sup子进程,将worker 和 poolboy 链接起来,监控worker状态
check_out
- gen_sever:call
- errror 啥的,cast cancel_waiting,实现自动checkin
- 存在worker
- add_worker_execute:监控FromPid, 将监控关系 {WorkerId, CRef, MRef} 写入 mornitor ets中
- 移除使用的Worker
- 已经overflow,未超过上限
-
new_worker/1
,启动一个新的 - 监控关系保存
- Overflow + 1
-
- [] when Block = false
- {reply, full, State};
- [];
- 监控 FromPid
- 写入 waiting 队列
queue:in/1
- block 参数,false的话,不会进入waiting 列表,返回full,一般调用都是true
check_in
- gen_sever:cast
- 查找对应的监控关系,解除监控关系
- handle_checkin
- 存在 等待队列,处理
- 不存在等待队列,且 Overflow > 0, OverflowTtl > 0 return_worker 返还工作进程 (主要根据策略)
- 不存在等待队列,且 Overflow > 0,
dismiss_worker/2
, Overflow - 1 - 不存在等待队列,
return_worker
, overflow = 0
handle_info
-
handle_info({'DOWN', MRef, _, _, _}, State)
- 用户进程崩掉的时候,删除监控关系,
handle_check_in
,自动 返还 - 或者队列中的:删除 等待队列里面的
- 用户进程崩掉的时候,删除监控关系,
-
handle_info({'EXIT', Pid, _Reason}, State)
- 工作中的worker:
handle_worker_exit/2
,处理等待队列 - 空闲中的,Workers删除旧的,new_worker 一个
- 工作中的worker:
overflow 溢出处理策略
- checkout 的时候,如果剩余的工作列表 workers 为空了,判断 max_overflow 的值,如果,不为0,并且,当前溢出的数目,不超过max值,会
new_worker/1
新建链接,放入worker中,此处会去重新连接数据库,会有消耗,注意 - 定时器,rip_workers 超时的
dismiss_worker/2
or 按照策略处理,闲置的,超过ttl的 overflow
网友评论