美文网首页
ViBe运动目标检测python实现

ViBe运动目标检测python实现

作者: Curya | 来源:发表于2020-07-19 22:43 被阅读0次

    代码实现参考官方开源C代码,仅实现了对灰度图的运动目标检测,即对应C代码中三个核心函数:

    libvibeModel_Sequential_AllocInit_8u_C1R
    libvibeModel_Sequential_Segmentation_8u_C1R
    libvibeModel_Sequential_Update_8u_C1R
    

    其中,Segmentation部分对循环做了相应修改:不再逐像素遍历、逐一修改,而是先计算出布尔mask,再直接对整幅图像矩阵进行操作;Update部分则基本是照着C代码逐行改写成Python的,因此保留了大量逐像素的循环和判断语句。

    官网截图

    全部代码:https://github.com/232525/ViBe.python

    ViBe类文件

    # -*- coding: utf-8 -*-
    """
    Created on Sat Jul 18 17:43:58 2020
    @filename: lib_vibe.py
    @author: curya
    """
    
    import numpy as np
    import time
    
    class vibe_gray:
        """ViBe background-subtraction model for single-channel 8-bit images.

        Each pixel keeps ``numberOfSamples`` past intensity samples.  The first
        ``numberOfHistoryImages`` of them live in ``historyImage`` (one plane
        per sample), the rest in ``historyBuffer``.  A pixel is classified as
        background when at least ``matchingNumber`` samples lie within
        ``matchingThreshold`` of its current value.

        Typical use: call ``AllocInit`` once with the first frame, then for
        every frame call ``Segmentation`` followed by ``Update`` with the
        returned mask.
        """

        def __init__(self):
            """Set the default ViBe parameters; buffers are created by AllocInit."""
            self.width = 0
            self.height = 0
            self.numberOfSamples = 20
            self.matchingThreshold = 20
            self.matchingNumber = 2
            self.updateFactor = 16

            # Storage for the history
            self.historyImage = None
            self.historyBuffer = None
            self.lastHistoryImageSwapped = 0
            self.numberOfHistoryImages = 2

            # Buffers with random values
            self.jump = None
            self.neighbor = None
            self.position = None

        def AllocInit(self, image):
            """Allocate the model and initialise it from the first frame.

            Args:
                image: 2-D grayscale frame of shape (height, width).  Values
                    are stored as uint8.

            The history images are plain copies of ``image``; the history
            buffer holds copies perturbed by uniform noise in [-10, 9] and
            clipped to [0, 255].  Three random lookup tables (``jump``,
            ``neighbor``, ``position``) are also drawn here and reused by
            every later ``Update`` call.
            """
            print('AllocInit!')
            height, width = image.shape[:2]
            # set the parameters
            self.width = width
            self.height = height
            print(self.height, self.width)

            # create the historyImage: every plane starts as the first frame
            self.historyImage = np.zeros(
                (height, width, self.numberOfHistoryImages), np.uint8)
            for i in range(self.numberOfHistoryImages):
                self.historyImage[:, :, i] = image

            # create and fill the historyBuffer with noisy copies of the frame
            numberOfTests = self.numberOfSamples - self.numberOfHistoryImages
            self.historyBuffer = np.zeros((height, width, numberOfTests), np.uint8)
            for i in range(numberOfTests):
                noisy = image + np.random.randint(-10, 10, (height, width))
                self.historyBuffer[:, :, i] = np.clip(noisy, 0, 255).astype(np.uint8)

            # fill the buffers with random values
            size = 2 * max(width, height) + 1
            # distance (>= 1) to the next pixel that gets updated along a line
            self.jump = np.random.randint(
                1, 2 * self.updateFactor + 1, size).astype(np.uint32)
            # flat-index offset to one of the 8 neighbours (or the pixel itself)
            dx = np.random.randint(-1, 2, size)
            dy = np.random.randint(-1, 2, size)
            # np.int was removed in NumPy 1.24; use an explicit signed dtype
            self.neighbor = (dx + dy * width).astype(np.int64)
            # which of the numberOfSamples slots gets overwritten
            self.position = np.random.randint(
                0, self.numberOfSamples, size).astype(np.uint32)

        def Segmentation(self, image):
            """Classify every pixel of ``image`` as background or foreground.

            Args:
                image: 2-D grayscale frame with the shape given to AllocInit.

            Returns:
                uint8 array of shape (height, width): 0 for background,
                255 for foreground.

            Side effects: advances ``lastHistoryImageSwapped`` and swaps
            matching samples from ``historyBuffer`` into the selected history
            image plane.
            """
            threshold = self.matchingThreshold
            # Work in a signed type: subtracting uint8 arrays wraps modulo 256,
            # which would turn "slightly darker" into a huge difference.
            frame = np.asarray(image).astype(np.int32)

            def matches(samples):
                return np.abs(frame - samples.astype(np.int32)) <= threshold

            # counts = number of matches still required before a pixel is
            # considered background.  The first history image is folded into
            # the initial value: a match leaves matchingNumber - 1 to go.
            counts = np.full((self.height, self.width),
                             self.matchingNumber - 1, np.int32)
            counts[~matches(self.historyImage[:, :, 0])] = self.matchingNumber

            # next historyImages
            for i in range(1, self.numberOfHistoryImages):
                counts[matches(self.historyImage[:, :, i])] -= 1

            # for swapping: rotate which history image receives buffer samples
            self.lastHistoryImageSwapped = (
                (self.lastHistoryImageSwapped + 1) % self.numberOfHistoryImages)
            swap_plane = self.historyImage[:, :, self.lastHistoryImageSwapped]

            # now, we move in the buffer and leave the historyImage
            numberOfTests = self.numberOfSamples - self.numberOfHistoryImages
            active = counts > 0
            for i in range(numberOfTests):
                if not active.any():
                    break
                plane = self.historyBuffer[:, :, i]
                hit = active & matches(plane)
                counts[hit] -= 1

                # Swapping: putting the found value in the history image so it
                # is tested early on the next frame
                displaced = swap_plane[hit].copy()
                swap_plane[hit] = plane[hit]
                plane[hit] = displaced

                # pixels that reached zero are done (the C code's inner break)
                active &= counts > 0

            # Produces the output. Note that this step is application-dependent
            return np.where(counts > 0, 255, 0).astype(np.uint8)

        def _store_sample(self, y, x, slot, value):
            """Write ``value`` into sample ``slot`` of pixel (y, x)."""
            if slot < self.numberOfHistoryImages:
                self.historyImage[y, x, slot] = value
            else:
                self.historyBuffer[y, x, slot - self.numberOfHistoryImages] = value

        def _update_border(self, image, updating_mask, coords_of, length,
                           jump, position):
            """Refresh background pixels along one border line.

            ``coords_of(k)`` maps the running index ``k`` to (y, x).  No
            neighbour is written here, only the visited pixel itself.
            """
            shift = np.random.randint(0, length)
            k = jump[shift]
            while k <= length - 1:
                y, x = coords_of(k)
                if updating_mask[y, x] == 0:
                    self._store_sample(y, x, position[shift], image[y, x])
                shift += 1
                k += jump[shift]

        def Update(self, image, updating_mask):
            """Randomly refresh the model with background pixels of ``image``.

            Args:
                image: 2-D grayscale frame with the shape given to AllocInit.
                updating_mask: array of the same shape; only pixels whose mask
                    value is 0 (background, as returned by Segmentation) are
                    written into the model.

            Interior pixels visited by the random walk overwrite one sample of
            themselves and the same sample of a random neighbour.  Border
            rows and columns only overwrite their own sample.
            """
            width, height = self.width, self.height
            # Plain Python ints are faster to index with in the pixel loops and
            # avoid unsigned arithmetic on the uint32 tables.
            jump = self.jump.tolist()
            neighbor = self.neighbor.tolist()
            position = self.position.tolist()

            # updating the interior
            for y in range(1, height - 1):
                shift = np.random.randint(0, width)
                x = jump[shift]
                while x < width - 1:
                    # only background pixels may enter the model
                    if updating_mask[y, x] == 0:
                        value = image[y, x]
                        slot = position[shift]
                        # neighbor[] holds flat offsets dx + dy * width; since
                        # 1 <= x <= width - 2 the split below is exact
                        ny, nx = divmod(x + y * width + neighbor[shift], width)
                        self._store_sample(y, x, slot, value)
                        self._store_sample(ny, nx, slot, value)
                    shift += 1
                    x += jump[shift]

            last_row = height - 1
            last_col = width - 1
            # First row, last row, first column, last column
            self._update_border(image, updating_mask,
                                lambda x: (0, x), width, jump, position)
            self._update_border(image, updating_mask,
                                lambda x: (last_row, x), width, jump, position)
            self._update_border(image, updating_mask,
                                lambda y: (y, 0), height, jump, position)
            self._update_border(image, updating_mask,
                                lambda y: (y, last_col), height, jump, position)

            # The first pixel is never reached by the walks above (jump >= 1),
            # so it is refreshed with probability 1 / updateFactor
            if np.random.randint(0, self.updateFactor) == 0:
                if updating_mask[0, 0] == 0:
                    slot = np.random.randint(0, self.numberOfSamples)
                    self._store_sample(0, 0, slot, image[0, 0])
    
    # TO DO
    class vibe_rgb:
        """Placeholder for a colour (RGB) ViBe model; nothing is implemented yet."""

        def __init__(self):
            """Create the placeholder; no state is set up."""

        def init(self):
            """Not implemented: does nothing and returns None."""
            return None

        def segmentation(self):
            """Not implemented: does nothing and returns None."""
            return None

        def update(self):
            """Not implemented: does nothing and returns None."""
            return None
    

    调用主函数文件

    # -*- coding: utf-8 -*-
    """
    Created on Sat Jul 18 17:43:58 2020
    @filename: vibe_demo.py
    @author: curya
    """
    
    import numpy as np
    import cv2
    import time
    from lib_vibe import vibe_gray
    
    # Demo driver: run ViBe on every frame of a video, show the result and
    # report how long segmentation and model update took.
    VIDEO_PATH = './TownCentreXVID.avi'
    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        # Without this check a bad path just ends the loop on the first read
        # and the script reports timings for zero frames.
        raise SystemExit('Cannot open video: %s' % VIDEO_PATH)
    vibe = vibe_gray()

    frame_index = 0
    segmentation_time = 0
    update_time = 0
    t1 = time.time()
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # end of the video (or a read error)
                break
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if frame_index % 100 == 0:
                print('Frame number: %d' % frame_index)

            # the model is built from the very first frame
            if frame_index == 0:
                vibe.AllocInit(gray_frame)

            t2 = time.time()
            segmentation_map = vibe.Segmentation(gray_frame)
            t3 = time.time()
            # the raw (unfiltered) mask decides which pixels update the model
            vibe.Update(gray_frame, segmentation_map)
            t4 = time.time()
            segmentation_time += (t3-t2)
            update_time += (t4-t3)
            print('Frame %d, segmentation: %.4f, updating: %.4f' % (frame_index, t3-t2, t4-t3))
            # median filter is applied for display only
            segmentation_map = cv2.medianBlur(segmentation_map, 3)

            cv2.imshow('Actual Frame!', frame)
            cv2.imshow('Gray Frame!', gray_frame)
            cv2.imshow('Segmentation Frame!', segmentation_map)
            frame_index += 1
            # stop early when the user presses 'q'
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        t5 = time.time()
        print('All time cost %.3f' % (t5-t1))
        print('segmentation time cost: %.3f, update time cost: %.3f' % (segmentation_time, update_time))

        # keep the last windows on screen until a key is pressed
        cv2.waitKey()
    finally:
        # release the capture and close the windows even if a frame failed
        cap.release()
        cv2.destroyAllWindows()
    

    运行

    python vibe_demo.py
    

    问题

    和C代码比较,效率就是个弟弟,跑分辨率不是太高的视频勉强凑合,分辨率太高的视频就真的干看着了,不过和我找的几份别人用python实现的ViBe代码比较,感觉还算是快的。

    其次就是发现效率的硬伤之后,也就没去检查代码逻辑是不是和C代码一致,反正是可以检测目标。

    还是想办法用Cython复用C代码?我太难了。。。。。。好不容易照着别人的东西造了个轮子,结果轮子不是很圆。:-( Sad

    相关文章

      网友评论

          本文标题:ViBe运动目标检测python实现

          本文链接:https://www.haomeiwen.com/subject/qlhdkktx.html