美文网首页python自学
传染病传播模拟(Python实现)

传染病传播模拟(Python实现)

作者: 烂泥先生 | 来源:发表于2020-03-27 00:56 被阅读0次

    COVID-19流行期间,很多博主提出了各种各样的传播模型,并利用数据可视化直观地展示了传染病传播情况,收到点赞无数。近期因为工作安排变化,最近时间比较充裕,自学了Python语言,并使用Python语言建立了一个传染病传播模型。此模型用到的技术有:

    • Python面向对象编程
    • 使用numpy库和matplotlib库绘制散点图和折线图
      注:此模型主要为练习Python数据分析技术而建立,参数选取有不完善之处,大家仅供参考。

    模型原理

    我们建立一个社区(community),社区具有固定面积,在这个社区中随机分布固定的人口,初始时每个人都具有易感性,开始随机产生第一个感染者(零号感染者),他可以感染距离他固定距离内的所有人。当一个人被感染后,经历一段时间潜伏期,潜伏期内没有症状但具有传染性,当潜伏期过了后,立即住院隔离治疗一段时间,住院期间按轻症和重症不同,具有不同的死亡率,度过治疗时间后,存活者回归社区并获得永久免疫,视为治愈人群,死亡者退出社区。

    模型分析

    • 所需变量:社区面积、人口、可传染的最大距离、潜伏期天数、住院治疗天数、重症概率、轻症概率、重症死亡率、轻症死亡率
    • 结果变量:社区中易感者人群集合、潜伏期人群集合、隔离治疗人群集合,基于以上数据,我们可以计算时时感染率、住院率、死亡率、免疫率(=治愈率)
    • 结果呈现形式:如下图,同一窗口显示四张图表,左上为人群分布散点图,蓝色为易感人群,红色为潜伏期人群,黑色为免疫人群(治愈人群);右上为感染率曲线;左下为住院人数曲线;右下为死亡人数曲线。


      结果示例

    具体实现

    1. 明确所需变量

    #导入库
    import math
    import numpy as np
    import matplotlib.pyplot as plt
    #基本参数
    itohday=3           #感染到住院天数
    obday=14            #住院观察天数
    ri=0.12             #感染范围
    mildrate=0.96       #轻症比例
    severerate=1-mildrate   #重症比例
    milddeath=0.001     #轻症每日死亡率
    severedeath=0.05    #重症每日死亡率
    persons=10000       #社区总人口
    days=100            #历史最大天数
    #容器
    community=[]        #社区人口
    latent=[]           #潜伏期人群
    hospital=[]         #住院人群
    death=[]            #死亡人群
    

    在以上代码中,我们可以通过调整基本变量改变模型参数,容器中保存时时人口分布列表

    2. 建立person类

    person类是本程序的关键类,包含了一个人的位置、状态信息以及各种状态的转变,该类的每一个实例就代表了一个人。我们现在来整理一下person类一个实例的生命周期:

    • 未感染:属于易感人群,需要计算与处于潜伏期人群的距离,若小于最大感染范围则被感染,否则继续处于未感染范围。此类人群均存在于community中。该过程对应成员函数为beinfected(self,day),意为“求感染”。
    • 被感染:一旦被感染则进入潜伏期,在潜伏期时,仍存在于community中,同时也存在于latent中,对应成员函数infected(self,day)。此时需要定义疾病为轻症还是重症,此功能在severity(self)中实现。
    • 潜伏期:在潜伏期内的任务是去感染别的未感染者,但这部分任务已经在beinfected(self,day)中实现了,因此潜伏期内没有其他特别的操作。
    • 住院:若潜伏期超过最大天数(即itohday),感染者即被送往医院治疗,此时从communitylatent列表中移除,加入到hospital列表中,此时没有传染力。在hospital(self,day)中实现。
    • 在院:每天利用随机数计算是否死亡,若达到死亡概率则调用die(self,day)函数,若未死亡则继续观察。在inhospital(self,day)中实现。
    • 死亡:从hospital中移除,加入death列表中。在die(self,day)中实现。
    • 治愈:若住院天数大于住院观察天数(即obday)则治愈,患者回到community中,从hospital列表中移除,并具有免疫力,不再被感染。在cured(self,day)中实现。
      从上面的生命周期中可以看出,整个周期的演化均是在类内部实现,外部无需知道这些演化过程,因此还需要对外部暴露一个可以推动演化的接口函数,在函数内部,通过判断此实例现在的状态来执行调用不同的成员函数,于是我们还需要引入一个tick(self,day)成员方法。
      具体代码如下:
    #pos类作为保存person坐标的辅助类
    class pos:
        x=0
        y=0
        def __init__(self):
            pass
        def setpos(self,x0,y0):
            self.x=x0
            self.y=y0
    #person类
    class person:
        p=pos() #位置坐标
        s=1     #是否易感,1=易感,0=免疫
        i=0     #是否患病/传染,1=患病,0=未患病
        c=0     #是否重症,1=重症,0=轻症,只有在i=1时有效
        ind=-1   #感染日
        add=-1   #住院日
        index=0 #索引号,调试时用
        #构造方法,idx为索引号,x0、y0为位置横纵坐标
        def __init__(self,idx,x0,y0):
            self.p=pos()
            self.index=idx
            self.p.setpos(x0,y0)
        #获得横坐标
        def getx(self):
            return self.p.x
        #获得纵坐标
        def gety(self):
            return self.p.y
        #求感染,未感染者计算与潜伏期患者的距离,若小于最大感染距离则被感染
        def beinfected(self,day):
            if self.s==1:
                for item in latent:
                    r=caldistance(self,item)
                    if r<ri:
                        self.infected(day)
        #被感染
        def infected(self,day):
            if self.i==0:
                self.i=1
                self.s=0
                self.ind=day
                self.severity()
                latent.append(self)
        #病情分级:轻症、重症
        def severity(self):
            r=np.random.rand()
            if r<mildrate:
                self.c=0
            else:
                self.c=1
        #住院
        def hospital(self,day):
            self.add=day
            hospital.append(self)
            latent.remove(self)
            community.remove(self)
        #在院
        def inhospital(self,day):
            if self.c==0:
                if np.random.rand()<milddeath:
                    self.die(day)
            else:
                if np.random.rand()<severedeath:
                    self.die(day)
        #治愈出院,获得永久免疫
        def cured(self,day):
            self.i=0
            community.append(self)
            hospital.remove(self)
        #死亡
        def die(self,day):
            death.append(self)
            hospital.remove(self)
        #接口函数
        def tick(self,day):
            if self.i==1:   #若被感染
                d=day-self.ind
                if d<itohday:   #潜伏期
                    pass
                elif d==itohday:   #住院日,潜伏期过
                    self.hospital(day)
                elif d<obday+itohday:    #在住院
                    self.inhospital(day)
                elif d==obday+itohday:    #出院日,被治愈
                    self.cured(day)
                else:
                    pass
            else:   #若没有被感染
                self.beinfected(day)
    #计算两人之间的距离,参数为两person类实例,返回两者距离
    def caldistance(p1,p2):
        x1=p1.p.x
        y1=p1.p.y
        x2=p2.p.x
        y2=p2.p.y
        dis=math.sqrt((x1-x2)**2+(y1-y2)**2)
        return dis
    

    3.主程序

    在主程序中有4个事件:

    • 生成社区
    • 产生零号感染者
    • 随天数增加,循环调用每个person实例的tick函数,推动疫情进展
    • 控制绘图
    #产生一个社区,横纵坐标分布为标准正态分布扩大3倍
    Xc=np.random.randn(persons)*3
    Yc=np.random.randn(persons)*3
    for i in range(persons):
        temp=person(i,Xc[i],Yc[i])
        community.append(temp)
    #时间开始
    day=0
    #随机产生零号感染者
    community[int(np.random.rand()*persons)].infected(day)
    #绘制Day 0图像
    plot()
    #流行开始
    while day<=days:
        day+=1
        #先处理在院人群
        for i in range(-len(hospital)+1,1):     #注意此处为从后往前处理列表中数据
            hospital[-i].tick(day)
        #再处理社区人群
        for i in range(-len(community)+1,1):    #注意此处为从后往前处理列表中数据
            community[-i].tick(day)
        #画图
        plot()
    print("END")
    

    需要注意的是处理在院人群和社区人群的两个for循环都是从列表后面向前处理的,这是因为如果从前往后按顺序处理,如果一个数据需要从列表中移除,则列表后面的变量就会前移,而计数变量i的值是不变的,因此可能漏掉列表当前位置后面一个实例的操作,从而引发错误。这是本程序编制过程中的一个坑。给我们的提示是,如果操作一个列表过程中,可能要对列表进行元素的插入append或删除remove操作,最好能从列表尾部向前进行,以免新增或新删除的元素影响到当前位置以后元素的索引值
    这里还差一个plot()函数,我们不妨先做散点图再做另外三个统计图。

    4.动态散点图绘制

    def plot():
        plt.ion()   #打开动态模式
        X=[]        #横坐标列表
        Y=[]        #纵坐标列表
        C=[]        #散点颜色列表
        i=0
        for i in range(len(community)):
            X.append(community[i].getx())
            Y.append(community[i].gety())
            if community[i].i==0:
                if community[i].s==1:
                    C.append('b')   #未感染者:蓝色
                else:
                    C.append('k')   #治愈者:黑色
            else:
                C.append('r')       #潜伏期:红色
        plt.clf()   #清除图像
        plt.title("%d day" %(day))  #标题
        plt.scatter(X,Y,c=C,s=1)    #绘制散点图,s参数为散点大小,最小值为1
        plt.draw()      #绘图
        plt.pause(1)    #暂停
    

    效果如下:


    效果图

    5.绘制统计图

    为了绘制统计图,我们需要引入4个统计变量,用来保存每天的数据。

    #统计数据
    infectedpersonbyday=[]  #每日累计感染人数
    admittedpersonbyday=[]  #每日现存在院人数
    deadpersonbyday=[]      #每日累计死亡人数
    curedpersonbyday=[]     #每日累计治愈人数
    

    为此,我们需要修改person类成员函数的相关实现,同时需要在主程序开始时进行初始化。修改后的person类代码如下:

    #person类
    class person:
        p=pos() #位置坐标
        s=1     #是否易感,1=易感,0=免疫
        i=0     #是否患病/传染,1=患病,0=未患病
        c=0     #是否重症,1=重症,0=轻症,只有在i=1时有效
        ind=-1   #感染日
        add=-1   #住院日
        index=0 #索引号,调试时用
        #构造方法,idx为索引号,x0、y0为位置横纵坐标
        def __init__(self,idx,x0,y0):
            self.p=pos()
            self.index=idx
            self.p.setpos(x0,y0)
        #获得横坐标
        def getx(self):
            return self.p.x
        #获得纵坐标
        def gety(self):
            return self.p.y
        #求感染,未感染者计算与潜伏期患者的距离,若小于最大感染距离则被感染
        def beinfected(self,day):
            if self.s==1:
                for item in latent:
                    r=caldistance(self,item)
                    if r<ri:
                        self.infected(day)
        #被感染
        def infected(self,day):
            if self.i==0:
                self.i=1
                self.s=0
                self.ind=day
                self.severity()
                infectedpersonbyday[day]+=1      #统计变量
                latent.append(self)
        #病情分级:轻症、重症
        def severity(self):
            r=np.random.rand()
            if r<mildrate:
                self.c=0
            else:
                self.c=1
        #住院
        def hospital(self,day):
            self.add=day
            admittedpersonbyday[day]+=1      #统计变量
            hospital.append(self)
            latent.remove(self)
            community.remove(self)
        #在院
        def inhospital(self,day):
            if self.c==0:
                if np.random.rand()<milddeath:
                    self.die(day)
            else:
                if np.random.rand()<severedeath:
                    self.die(day)
        #治愈出院,获得永久免疫
        def cured(self,day):
            self.i=0
            curedpersonbyday[day]+=1         #统计变量
            admittedpersonbyday[day]-=1      #统计变量
            community.append(self)
            hospital.remove(self)
        #死亡
        def die(self,day):
            deadpersonbyday[day]+=1          #统计变量
            admittedpersonbyday[day]-=1      #统计变量
            death.append(self)
            hospital.remove(self)
        #接口函数
        def tick(self,day):
            if self.i==1:   #若被感染
                d=day-self.ind
                if d<itohday:   #潜伏期
                    pass
                elif d==itohday:   #住院日,潜伏期过
                    self.hospital(day)
                elif d<obday+itohday:    #在住院
                    self.inhospital(day)
                elif d==obday+itohday:    #出院日,被治愈
                    self.cured(day)
                else:
                    pass
            else:   #若没有被感染
                self.beinfected(day)
    

    主程序代码修改如下:

    #产生一个社区,横纵坐标分布为标准正态分布扩大3倍
    Xc=np.random.randn(persons)*3
    Yc=np.random.randn(persons)*3
    for i in range(persons):
        temp=person(i,Xc[i],Yc[i])
        community.append(temp)
    #时间开始
    day=0
    #初始化统计变量
    curedpersonbyday.append(0)
    infectedpersonbyday.append(0)
    admittedpersonbyday.append(0)
    deadpersonbyday.append(0)
    #随机产生零号感染者
    community[int(np.random.rand()*persons)].infected(day)
    #绘制Day 0图像
    plots()
    #流行开始
    #la=1    #潜伏期人数
    while day<=days:
        day+=1
        #统计变量列表增加新增1天
        curedpersonbyday.append(curedpersonbyday[day-1])
        infectedpersonbyday.append(infectedpersonbyday[day-1])
        admittedpersonbyday.append(admittedpersonbyday[day-1])
        deadpersonbyday.append(deadpersonbyday[day-1])
        #先处理在院人群
        for i in range(-len(hospital)+1,1):     #注意此处为从后往前处理列表中数据
            hospital[-i].tick(day)
        #再处理社区人群
        for i in range(-len(community)+1,1):    #注意此处为从后往前处理列表中数据
            community[-i].tick(day)
        
        #print((infectedpersonbyday[day]-infectedpersonbyday[day-1])/la)
        #la=len(latent)
        #画图
        plots()
    print("END")
    

    6.动态多图绘图代码

    我们用plots()函数来同时绘制同一窗口显示四张图表,左上为人群分布散点图,蓝色为易感人群,红色为潜伏期人群,黑色为免疫人群(治愈人群);右上为感染率曲线;左下为住院人数曲线;右下为死亡人数曲线。

    def plots():
        X=[]        #横坐标列表
        Y=[]        #纵坐标列表
        C=[]        #散点颜色列表
        Xp=np.arange(0,days)    #统计图横坐标
        for i in range(len(community)):
            X.append(community[i].getx())
            Y.append(community[i].gety())
            if community[i].i==0:
                if community[i].s==1:
                    C.append('b')   #未感染者:蓝色
                else:
                    C.append('k')   #治愈者:黑色
            else:
                C.append('r')       #潜伏期:红色
        plt.ion()       #打开动态模式
        plt.clf()       #清除图像
        #绘制散点图
        apen=plt.subplot(2,2,1)
        apen.set_title("Day %d" %(day))
        apen.scatter(X,Y,c=C,s=1)
        #绘制累计感染人数图
        bpen=plt.subplot(2,2,2)
        bpen.set_title("Infected %.2f%%" %(infectedpersonbyday[day]/persons*100))
        bpen.plot(Xp[0:day+1],infectedpersonbyday)
        #绘制实时在院人数图
        cpen=plt.subplot(2,2,3)
        cpen.set_title("In hospital %.2f%%" %(admittedpersonbyday[day]/persons*100))
        cpen.plot(Xp[0:day+1],admittedpersonbyday)
        #绘制累计死亡人数图
        dpen=plt.subplot(2,2,4)
        dpen.set_title("Death %.2f%%" %(deadpersonbyday[day]/persons*100))
        dpen.plot(Xp[0:day+1],deadpersonbyday)
        plt.pause(1)    #暂停
        plt.show()      #显示图像
        plt.savefig('Day %d.png'%(day))     #保存图像
    

    7.调试参数

    至此,全部代码完成,效果如下:


    效果图(节选)

    下面可以调节不同的初始参数来改变模型初始值。我们可以根据新闻或官方报道数据调整参数,使模型更具有可信度。

    相关文章

      网友评论

        本文标题:传染病传播模拟(Python实现)

        本文链接:https://www.haomeiwen.com/subject/ocqhuhtx.html