美文网首页
异常检测的算法实现

异常检测的算法实现

作者: 此间不留白 | 来源:发表于2019-07-12 20:19 被阅读0次

前言

在这篇文章中,将会使用python实现异常检测算法检测服务器中的异常行为,我们将检测每一台服务器上吞吐量(mb/s)和延迟(ms)数据。在服务器运行过程中收集m=307个样本,这些样本都是无标签的,假设这些样本中的大多数数据表示服务器的正常行为,而少数样本的数据是表示服务器的异常行为。

算法实现

数据可视化

首先,通过二维图像可视化显示这些数据集,并且通过这些数据拟合高斯分布,并且可以发现某一些数据出现的概率很低,因此可以认为这些数据是异常数据。

加载数据之后,通过二维可视化之后如下所示:

plt.ion()
data = scio.loadmat('ex8data1.mat')
X = data['X']
Xval = data['Xval']
yval = data['yval'].flatten()
plt.figure()
plt.scatter(X[:, 0], X[:, 1], c='b', marker='x', s=15, linewidth=1)
plt.axis([0, 30, 0, 30])
plt.xlabel('Latency (ms)')
plt.ylabel('Throughput (mb/s')

如下图所示


将数据集可视化显示之后,可以看到,大多数数据落点比较密集,个别数据的落点比较分散,而这些数据很可能就是异常数据。

高斯分布

给定数据集X,需要计算高斯分布的参数,即也就是均值\mu和方差\sigma^2,高斯分布参数的计算公式如下所示:
\mu = \frac{1}{m}\sum_{i=1}^{m}x_i\sigma^2 = \frac{1}{m}\sum_{i=1}^{m}(x_i-\mu)^2

以上公式的代码实现如下所示:

def estimate_gaussian(X):
    # Useful variables
    m, n = X.shape
    mu = np.zeros(n)
    sigma2 = np.zeros(n)
    mu = np.mean(X,axis=0)
    sigma2 = np.var(X,axis=0)
    return mu, sigma2

代码中涉及到多元高斯模型,其计算公式如下所示:
p(x;\mu,A) = \frac{1}{(2 \pi)^{\frac{n}{2}}|A|^{\frac{1}{2}}}exp(-\frac{1}{2}{(x-\mu)}^TA^{-1}(x- \mu))
代码实现如下所示:

# 多维高斯分布

def multivariate_gaussian(X, mu, sigma2):
    k = mu.size
    if sigma2.ndim == 1 or (sigma2.ndim == 2 and (sigma2.shape[1] == 1 or sigma2.shape[0] == 1)):
        # 转换为对角矩阵
        sigma2 = np.diag(sigma2)

    x = X - mu
    p = (2 * np.pi) ** (-k / 2) * np.linalg.det(sigma2) ** (-0.5) * np.exp(-0.5*np.sum(np.dot(x, np.linalg.pinv(sigma2)) * x, axis=1))

    return p

对数据可视化显示并且以等高线形式显示,实现代码如下所示:

#拟合数据并可视化
def visualize_fit(X, mu, sigma2):
    #生成两个方阵,从0开始,到35.5 并且每一列递增速度为0.5
    grid = np.arange(0, 35.5, 0.5)
    x1, x2 = np.meshgrid(grid, grid)
    # x1.flatten('F') 以列优先展开
    Z = multivariate_gaussian(np.c_[x1.flatten('F'), x2.flatten('F')], mu, sigma2)
    Z = Z.reshape(x1.shape, order='F')

    plt.figure()
    plt.scatter(X[:, 0], X[:, 1], marker='x', c='b', s=15, linewidth=1)

    
    if np.sum(np.isinf(X)) == 0:
        lvls = 10 ** np.arange(-20, 0, 3).astype(np.float)
        # 画出等高线图
        plt.contour(x1, x2, Z, levels=lvls, colors='r', linewidths=0.7)

最后,可视化显示得到的图像如下所示,以等高线形式表示概率的大小,等高线从外向内,表示概率以此变大,从图像中可以看出大多数数据位于概率较高的地区,而异常数据应该处于概率较低的地区。

visualize_fit(X, mu, sigma2)
plt.xlabel('Latency (ms)')
plt.ylabel('Throughput (mb/s')

input('Program paused. Press ENTER to continue')

\epsilon参数的选择

为了选择合适的\epsilon参数,将会使用交叉验证集\{ x_{cv}^{(i)} , y_{cv}^{(i)}\},其中标签y=1表示此样本为异常样本,而标签y=0表示正常样本,对于每一个交叉验证样本,利用高斯公式会得到一个概率值p(x_{cv}^{(i)}),最后通过高斯公式计算出所有交叉验证集的概率值,并将其组合得到一个向量,通过此概率向量{p(x_{cv}^{(i)})}和输出y_{cv}^{(i)}所组成的向量可以计算\epsilon参数。

对于给定的\epsilon参数,如果样本x通过高斯分布得到的概率p(x) < \epsilon,则该样本为异常样本,引入一个参数F_1评估不同的参数\epsilon的工作性能。

参数F_1的计算公式如下所示:

F_1 = \frac{2·prec·rec}{prec+rec}

其中
prec = \frac{tp}{tp+fp}

rec = \frac{tp}{tp+fn}

  • tp表示正确的正样本的个数,表示算法能够正确的将其识别为异常样本。
  • fp表示错误的正样本个数,并且表示算法将正常样本识别为异常样本。
  • fn表示错误的负样本数量,并且表示算法将异常样本识别为正常样本。

注意:通过高斯分布得到的概率小于\epsilon,则认为其预测值是1,否则为0,通过交叉验证集得到的一系列概率值与\epsilon比较可以得到一组二进制向量predictions,对于以上F_1中,参数tp,fp,fn的计算可以直接通过验证集样本y_{val}predictions通过与操作,再通过np.sum()函数可以直接得到其数量。

最后,以上过程的实现代码如下所示:

def select_threshold(yval, pval):
    f1 = 0

 
    best_eps = 0
    best_f1 = 0

    for epsilon in np.linspace(np.min(pval), np.max(pval), num=1001):
        False(0)'s and True(1)'s of the outlier predictions
       

        predictions = np.less(pval, epsilon) #pval < epsilon = 0
        tp = np.sum(np.logical_and(predictions, yval)) #tp = 8
        fp = np.sum(np.logical_and(predictions, yval == 0)) # fp = 8
        fn = np.sum(np.logical_and(np.logical_not(predictions), yval == 1))   #fn = 2
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        f1 = (2 * precision * recall) / (precision + recall)

        # 迭代寻找最佳值
        if f1 > best_f1:
            best_f1 = f1
            best_eps = epsilon

    return best_eps, best_f1

载入数据,运行以上函数

data = scio.loadmat('ex8data2.mat')
X = data['X']
Xval = data['Xval']
yval = data['yval'].flatten()

# Apply the same steps to the larger dataset
mu, sigma2 = estimate_gaussian(X)

# Training set
p = multivariate_gaussian(X, mu, sigma2)

# Cross Validation set
pval = multivariate_gaussian(Xval, mu, sigma2)

# Find the best threshold
epsilon, f1 = select_threshold(yval, pval)

print('Best epsilon found using cross-validation: {:0.4e}'.format(epsilon))
print('Best F1 on Cross Validation Set: {:0.6f}'.format(f1))
print('# Outliers found: {}'.format(np.sum(np.less(p, epsilon))))

通过运行以上代码,得到的\epsilon值如下图所示:

相关文章

网友评论

      本文标题:异常检测的算法实现

      本文链接:https://www.haomeiwen.com/subject/wwvwtctx.html