美文网首页
MCMC的Metropolis-hastings 算法pytho

MCMC的Metropolis-hastings 算法pytho

作者: 社交达人叔本华 | 来源:发表于2022-01-05 01:32 被阅读0次

    本文重点介绍贝叶斯推断中的MCMC方法,这是众多方法中的一个,具体的分类可以看图。基于估计的呢,是点估计,求出目标分布的极值。这里呢多说一句,贝叶斯的点估计是对后验概率求导(p(x|\lambda)p(\lambda)),而频率学派的点估计呢是对似然概率求导(p(x|\lambda))。基于采样的呢是,对目标分布采样,用样本来表示目标分布. 另外,MCMC的理解,要参考两个非常形象的比喻,寻找山峰和利用随机计算圆的面积。

    THbXLt.png
    import scipy.stats as stats
    import matplotlib.pyplot as plt
    import numpy as np
    
    # 生成数据 - 有两组数据X服从true_lambda_1 的指数分布,Y服从true_lambda_2的指数分布,我们要考察的是他们的联合概率分布的更新过程。
    # 
    N=10 # sample size
    
    true_lambda_1=1
    true_lambda_2=3
    
    data=np.concatenate(
        [
            stats.poisson.rvs(true_lambda_1,size=(N,1)),
            stats.poisson.rvs(true_lambda_2,size=(N,1)),
        ],
        axis=1
    ) # x和y各自抽样一次,样本大小是N
    
    x=y=np.linspace(.01,5,100) # 参数lambda的所有可能取值
    likelihood_x=np.array(
        [
            stats.poisson.pmf(data[:,0],xi) for xi in x 
        ]
    ).prod(axis=1) # 第一组10个样本同时出现,lambda的概率分布!!!
    likelihood_y=np.array(
        [
            stats.poisson.pmf(data[0,:],yi) for yi in y 
        ]
    ).prod(axis=1) # 第二组10个样本同时出现,lambda的概率分布!!
    L=np.dot(likelihood_x.reshape((-1,1)),likelihood_y.reshape((1,-1))) # likelihood乘积的矩阵,矩阵的每一个元素都是p(x|lambda)p(y|lambda),用来更新先验形成后验,直接乘以先验就行了
    print(L.shape)
    
    (100, 100)
    
    # 先验分布 - 均匀分布
    plt.subplot(121)
    uni_x=stats.uniform.pdf(x,loc=0,scale=5)
    uni_y=stats.uniform.pdf(x,loc=0,scale=5)
    M=np.dot(uni_x.reshape((-1,1)),uni_y.reshape((1,-1)))
    im=plt.imshow(M,interpolation="none",origin="lower",cmap=plt.cm.jet,extent=(0,5,0,5))
    plt.scatter(true_lambda_2,true_lambda_1,c="k",s=50) # 注意纵坐标是lambda_2的分布
    plt.title("uniform prior")
    plt.xlabel("lambda_2")
    plt.ylabel("lambda_1")
    # 更新分布
    plt.subplot(122)
    plt.contour(x,y,M*L) # 画等高线
    im=plt.imshow(M*L,interpolation="none",origin="lower",cmap=plt.cm.jet,extent=(0,5,0,5))
    plt.scatter(true_lambda_2,true_lambda_1,c="k",s=50) # 画出真实值的位置
    plt.title("postieror after 1 sampling")
    plt.xlabel("lambda_2")
    plt.ylabel("lambda_1")
    
    Text(0, 0.5, 'lambda_1')
    
    mcmc_3_1.png

    上面这个过程呢,其实是贝叶斯推断的标准过程。包含了几个要素:确定模型(x,y来自两个不同的泊松分布),确定模型参数(两个泊松分布的参数lambda_1,lambda_2),确定先验(均采用均匀分布),采样,更新。

    其实,有了这个标准过程就已经可以解决问题了。MCMC呢,对这个过程进行了改进,规避掉了计算复杂度最高的分母。MCMC结合了两个东西,第一个MC是markov chain,指的是当前值只依赖上一步的值。第二个MC呢,指的是monte carlo这是一个用根据分布获取随机数的东西,同时获取出来的随机数呢,又能反向勾勒出原分布的情况。蒙地卡罗还可以利用随机来测量圆形或者不规则图形面积哦,很重要的一个技巧,我们不需要知道原到底长什么样,我们只需要知道某个点到底来自不来自于这个圆内就行了。MCMC是这样一个核心思想(书上看到的一段话,因为写的太好了,所以直接搬运一下):

    首先明确一点,MCMC返回值不是后验分布,而是后验分布上的一些样本点。我们可以把MCMC的搜索过程看做是寻找正确的山峰的过程,上面的图也能看出来,概率大的地方就是形成了一个山峰,我们要找的就是这样一个山峰。但是我们返回的是这座上上的一些石头,用来表示这座山。我们可以把MCMC的搜索过程,想象成不断地问每一块石头(样本):你是不是来自我要找的那座山?如果是,那么就会沿着这块石头所指引的方向前进,如果不是就丢掉。最后我们把所有是那座山的石头都作为返回值给出。关于MCMC,我找到了一个很好的入门短视频MCMC

    MCMC的具体流程呢是这样的,整个过程和利用随机测圆面积很像:

    • 对参数\lambda,先给一个先验分布,并且给出一个最开始的初始值 \lambda_0
    • 利用马尔科夫链得到下一个参数 \lambda_1作为下一步的参数。具体做法是利用一个叫Proposal distribution的东西,这个分布呢其实是markov chain里的transition function,它结合当前值给出下一个值的分布,比如说我们在这里用的就是很简单的正态分布N(\lambda_0,\sigma),可以看出其实就是以当前参数作为期望,然后给一个方差,得到这个分布,然后从这个分布上采样出下一个参数。所以说呀,这个玩意用什么分布其实都不重要,感觉就其实有点随机了,主要是后面有拒绝和接受那一步进行把关,这一步主要就是要随机给点(对应到测面积里,就是在随机产生点
    • 然后我们要判断这个\lambda_1是不是来自我们要找的那个山,我们要决定是接受还是拒绝。我们定义一个接受率r=\frac{posterior of \lambda_1}{posterior of \lambda_0}。这个接受率其实挺好理解的,如何判断一个参数是好还是坏呢?我们就看这个参数下,针对指定的evidence X给出的后验概率是大还是小,如果大那就毫无疑问就直接接受,如果小我们也不一定都拒绝,要以一定的概率拒绝。什么概率呢?我们用一个0到1之间的均匀分布采样一个数\alpha,如果这个采样出来的数比r大,那就接受,如果比它小呢,就拒绝(对应到圆面积里,这一步就是在判断随机产生的点是不是在圆内
    • 重复上述过程

    下面的内容呢,我将参考这篇教程,首先用dummy data跑一遍MCMC,然后再用一个实际的例子跑一遍MCMC。代码只用到numpy,scipy, matplotlib,没有用到任何成熟的库

    #“”“Dummy Data Example。注意我们对acceptance 函数等式两边求了个log,不然计算机没办法计算”“”
    import numpy as np
    import scipy.stats as stats
    # prepare data
    mod1=lambda t:np.random.normal(10,3,t)
    population=mod1(30000) # 总数据是30000个
    observations=population[np.random.randint(0,30000,1000)] #假设只观察到1000个数据
    fig=plt.figure(figsize=(10,10))
    ax=fig.add_subplot(1,1,1)
    ax.hist(observations,bins=35)
    ax.set_xlabel("Value")
    ax.set_ylabel("Frequency")
    ax.set_title("Distribution of Dummy Data")
    mu=observations.mean()
    print("mu=",mu)
    
    
    mu= 10.177366628047666
    
    mcmc_5_1.png
    
    # 为了简化问题,我们只求参数sigma,mu我们通过观察这1000组数据,用平均值表示mu。其实mu也可以用mcmc求,但是这只是一个简单的例子,不想搞得太复杂
    # proposal distribution - 同样,我们采用一个很常用的proposal distribution,就是正态分布N(mu=sigma_current,sigma=1)
    sample_proposal_distribution=lambda mu: np.random.normal(mu,0.5,1)
    
    # function to calculate prior
    def get_log_prior(theta):
        """注意,我们对先验没有要求,认为就是个均匀分布,而且我们整个过程中也没有更新这个先验,始终认为先验就是均匀分布"""
        sigma=theta["sigma"]
        if sigma>0:
            return 0
        else:
            return -np.inf
        
    # function to calculate posterior
    def get_log_posterior(theta,evidence,log_prior):
        sigma=theta["sigma"]
        likelihood=stats.norm.pdf(evidence,loc=9.8,scale=sigma)
        log_likelihood=np.log(likelihood).sum()
        return log_likelihood+log_prior
    
    # acceptance function
    def acceptance(current_theta,proposed_theta):
        current_log_prior=get_log_prior(current_theta)
        proposed_log_prior=get_log_prior(proposed_theta)
        current_log_posterior=get_log_posterior(current_theta,observations,current_log_prior)
        proposed_log_posterior=get_log_posterior(proposed_theta,observations,proposed_log_prior)
        log_r=proposed_log_posterior-current_log_posterior
        if log_r>1:
            return True
        else:
            alpha=np.random.uniform(0,1)
            if alpha<np.exp(log_r):
                return True
            else:
                return False
    
    def metrolpolis_hastings(initial_theta,n_samples):
        count_samples=0
        accepted_samples=[]
        rejected_samples=[]
        current_theta=initial_theta
        
        while count_samples <n_samples:
            proposed_sigma=sample_proposal_distribution(current_theta["sigma"])
            proposed_theta={"sigma":proposed_sigma[0]}
            accept_or_reject=acceptance(current_theta,proposed_theta)
            
            if accept_or_reject is True:
                count_samples+=1
                accepted_samples.append(proposed_theta)
                current_theta=proposed_theta
            else:
                rejected_samples.append(proposed_theta)
        
        return accepted_samples,rejected_samples
    
    initial_theta={"sigma":0.5}
    n_samples=10000
    accepted_samples,rejected_samples=metrolpolis_hastings(initial_theta,n_samples)
    
        
        
        
    
    # visualize the trace of sigma
    accepted_sigmas=[s["sigma"] for s in accepted_samples]
    rejected_sigmas=[s["sigma"] for s in rejected_samples]
    steps=range(len(accepted_sigmas))
    plt.subplot(121)
    plt.hist(accepted_sigmas,bins=30)
    plt.title("Accepted values")
    plt.subplot(122)
    plt.hist(rejected_sigmas,bins=30,color="red")
    plt.title("rejected falues")
    
    Text(0.5, 1.0, 'rejected values')
    
    mcmc_7_1.png

    最后附上更简洁的完整代码

    """
    Author: xuqh
    Created on 2022/1/5
    This is a complete implementation of Metropolis-Hasting Algorithm (MCMC)
    """
    import numpy as np
    import scipy.stats as stats
    import matplotlib.pyplot as plt
    
    
    def sample_from_proposal_distribution(current_theta):
        current_sigma = current_theta["sigma"]
        return {
            "sigma": np.random.normal(current_sigma, 1, 1)[0]
        }
    
    
    def get_log_prior(theta):
        """log(p(theta))"""
        sigma = theta["sigma"]
        if sigma > 0:
            return 0
        else:
            return -np.inf
    
    
    def get_log_likelihood(evidence, theta):
        """log(p(evidence|theta))"""
        sigma = theta["sigma"]
        likelihood = stats.norm.pdf(evidence, loc=9.8, scale=sigma)
        log_likelihood = np.log(likelihood).sum()
        return log_likelihood
    
    
    def get_log_posterior(evidence, theta):
        """log(p(theta))+log(p(evidence|theta))"""
        log_prior = get_log_prior(theta)
        log_likelihood = get_log_likelihood(evidence, theta)
        return log_prior + log_likelihood
    
    
    def accept(current_theta, proposed_theta, evidence):
        current_log_posterior = get_log_posterior(evidence, current_theta)
        proposed_log_posterior = get_log_posterior(evidence, proposed_theta)
        r = proposed_log_posterior - current_log_posterior
        if r > 1:
            return True
        else:
            alpha = np.random.uniform(0, 1)
            if alpha < np.exp(r):
                return True
            else:
                return False
    
    
    def metropolis_hastings(initial_theta, n_samples, evidence):
        current_theta = initial_theta
        n_accepted_samples = 0
        accepted_samples = []
        rejected_samples = []
        while n_accepted_samples < n_samples:
            proposed_theta = sample_from_proposal_distribution(current_theta)
            if accept(current_theta, proposed_theta, evidence):
                n_accepted_samples += 1
                current_theta=proposed_theta
                accepted_samples.append(proposed_theta)
            else:
                rejected_samples.append((n_accepted_samples, proposed_theta))
    
        return accepted_samples, rejected_samples
    
    
    if __name__ == '__main__':
        # generate data
        observations = np.random.normal(10, 3, 1000)
    
        # metropolis-hastings
        initial_theta = {
            "sigma": 1.0
        }
        n_samples = 10000
        accepted_samples, rejected_samples = metropolis_hastings(initial_theta, n_samples, observations)
    
        # plot trace & target distribution
        plt.subplot(2, 1, 1)
        plt.title("trace of sigma")
        accepted_xs = range(len(accepted_samples))
        accepted_ys = [s["sigma"] for s in accepted_samples]
        plt.plot(accepted_xs, accepted_ys, color="green")
        rejected_xs = [i for i, s in rejected_samples]
        rejected_ys = [s["sigma"] for i, s in rejected_samples]
        plt.scatter(rejected_xs, rejected_ys, marker="x", color="red")
        plt.subplot(2, 1, 2)
        plt.title("target distribution")
        plt.hist(accepted_ys, bins=30)
        plt.show()
    
    

    再赠送一个实际的例子,数据来自这个网站的monthly sunspot数据

    """
    Author: xuqh
    Created on 2022/1/5
    Applicationo of MCMC and Metropolis-hastings algorithm in real-world cases
        Case: Sunspot number (monthly) X
        Assumption: X~Gamma(a,b)
    """
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import scipy.stats as stats
    
    
    def sample_from_proposal_distribution(theta):
        a = theta['a']
        b = theta["b"]
        proposed_a = np.random.normal(a, 0.05, 5)[0]
        proposed_b = np.random.normal(b, 0.05, 5)[0]
        return {
            "a": proposed_a,
            "b": proposed_b
        }
    
    
    def get_data():
        fpath = "data/SN_m_tot_V2.0.csv"
        pdf = pd.read_csv(open(fpath), header=None, sep=";")
        data = pdf[3].to_numpy()
        return data
    
    
    def get_log_prior(theta):
        a = theta['a']
        b = theta["b"]
    
        if a <= 0 or b <= 0:
            return -np.inf
        else:
            return 0
    
    
    def get_log_likelihood(theta, evidence):
        a = theta['a']
        b = theta["b"]
        likelihood = stats.gamma.pdf(evidence,a=a, scale=b, loc=0)
    
        log_likelihood = np.log(likelihood).sum()
        temp=likelihood
        return log_likelihood
    
    
    def get_log_posterior(theta, evidence):
        log_prior = get_log_prior(theta)
        log_likelihood = get_log_likelihood(theta, evidence)
        return log_prior + log_likelihood
    
    
    def accept(current_theta, proposed_theta, evidence):
        current_log_posterior = get_log_posterior(current_theta, evidence)
        proposed_log_posterior = get_log_posterior(proposed_theta, evidence)
        log_r = proposed_log_posterior - current_log_posterior
        if log_r > 1:
            return True
        else:
            alpha = np.random.uniform(0, 1)
            if alpha < np.exp(log_r):
                return True
            else:
                return False
    
    
    def metropolis_hastings(initial_theta, n_samples, evidence):
        current_theta = initial_theta
        n_accepted_samples = 0
        accepted_samples = []
        rejected_samples = []
        while n_accepted_samples < n_samples:
    
            proposed_theta = sample_from_proposal_distribution(current_theta)
            if accept(current_theta, proposed_theta, evidence):
                n_accepted_samples += 1
                current_theta = proposed_theta
                accepted_samples.append(proposed_theta)
            else:
                rejected_samples.append(proposed_theta)
        return accepted_samples, rejected_samples
    
    
    if __name__ == '__main__':
        data = get_data()
        data+=1 # remove zeros
        # data=data[-1000:]
        initial_theta = {
            "a": 4,
            "b": 10
        }
        n_samples = 1000
        accepted_samples, rejected_samples = metropolis_hastings(initial_theta, n_samples, data)
    
        plt.title("trace of a and b")
        accepted_xs=[s["a"] for s in accepted_samples]
        accepted_ys=[s["b"] for s in accepted_samples]
        plt.plot(accepted_xs,accepted_ys)
        rejected_xs=[s["a"] for s in rejected_samples]
        rejected_ys=[s["b"] for s in rejected_samples]
        plt.scatter(rejected_xs,rejected_ys,marker="x",color="red")
        plt.show()
    

    相关文章

      网友评论

          本文标题:MCMC的Metropolis-hastings 算法pytho

          本文链接:https://www.haomeiwen.com/subject/wwfhcrtx.html