COVID-19流行期间,很多博主提出了各种各样的传播模型,并利用数据可视化直观地展示了传染病传播情况,收到点赞无数。近期因为工作安排变化,最近时间比较充裕,自学了Python语言,并使用Python语言建立了一个传染病传播模型。此模型用到的技术有:
- Python面向对象编程
- 使用
numpy
库和matplotlib
库绘制散点图和折线图
注:此模型主要为练习Python数据分析技术而建立,参数选取有不完善之处,大家仅供参考。
模型原理
我们建立一个社区(community),社区具有固定面积,在这个社区中随机分布固定的人口,初始时每个人都具有易感性,开始随机产生第一个感染者(零号感染者),他可以感染距离他固定距离内的所有人。当一个人被感染后,经历一段时间潜伏期,潜伏期内没有症状但具有传染性,当潜伏期过了后,立即住院隔离治疗一段时间,住院期间按轻症和重症不同,具有不同的死亡率,度过治疗时间后,存活者回归社区并获得永久免疫,视为治愈人群,死亡者退出社区。
模型分析
- 所需变量:社区面积、人口、可传染的最大距离、潜伏期天数、住院治疗天数、重症概率、轻症概率、重症死亡率、轻症死亡率
- 结果变量:社区中易感者人群集合、潜伏期人群集合、隔离治疗人群集合,基于以上数据,我们可以计算时时感染率、住院率、死亡率、免疫率(=治愈率)
-
结果呈现形式:如下图,同一窗口显示四张图表,左上为人群分布散点图,蓝色为易感人群,红色为潜伏期人群,黑色为免疫人群(治愈人群);右上为感染率曲线;左下为住院人数曲线;右下为死亡人数曲线。
结果示例
具体实现
1. 明确所需变量
#导入库
import math
import numpy as np
import matplotlib.pyplot as plt
#基本参数
itohday=3 #感染到住院天数
obday=14 #住院观察天数
ri=0.12 #感染范围
mildrate=0.96 #轻症比例
severerate=1-mildrate #重症比例
milddeath=0.001 #轻症每日死亡率
severedeath=0.05 #重症每日死亡率
persons=10000 #社区总人口
days=100 #历史最大天数
#容器
community=[] #社区人口
latent=[] #潜伏期人群
hospital=[] #住院人群
death=[] #死亡人群
在以上代码中,我们可以通过调整基本变量改变模型参数,容器中保存时时人口分布列表
2. 建立person类
person
类是本程序的关键类,包含了一个人的位置、状态信息以及各种状态的转变,该类的每一个实例就代表了一个人。我们现在来整理一下person
类一个实例的生命周期:
- 未感染:属于易感人群,需要计算与处于潜伏期人群的距离,若小于最大感染范围则被感染,否则继续处于未感染范围。此类人群均存在于
community
中。该过程对应成员函数为beinfected(self,day)
,意为“求感染”。 - 被感染:一旦被感染则进入潜伏期,在潜伏期时,仍存在于
community
中,同时也存在于latent
中,对应成员函数infected(self,day)
。此时需要定义疾病为轻症还是重症,此功能在severity(self)
中实现。 - 潜伏期:在潜伏期内的任务是去感染别的未感染者,但这部分任务已经在
beinfected(self,day)
中实现了,因此潜伏期内没有其他特别的操作。 - 住院:若潜伏期超过最大天数(即
itohday
),感染者即被送往医院治疗,此时从community
和latent
列表中移除,加入到hospital
列表中,此时没有传染力。在hospital(self,day)
中实现。 - 在院:每天利用随机数计算是否死亡,若达到死亡概率则调用
die(self,day)
函数,若未死亡则继续观察。在inhospital(self,day)
中实现。 - 死亡:从
hospital
中移除,加入death
列表中。在die(self,day)
中实现。 - 治愈:若住院天数大于住院观察天数(即
obday
)则治愈,患者回到community
中,从hospital
列表中移除,并具有免疫力,不再被感染。在cured(self,day)
中实现。
从上面的生命周期中可以看出,整个周期的演化均是在类内部实现,外部无需知道这些演化过程,因此还需要对外部暴露一个可以推动演化的接口函数,在函数内部,通过判断此实例现在的状态来执行调用不同的成员函数,于是我们还需要引入一个tick(self,day)
成员方法。
具体代码如下:
#pos类作为保存person坐标的辅助类
class pos:
x=0
y=0
def __init__(self):
pass
def setpos(self,x0,y0):
self.x=x0
self.y=y0
#person类
class person:
p=pos() #位置坐标
s=1 #是否易感,1=易感,0=免疫
i=0 #是否患病/传染,1=患病,0=未患病
c=0 #是否重症,1=重症,0=轻症,只有在i=1时有效
ind=-1 #感染日
add=-1 #住院日
index=0 #索引号,调试时用
#构造方法,idx为索引号,x0、y0为位置横纵坐标
def __init__(self,idx,x0,y0):
self.p=pos()
self.index=idx
self.p.setpos(x0,y0)
#获得横坐标
def getx(self):
return self.p.x
#获得纵坐标
def gety(self):
return self.p.y
#求感染,未感染者计算与潜伏期患者的距离,若小于最大感染距离则被感染
def beinfected(self,day):
if self.s==1:
for item in latent:
r=caldistance(self,item)
if r<ri:
self.infected(day)
#被感染
def infected(self,day):
if self.i==0:
self.i=1
self.s=0
self.ind=day
self.severity()
latent.append(self)
#病情分级:轻症、重症
def severity(self):
r=np.random.rand()
if r<mildrate:
self.c=0
else:
self.c=1
#住院
def hospital(self,day):
self.add=day
hospital.append(self)
latent.remove(self)
community.remove(self)
#在院
def inhospital(self,day):
if self.c==0:
if np.random.rand()<milddeath:
self.die(day)
else:
if np.random.rand()<severedeath:
self.die(day)
#治愈出院,获得永久免疫
def cured(self,day):
self.i=0
community.append(self)
hospital.remove(self)
#死亡
def die(self,day):
death.append(self)
hospital.remove(self)
#接口函数
def tick(self,day):
if self.i==1: #若被感染
d=day-self.ind
if d<itohday: #潜伏期
pass
elif d==itohday: #住院日,潜伏期过
self.hospital(day)
elif d<obday+itohday: #在住院
self.inhospital(day)
elif d==obday+itohday: #出院日,被治愈
self.cured(day)
else:
pass
else: #若没有被感染
self.beinfected(day)
#计算两人之间的距离,参数为两person类实例,返回两者距离
def caldistance(p1,p2):
x1=p1.p.x
y1=p1.p.y
x2=p2.p.x
y2=p2.p.y
dis=math.sqrt((x1-x2)**2+(y1-y2)**2)
return dis
3.主程序
在主程序中有4个事件:
- 生成社区
- 产生零号感染者
- 随天数增加,循环调用每个
person
实例的tick
函数,推动疫情进展 - 控制绘图
#产生一个社区,横纵坐标分布为标准正态分布扩大3倍
Xc=np.random.randn(persons)*3
Yc=np.random.randn(persons)*3
for i in range(persons):
temp=person(i,Xc[i],Yc[i])
community.append(temp)
#时间开始
day=0
#随机产生零号感染者
community[int(np.random.rand()*persons)].infected(day)
#绘制Day 0图像
plot()
#流行开始
while day<=days:
day+=1
#先处理在院人群
for i in range(-len(hospital)+1,1): #注意此处为从后往前处理列表中数据
hospital[-i].tick(day)
#再处理社区人群
for i in range(-len(community)+1,1): #注意此处为从后往前处理列表中数据
community[-i].tick(day)
#画图
plot()
print("END")
需要注意的是处理在院人群和社区人群的两个for
循环都是从列表后面向前处理的,这是因为如果从前往后按顺序处理,如果一个数据需要从列表中移除,则列表后面的变量就会前移,而计数变量i
的值是不变的,因此可能漏掉列表当前位置后面一个实例的操作,从而引发错误。这是本程序编制过程中的一个坑。给我们的提示是,如果操作一个列表过程中,可能要对列表进行元素的插入append
或删除remove
操作,最好能从列表尾部向前进行,以免新增或新删除的元素影响到当前位置以后元素的索引值。
这里还差一个plot()
函数,我们不妨先做散点图再做另外三个统计图。
4.动态散点图绘制
def plot():
plt.ion() #打开动态模式
X=[] #横坐标列表
Y=[] #纵坐标列表
C=[] #散点颜色列表
i=0
for i in range(len(community)):
X.append(community[i].getx())
Y.append(community[i].gety())
if community[i].i==0:
if community[i].s==1:
C.append('b') #未感染者:蓝色
else:
C.append('k') #治愈者:黑色
else:
C.append('r') #潜伏期:红色
plt.clf() #清除图像
plt.title("%d day" %(day)) #标题
plt.scatter(X,Y,c=C,s=1) #绘制散点图,s参数为散点大小,最小值为1
plt.draw() #绘图
plt.pause(1) #暂停
效果如下:
效果图
5.绘制统计图
为了绘制统计图,我们需要引入4个统计变量,用来保存每天的数据。
#统计数据
infectedpersonbyday=[] #每日累计感染人数
admittedpersonbyday=[] #每日现存在院人数
deadpersonbyday=[] #每日累计死亡人数
curedpersonbyday=[] #每日累计治愈人数
为此,我们需要修改person
类成员函数的相关实现,同时需要在主程序开始时进行初始化。修改后的person
类代码如下:
#person类
class person:
p=pos() #位置坐标
s=1 #是否易感,1=易感,0=免疫
i=0 #是否患病/传染,1=患病,0=未患病
c=0 #是否重症,1=重症,0=轻症,只有在i=1时有效
ind=-1 #感染日
add=-1 #住院日
index=0 #索引号,调试时用
#构造方法,idx为索引号,x0、y0为位置横纵坐标
def __init__(self,idx,x0,y0):
self.p=pos()
self.index=idx
self.p.setpos(x0,y0)
#获得横坐标
def getx(self):
return self.p.x
#获得纵坐标
def gety(self):
return self.p.y
#求感染,未感染者计算与潜伏期患者的距离,若小于最大感染距离则被感染
def beinfected(self,day):
if self.s==1:
for item in latent:
r=caldistance(self,item)
if r<ri:
self.infected(day)
#被感染
def infected(self,day):
if self.i==0:
self.i=1
self.s=0
self.ind=day
self.severity()
infectedpersonbyday[day]+=1 #统计变量
latent.append(self)
#病情分级:轻症、重症
def severity(self):
r=np.random.rand()
if r<mildrate:
self.c=0
else:
self.c=1
#住院
def hospital(self,day):
self.add=day
admittedpersonbyday[day]+=1 #统计变量
hospital.append(self)
latent.remove(self)
community.remove(self)
#在院
def inhospital(self,day):
if self.c==0:
if np.random.rand()<milddeath:
self.die(day)
else:
if np.random.rand()<severedeath:
self.die(day)
#治愈出院,获得永久免疫
def cured(self,day):
self.i=0
curedpersonbyday[day]+=1 #统计变量
admittedpersonbyday[day]-=1 #统计变量
community.append(self)
hospital.remove(self)
#死亡
def die(self,day):
deadpersonbyday[day]+=1 #统计变量
admittedpersonbyday[day]-=1 #统计变量
death.append(self)
hospital.remove(self)
#接口函数
def tick(self,day):
if self.i==1: #若被感染
d=day-self.ind
if d<itohday: #潜伏期
pass
elif d==itohday: #住院日,潜伏期过
self.hospital(day)
elif d<obday+itohday: #在住院
self.inhospital(day)
elif d==obday+itohday: #出院日,被治愈
self.cured(day)
else:
pass
else: #若没有被感染
self.beinfected(day)
主程序代码修改如下:
#产生一个社区,横纵坐标分布为标准正态分布扩大3倍
Xc=np.random.randn(persons)*3
Yc=np.random.randn(persons)*3
for i in range(persons):
temp=person(i,Xc[i],Yc[i])
community.append(temp)
#时间开始
day=0
#初始化统计变量
curedpersonbyday.append(0)
infectedpersonbyday.append(0)
admittedpersonbyday.append(0)
deadpersonbyday.append(0)
#随机产生零号感染者
community[int(np.random.rand()*persons)].infected(day)
#绘制Day 0图像
plots()
#流行开始
#la=1 #潜伏期人数
while day<=days:
day+=1
#统计变量列表增加新增1天
curedpersonbyday.append(curedpersonbyday[day-1])
infectedpersonbyday.append(infectedpersonbyday[day-1])
admittedpersonbyday.append(admittedpersonbyday[day-1])
deadpersonbyday.append(deadpersonbyday[day-1])
#先处理在院人群
for i in range(-len(hospital)+1,1): #注意此处为从后往前处理列表中数据
hospital[-i].tick(day)
#再处理社区人群
for i in range(-len(community)+1,1): #注意此处为从后往前处理列表中数据
community[-i].tick(day)
#print((infectedpersonbyday[day]-infectedpersonbyday[day-1])/la)
#la=len(latent)
#画图
plots()
print("END")
6.动态多图绘图代码
我们用plots()
函数来同时绘制同一窗口显示四张图表,左上为人群分布散点图,蓝色为易感人群,红色为潜伏期人群,黑色为免疫人群(治愈人群);右上为感染率曲线;左下为住院人数曲线;右下为死亡人数曲线。
def plots():
X=[] #横坐标列表
Y=[] #纵坐标列表
C=[] #散点颜色列表
Xp=np.arange(0,days) #统计图横坐标
for i in range(len(community)):
X.append(community[i].getx())
Y.append(community[i].gety())
if community[i].i==0:
if community[i].s==1:
C.append('b') #未感染者:蓝色
else:
C.append('k') #治愈者:黑色
else:
C.append('r') #潜伏期:红色
plt.ion() #打开动态模式
plt.clf() #清除图像
#绘制散点图
apen=plt.subplot(2,2,1)
apen.set_title("Day %d" %(day))
apen.scatter(X,Y,c=C,s=1)
#绘制累计感染人数图
bpen=plt.subplot(2,2,2)
bpen.set_title("Infected %.2f%%" %(infectedpersonbyday[day]/persons*100))
bpen.plot(Xp[0:day+1],infectedpersonbyday)
#绘制实时在院人数图
cpen=plt.subplot(2,2,3)
cpen.set_title("In hospital %.2f%%" %(admittedpersonbyday[day]/persons*100))
cpen.plot(Xp[0:day+1],admittedpersonbyday)
#绘制累计死亡人数图
dpen=plt.subplot(2,2,4)
dpen.set_title("Death %.2f%%" %(deadpersonbyday[day]/persons*100))
dpen.plot(Xp[0:day+1],deadpersonbyday)
plt.pause(1) #暂停
plt.show() #显示图像
plt.savefig('Day %d.png'%(day)) #保存图像
7.调试参数
至此,全部代码完成,效果如下:
效果图(节选)
下面可以调节不同的初始参数来改变模型初始值。我们可以根据新闻或官方报道数据调整参数,使模型更具有可信度。
网友评论