需求
最近做项目遇到这样的一个问题,客户的数据库需要维护一个绝对增长,连续的编号。但要求不能用数据库的auto_increment来维护。
分析
一般来说,常规的编号都可以由8位或16位或更多的位数,通过号码分区来实现,算法是以类似雪花算法那种。
但我这里的需求又没有那么复杂,就是要求从1开始,连续自增,不能auto_increment。
这种需求在单线程模式下,其实超级简单:每次入库的时候,先count一个总数,+1之后insert即可。但对于在多线程模式下,会出现资源争抢的问题。
为什么会造成资源争抢呢?
就好比你去银行办业务:常规是排队,A办完业务走了,B才能办。这也是常规单线程程序的原理,不会乱,但效率比较低。
为了不用排队,大堂经理改用了排号发号的机制。发号的依据是银行里有多少人,后面的人就号码+1,依次处理。
假设银行里有10个人(对应数据库里有10条记录),这时同时来了三个人(3个线程,每个线程读需要插入一条待分配编号的数据)。这个大堂经理比较轴,他看了眼银行,发现还有10个人,而眼前这三个人他又分不清谁先谁后,就分别给每个人都发了号牌11。拿到号牌之后,这三个人就一起进银行办业务了。由于大家都是11号,必然会出现冲突。
这种现象就是资源争抢,由于发号的依据仅看银行里有多少人(数据库表里有多少行记录),而不关心新来的人的数量情况,所以新的记录的号码永远是总数+1。
思路
最简单的做法就是先同时启动所有线程去处理数据。
当需要写入数据库的时候,先为线程上一个线程锁,查询数据库得到总数之后再+1,并人为的加上一个毫秒级的延迟,当分配完编号之后再释放这个线程锁。
这样当写库的时候,3个进程都拿到了完全不同的编号,完全可以同时写库而不会出现编号重复的资源争抢问题了。
代码实现
1.启动线程的地方:
import threading
if __name__ == '__main__':
task_list = Task.load_tasks() # 总之就是得到一个列表
for one_task in task_list:
sub = threading.Thread(target=do_func, kwargs={"one_task": one_task})
sub.start()
2.线程函数
import threading
def do_func(one_task):
# 省略业务处理和数据装箱
# 加线程锁
lock = threading.Lock()
lock.acquire()
# 通过人为加延迟来干预次序
time.sleep(0.5)
total = db.session.query(func.max(TABLE.serial_no)).scalar()
new_sn = total + 1 if total is not None else 1
lock.release()
dataObj.serial_no = new_sn
db.session.add(dataObj)
db.session.commit()
讲解
最核心的就是在do_func这里加了锁和延迟。
最开始我在启动线程那里加了0.5秒的延迟,10个线程同时运行大约需要5秒钟。当我修改为在分配编号时加延迟,时间仅仅需要1秒多就可以运行完毕了。
后面测试发现,启动线程的地方加延迟会更保险。以为若干线程同时启动,也会出现有2个线程同时运行到分配编号的可能性。
进阶
这种简单的做法只针对线程不太多的情况下。如果是高并发的时候,还是需要维护一个全局的编号池或专门为发号做一个表来实现了。
编号池思路基本就是启动线程之前,先统计总数。然后在编号池里根据可能会启动的线程总数,提前分配好编号。当启动了线程之后,需要分配编号时,可以采用数组pop的方式或直接从中随机分配即可。
而用数据库来维护发号器,思路就是利用数据库的行级锁来实现了。
这两块这里就不再贴代码,有兴趣的同学回头可以自己实现来试试。
网友评论