Lesson 2: Gaussian, Unimodal, Continuous, Kalman Filter

Author: Allen的光影天地 | Published 2019-03-01 10:21

    One-dimensional Gaussian distribution

    Code

    The key points are described in the code comments.

    # Write a program that will iteratively update and
    # predict based on the location measurements 
    # and inferred motions shown below. 
    
    # Measurement update: fuse the current belief with a new sensor reading
    # by multiplying the two Gaussians (this is the "correction" step).
    # The new variance is smaller than either of the original variances,
    # and the new mean lies between the two original means.
    def update(mean1, var1, mean2, var2):
        new_mean = float(var2 * mean1 + var1 * mean2) / (var1 + var2)
        new_var = 1./(1./var1 + 1./var2)
        return [new_mean, new_var]
    
    
    # Motion (prediction) step: shift the estimate forward by the commanded
    # distance, adding the motion's mean and variance to the current ones.
    # The variance necessarily grows, which is reasonable: predicting through
    # motion always loses information.
    def predict(mean1, var1, mean2, var2):
        new_mean = mean1 + mean2
        new_var = var1 + var2
        return [new_mean, new_var]
    
    # Sensor measurements
    measurements = [5., 6., 7., 9., 10.]
    # Distance moved at each step
    motion = [1., 1., 2., 1., 1.]
    # Measurement (sensor) variance
    measurement_sig = 4.
    # Motion variance
    motion_sig = 2.
    # Initial mean
    mu = 0.
    # Initial variance
    sig = 10000.
    
    #Please print out ONLY the final values of the mean
    #and the variance in a list [mu, sig]. 
    
    # Insert code here
    
    for i in range(len(measurements)):
        
        [mu, sig] = update(mu, sig, measurements[i], measurement_sig)
        print('update', [mu, sig])
        [mu, sig] = predict(mu, sig, motion[i], motion_sig)
        print('predict', [mu, sig])
    
    print([mu, sig])
    
    

    Output

    update [4.998000799680128, 3.9984006397441023]
    predict [5.998000799680128, 5.998400639744102]
    update [5.999200191953932, 2.399744061425258]
    predict [6.999200191953932, 4.399744061425258]
    update [6.999619127420922, 2.0951800575117594]
    predict [8.999619127420921, 4.09518005751176]
    update [8.999811802788143, 2.0235152416216957]
    predict [9.999811802788143, 4.023515241621696]
    update [9.999906177177365, 2.0058615808441944]
    predict [10.999906177177365, 4.005861580844194]
    

    Analysis

    It is easy to see how the sensor step by step corrects the overall position estimate.

    mu = 0.
    sig = 0.00001
    

    One thing worth noting: if, as in the snippet above, we make the initial variance very
    small, it means we are very confident that the initial position estimate is accurate.
    With that change we get the following output:

    update [1.2499968750078127e-05, 9.999975000062502e-06]
    predict [1.00001249996875, 2.000009999975]
    update [2.6666805554976856, 1.3333377777592592]
    predict [3.6666805554976856, 3.333337777759259]
    update [5.181826859462716, 1.8181831404895565]
    predict [7.181826859462716, 3.8181831404895563]
    update [8.06977203891725, 1.9534887182243577]
    predict [9.06977203891725, 3.9534887182243574]
    update [9.532166075020006, 1.9883041811151205]
    predict [10.532166075020006, 3.9883041811151205]
    

    But as the updates accumulate, the sensor (being the more trustworthy source) gradually wins out, so the final result still ends up very close to the expected position of 11.
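    To see this convergence numerically, here is a minimal sketch (not part of the original
    post) that reuses the update/predict functions and data defined above and runs the same
    measure/move cycle from two different priors; the helper name run_filter is made up for
    illustration.

    def run_filter(mu, sig):
        # Run the full measure/move cycle starting from the given prior belief.
        for m, d in zip(measurements, motion):
            mu, sig = update(mu, sig, m, measurement_sig)
            mu, sig = predict(mu, sig, d, motion_sig)
        return mu, sig

    print(run_filter(0., 10000.))   # very uncertain prior       -> roughly (11.0, 4.0)
    print(run_filter(0., 0.00001))  # overconfident, wrong prior -> roughly (10.5, 4.0)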

    Multivariate Gaussian distribution

    First, a reference on the Kalman filter in the multidimensional case:
    https://zhuanlan.zhihu.com/p/39912633

    A picture is worth a thousand words.
    A 2-D (4-state) Kalman filter implementation:
    # Write a function 'kalman_filter' that implements a multi-
    # dimensional Kalman Filter for the example given
    
    from math import *
    
    
    class matrix:
    
        # implements basic operations of a matrix class
    
        def __init__(self, value):
            self.value = value
            self.dimx = len(value)
            self.dimy = len(value[0])
            if value == [[]]:
                self.dimx = 0
    
        def zero(self, dimx, dimy):
            # check if valid dimensions
            if dimx < 1 or dimy < 1:
                raise ValueError("Invalid size of matrix")
            else:
                self.dimx = dimx
                self.dimy = dimy
                self.value = [[0 for row in range(dimy)] for col in range(dimx)]
    
        def identity(self, dim):
            # check if valid dimension
            if dim < 1:
                raise ValueError("Invalid size of matrix")
            else:
                self.dimx = dim
                self.dimy = dim
                self.value = [[0 for row in range(dim)] for col in range(dim)]
                for i in range(dim):
                    self.value[i][i] = 1
    
        def show(self):
            for i in range(self.dimx):
                print(self.value[i])
            print(' ')
    
        def __add__(self, other):
            # check if correct dimensions
            if self.dimx != other.dimx or self.dimy != other.dimy:
                raise ValueError("Matrices must be of equal dimensions to add")
            else:
                # add if correct dimensions
                res = matrix([[]])
                res.zero(self.dimx, self.dimy)
                for i in range(self.dimx):
                    for j in range(self.dimy):
                        res.value[i][j] = self.value[i][j] + other.value[i][j]
                return res
    
        def __sub__(self, other):
            # check if correct dimensions
            if self.dimx != other.dimx or self.dimy != other.dimy:
                raise ValueError("Matrices must be of equal dimensions to subtract")
            else:
                # subtract if correct dimensions
                res = matrix([[]])
                res.zero(self.dimx, self.dimy)
                for i in range(self.dimx):
                    for j in range(self.dimy):
                        res.value[i][j] = self.value[i][j] - other.value[i][j]
                return res
    
        def __mul__(self, other):
            # check if correct dimensions
            if self.dimy != other.dimx:
                raise ValueError("Matrices must be m*n and n*p to multiply")
            else:
                # multiply if correct dimensions
                res = matrix([[]])
                res.zero(self.dimx, other.dimy)
                for i in range(self.dimx):
                    for j in range(other.dimy):
                        for k in range(self.dimy):
                            res.value[i][j] += self.value[i][k] * other.value[k][j]
                return res
    
        def transpose(self):
            # compute transpose
            res = matrix([[]])
            res.zero(self.dimy, self.dimx)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[j][i] = self.value[i][j]
            return res
    
        # Thanks to Ernesto P. Adorio for use of Cholesky and CholeskyInverse functions
    
        def Cholesky(self, ztol=1.0e-5):
            # Computes the upper triangular Cholesky factorization of
            # a positive definite matrix.
            res = matrix([[]])
            res.zero(self.dimx, self.dimx)
    
            for i in range(self.dimx):
                S = sum([(res.value[k][i]) ** 2 for k in range(i)])
                d = self.value[i][i] - S
                if abs(d) < ztol:
                    res.value[i][i] = 0.0
                else:
                    if d < 0.0:
                        raise ValueError("Matrix not positive-definite")
                    res.value[i][i] = sqrt(d)
                for j in range(i + 1, self.dimx):
                    S = sum([res.value[k][i] * res.value[k][j] for k in range(self.dimx)])
                    if abs(S) < ztol:
                        S = 0.0
                    try:
                        res.value[i][j] = (self.value[i][j] - S) / res.value[i][i]
                    except:
                        raise ValueError("Zero diagonal")
            return res
    
        def CholeskyInverse(self):
            # Computes inverse of matrix given its Cholesky upper Triangular
            # decomposition of matrix.
            res = matrix([[]])
            res.zero(self.dimx, self.dimx)
    
            # Backward step for inverse.
            for j in reversed(range(self.dimx)):
                tjj = self.value[j][j]
                S = sum([self.value[j][k] * res.value[j][k] for k in range(j + 1, self.dimx)])
                res.value[j][j] = 1.0 / tjj ** 2 - S / tjj
                for i in reversed(range(j)):
                    res.value[j][i] = res.value[i][j] = -sum(
                        [self.value[i][k] * res.value[k][j] for k in range(i + 1, self.dimx)]) / self.value[i][i]
            return res
    
        def inverse(self):
            aux = self.Cholesky()
            res = aux.CholeskyInverse()
            return res
    
        def __repr__(self):
            return repr(self.value)
    
    
    # Measurement update; z is the measurement vector
    def update(x, P, z):
        y = z - H * x                        # innovation (measurement residual)
        S = H * P * H.transpose() + R        # innovation covariance, including sensor noise R
        K = P * H.transpose() * S.inverse()  # Kalman gain
        x = x + (K * y)
        P = (I - K * H) * P
        return x, P
    
    
    # Prediction step: propagate the state and covariance through the motion model
    # (no process-noise term is added in this example)
    def prediction(x, P):
        x = F * x + u
        P = F * P * F.transpose()
        return x, P
    
    
    def kalman_filter(x, P):
        for n in range(len(measurements)):
            z = matrix([measurements[n]]).transpose()
            # In this example we predict first, then incorporate the measurement
            x, P = prediction(x, P)
            x, P = update(x, P, z)
    
        return x, P
    
    
    # measurements = [1, 2, 3]  # sensor readings
    # x = matrix([[0.], [0.]])  # initial state (location and velocity)
    # P = matrix([[1000., 0.], [0., 1000.]])  # initial uncertainty (covariance)
    # u = matrix([[0.], [0.]])  # external motion (models other control inputs, e.g. the driver braking)
    # F = matrix([[1., 1.], [0, 1.]])  # next state function, used for prediction
    # H = matrix([[1., 0.]])  # measurement function
    # R = matrix([[1.]])  # measurement uncertainty (sensor covariance)
    # I = matrix([[1., 0.], [0., 1.]])  # identity matrix
    
    
    # measurements = [[5., 10.], [6., 8.], [7., 6.], [8., 4.], [9., 2.], [10., 0.]]
    # initial_xy = [4., 12.]
    
    # measurements = [[1., 4.], [6., 0.], [11., -4.], [16., -8.]]
    # initial_xy = [-4., 8.]
    
    measurements = [[1., 17.], [1., 15.], [1., 13.], [1., 11.]]
    initial_xy = [1., 19.]
    
    dt = 0.1
    
    x = matrix([[initial_xy[0]], [initial_xy[1]], [0.], [0.]]) # initial state (location and velocity)
    u = matrix([[0.], [0.], [0.], [0.]]) # external motion
    
    
    P = matrix([[0., 0., 0., 0.],
                [0., 0., 0., 0.],
                [0., 0., 1000., 0.],
                [0., 0., 0., 1000.]]) # initial uncertainty: 0 for positions x and y, 1000 for the two velocities
    F = matrix([[1., 0, dt, 0.],
                [0., 1, 0., dt],
                [0., 0, 1., 0.],
                [0., 0., 0., 1.]]) # next state function: generalize the 2d version to 4d
    H = matrix([[1., 0., 0., 0.],
                [0.,1.,0.,0.]]) # measurement function: reflect the fact that we observe x and y but not the two velocities
    R = matrix([[0.1, 0], [0, 0.1]]) # measurement uncertainty: use 2x2 matrix with 0.1 as main diagonal
    I = matrix([[1., 0.,0.,0.], [0., 1.,0.,0.],[0.,0.,1.,0.],[0.,0.,0.,1.]])  # 4d identity matrix
    
    a = kalman_filter(x, P)
    
    a[0].show()
    a[1].show()
    

    The core is the last part; everything before it is just a small matrix API implemented for convenience, which could equally well be replaced by numpy.
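    For reference, a minimal numpy sketch of the same two steps (my own illustration, not
    from the original post) might look like this:

    import numpy as np

    def kf_predict(x, P, F, u):
        # x' = F x + u,  P' = F P F^T  (no process-noise term, matching the code above)
        return F @ x + u, F @ P @ F.T

    def kf_update(x, P, z, H, R):
        y = z - H @ x                       # innovation
        S = H @ P @ H.T + R                 # innovation covariance
        K = P @ H.T @ np.linalg.inv(S)      # Kalman gain
        return x + K @ y, (np.eye(len(x)) - K @ H) @ P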


    The Kalman filter equations used in this example:
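    Written out, these are the equations implemented by prediction() and update() above:

    Prediction:   x' = F x + u
                  P' = F P F^T

    Update:       y  = z - H x
                  S  = H P H^T + R
                  K  = P H^T S^{-1}
                  x' = x + K y
                  P' = (I - K H) P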

    The R matrix is the sensor's noise (variance). In practice it can be measured experimentally, or you can start with a fairly large value, watch how the filter behaves, and keep it if the results look good.
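    As a rough, hypothetical illustration of the experimental route (the readings below are
    made-up numbers), R could be estimated as the sample covariance of repeated readings of
    a stationary target:

    import numpy as np

    readings = np.array([[5.1, 9.9], [4.9, 10.2], [5.0, 10.1], [5.2, 9.8]])  # made-up data
    R_est = np.cov(readings, rowvar=False)  # 2x2 sample covariance of the (x, y) readings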
