I've developed a system for blob detection and tracking in Python using OpenCV and NumPy, intended for processing video input. The system is structured around custom classes for detecting and tracking objects (blobs) through video frames. However, I'm running into significant performance issues: the system is too slow for real-time use. My setup is deployed on an NXP i.MX 8M Plus board, which offers certain hardware acceleration capabilities, but I'm uncertain how best to leverage these or otherwise optimize my system for better performance.
The system's architecture includes:
Blob: Represents a detected object, holding properties like ID, contour, and a method for updating its path with new contours.
BlobTracker: Manages multiple Blob instances, updates their states with new detections, and calculates metrics like Intersection over Union (IoU) and centroid distances for tracking.
MOG2ObjectDetector: Uses OpenCV's MOG2 background subtractor for initial blob detection, processes masks to generate bounding boxes, and feeds these into the BlobTracker.
Main Issues:
The system's performance does not meet the requirements for real-time processing.
I'm unsure how to effectively use the hardware acceleration features of the i.MX 8M Plus board.
I'm considering various optimizations, including potentially using spatial hashing for detected blobs, and I'm open to suggestions on code or algorithmic improvements.
Questions:
What specific code optimizations can be made, especially with numpy or OpenCV operations, to enhance performance?
How can I take advantage of the hardware acceleration capabilities of the i.MX 8M Plus board in this context?
Would implementing a spatial hash for blob detection significantly improve tracking performance? If so, how should I approach it?
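To make the last question concrete, this is roughly the kind of spatial hash I have in mind. It is an untested sketch: build_spatial_hash, candidates_near and the cell_size value are just placeholders I made up, and it assumes the [x1, y1, x2, y2, area, (cx, cy)] detection format that mask_to_bbox (below) produces.

    from collections import defaultdict

    def build_spatial_hash(detections, cell_size=64):
        """Bucket detections [x1, y1, x2, y2, area, (cx, cy)] by the grid cell of their centroid."""
        grid = defaultdict(list)
        for det in detections:
            cx, cy = det[5]
            grid[(cx // cell_size, cy // cell_size)].append(det)
        return grid

    def candidates_near(grid, centroid, cell_size=64):
        """Return only the detections in the 3x3 cell neighbourhood of a blob's centroid."""
        cx, cy = centroid
        gx, gy = cx // cell_size, cy // cell_size
        out = []
        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                out.extend(grid.get((gx + dx, gy + dy), []))
        return out

The idea would be that each existing blob only computes IoU and centroid distance against the detections in its own and neighbouring cells, rather than against every detection in the frame.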
Code:
For each frame, we call MOG2ObjectDetector.get_matches(...).
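Simplified, the loop driving it looks roughly like this (the capture source, frame size and fps are placeholders, and for now the same image is passed as both arguments):

    import cv2

    # Placeholder capture source; the real input comes from the board's camera pipeline.
    cap = cv2.VideoCapture(0)
    ok, frame = cap.read()

    # inputShape is (width, height); doTPU toggles the UMat code paths shown below.
    detector = MOG2ObjectDetector(inputShape=(frame.shape[1], frame.shape[0]), doTPU=False, fps=30)

    while ok:
        # For now the same image is passed as both 'processed' and 'frame'.
        blobs = detector.get_matches(frame, frame)
        # ... draw / publish blobs here ...
        ok, frame = cap.read()

    cap.release()

The classes themselves: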
import math
from typing import List, Tuple, Optional

import cv2
import numpy as np


class Blob:
    """Class to represent a detected object (blob) in a video."""

    def __init__(self, id: int, contour: List[int]):
        """Initialize a Blob with an ID, a contour, a path, a time_to_live and a centroid."""
        self.id = id
        self.seen = False
        self.contour = contour
        self.path = [contour]
        self.time_to_live = 10

    def update_path(self, new_contour):
        """Update the path of the Blob with a new contour and recalculate its centroid."""
        self.path.append(new_contour)
        self.contour = new_contour
        self.time_to_live += 1

    def get_detection(self):
        """Decrease the time_to_live by one and return the contour of the Blob."""
        self.time_to_live -= 1
        return self.contour


class BlobTracker:
    """Class to track multiple Blobs in a video."""

    def __init__(self, width, height, fps):
        """Initialize a BlobTracker with an ID counter, a list of Blobs, and frame parameters."""
        self.next_id = 0
        self.blobs: List[Blob] = []
        self.frame_width = width
        self.frame_height = height
        # Assuming the path in the video is 5 meters long and the maximum velocity of an object
        # is 30 km/h (about 10 m/s), it takes roughly 1/2 s to pass the entire frame.
        self.flow_threshold = max(0, width // fps) ** 2
        self.iou_threshold = 0.6

    def update_blobs(self, new_matches: List[List[int]]):
        """Update the Blobs based on new matches."""
        for blob in self.blobs:
            self.update_blob(blob, new_matches)
        for match in new_matches:
            print("Created new matched blob:", self.next_id)
            new_blob = Blob(self.next_id, match[:4])
            self.next_id += 1
            self.blobs.append(new_blob)
        return self.get_detections()

    def get_detections(self):
        """Get the detections from the Blobs, and remove any Blobs that are no longer alive."""
        detections = []
        for blob in self.blobs:
            if not blob.seen:
                blob.seen = True
            detections.append(blob.get_detection())
            if blob.time_to_live <= 0:
                self.blobs.remove(blob)
        return detections

    def update_blob(self, blob: Blob, new_matches: List[List[int]]):
        """Match an existing Blob against the new detections.

        (Body lost in the paste; it uses the two constraint checks below and
        calls blob.update_path on a successful match.)
        """
        ...

    def iou_constraint_is_passed(self, blob: Blob, match: List[int]) -> bool:
        """Check if the Intersection over Union (IoU) of a Blob and a match is above a threshold."""
        iou = self.calculate_iou(blob.contour, match)
        return iou > self.iou_threshold

    @staticmethod
    def calculate_iou(blob_contour, match_contour):
        xA = max(blob_contour[0], match_contour[0])
        yA = max(blob_contour[1], match_contour[1])
        xB = min(blob_contour[2], match_contour[2])
        yB = min(blob_contour[3], match_contour[3])
        interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
        boxAArea = (blob_contour[2] - blob_contour[0] + 1) * (blob_contour[3] - blob_contour[1] + 1)
        boxBArea = (match_contour[2] - match_contour[0] + 1) * (match_contour[3] - match_contour[1] + 1)
        iou = interArea / float(boxAArea + boxBArea - interArea)
        return iou

    def distance_squared_constraint_is_passed(self, blob: Blob, match: List[int]):
        """Check if the squared distance between the centroid of a Blob and a match is below a threshold."""
        print(f"Calculate dS2: {blob.contour} vs. {match}")
        blob_centroid = self.calculate_centroid(*blob.contour)
        match_centroid = self.calculate_centroid(*match)
        dS2 = (blob_centroid[0] - match_centroid[0]) ** 2 + (blob_centroid[1] - match_centroid[1]) ** 2
        return dS2 < self.flow_threshold

    @staticmethod
    def calculate_centroid(x_min: int, y_min: int, x_max: int, y_max: int):
        """Calculate the centroid of the match based on its contour."""
        return list(map(lambda val: val // 2, (x_min + x_max, y_min + y_max)))


class MOG2ObjectDetector:
    def __init__(
        self,
        inputShape: tuple,
        doTPU: bool = False,
        fps: int = 30,
        backgroundSubtractorHistory: int = 500,
        backgroundSubtractorvarThreshold: int = 64,
        backgroundSubtractordetectShadows: bool = False,
        bboxDetectionKernelSize: int = 64,
        bboxDetectionThreshold: int = 64,
        bboxMaxIncrease: int = 2,
        returnMask: bool = False,
    ):
        self.inputShape = inputShape
        self.doTPU = doTPU
        self.blob_tracker = BlobTracker(inputShape[0], inputShape[1], fps)
        self.backgroundSubtractor = cv2.createBackgroundSubtractorMOG2(
            history=backgroundSubtractorHistory,
            varThreshold=backgroundSubtractorvarThreshold,
            detectShadows=backgroundSubtractordetectShadows,
        )
        self.bboxDetectionKernel = np.ones(
            (bboxDetectionKernelSize, bboxDetectionKernelSize), dtype=np.float16
        ) / (bboxDetectionKernelSize**2)
        self.bboxDetectionThreshold = bboxDetectionThreshold
        self.bboxMaxIncrease = bboxMaxIncrease
        self.bboxLastFrame = []
        self.returnMask = returnMask

    def get_matches(self, processed, frame):
        # blurred = cv2.GaussianBlur(processed, (5, 5), 0)
        blurred = processed  # a Gaussian blur before MOG does not make a lot of sense to me, but it seems like others do it as well
        mask = self.frame_to_mask(blurred)
        processed = self.process_mask(mask)
        bbox = self.mask_to_bbox(processed)
        blobs = self.blob_tracker.update_blobs(bbox)
        print(blobs)
        return blobs

    def frame_to_mask(self, frame):
        """
        Convert a frame to a mask.

        If TPU (in this case, GPU) is available, the frame is first converted to grayscale,
        then to UMat before applying the background subtractor. If TPU is not available,
        the background subtractor is applied directly to the frame.

        Args:
            frame: The frame to be converted to a mask.

        Returns:
            mask: The frame converted to a mask.
        """
        if self.doTPU:
            # convert image to grayscale
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # convert numpy.ndarray to UMat
            umat_gray = cv2.UMat(gray)
            umat_mask = self.backgroundSubtractor.apply(umat_gray)
            mask = umat_mask.get()
            return mask
        else:
            return self.backgroundSubtractor.apply(frame)

    def process_mask(self, mask):
        """
        Process a mask.

        If TPU (in this case, GPU) is available, the mask is first converted to UMat,
        then convolved with a bboxDetectionKernel and thresholded. If TPU is not available,
        the mask is directly convolved and thresholded.

        Args:
            mask: The mask to be processed.

        Returns:
            processed: The processed mask.
        """
        if self.doTPU:
            umat_mask = cv2.UMat(mask)
            umat_convolved = cv2.filter2D(umat_mask, -1, self.bboxDetectionKernel)
            _, umat_processed = cv2.threshold(
                umat_convolved, self.bboxDetectionThreshold, 255, cv2.THRESH_BINARY
            )
            processed = umat_processed.get()
            return processed
        else:
            convolved = cv2.filter2D(mask, -1, self.bboxDetectionKernel)
            _, processed = cv2.threshold(
                convolved, self.bboxDetectionThreshold, 255, cv2.THRESH_BINARY
            )
            return processed

    @staticmethod
    def get_centroid(x: int, y: int, w: int, h: int) -> Tuple[int, int]:
        x1 = int(w / 2)
        y1 = int(h / 2)
        cx = x + x1
        cy = y + y1
        return (cx, cy)

    def mask_to_bbox(self, thresholded):
        """
        Convert a thresholded mask into bounding boxes.

        Each contour found in the mask is converted into a bounding box. The bounding box's
        parameters are calculated and a centroid is also calculated for each bounding box.

        Args:
            thresholded: The thresholded mask to be converted into bounding boxes.

        Returns:
            detections: A list of bounding boxes and their respective centroids.
        """
        if self.doTPU:
            # convert numpy.ndarray to UMat
            umat_thresholded = cv2.UMat(thresholded.astype("uint8"))
            contours, _ = cv2.findContours(
                umat_thresholded, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1
            )
            detections = [None] * len(contours)
            for i, contour in enumerate(contours):
                (x, y, w, h) = cv2.boundingRect(contour)
                centroid = self.get_centroid(x, y, w, h)
                detections[i] = [x, y, x + w, y + h, h * w, centroid]
        else:
            contours, _ = cv2.findContours(
                thresholded.astype("uint8"), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1
            )
            detections = [None] * len(contours)
            for i, contour in enumerate(contours):
                (x, y, w, h) = cv2.boundingRect(contour)
                centroid = self.get_centroid(x, y, w, h)
                detections[i] = [x, y, x + w, y + h, h * w, centroid]
        return detections

    def limit_bbox(self, bbox):
        if len(bbox) - len(self.bboxLastFrame) > self.bboxMaxIncrease:
            return self.bboxLastFrame
        bbox = [b for b in bbox if b[4] > 1000]
        return bbox

I have tried converting to UMat and to lower precision, but neither seems to make it any faster. The OpenCV documentation on OpenCL states that converting to UMat should make these operations run on the GPU, but that does not seem to happen here. I am quite new to developing on NXP devices and am struggling with the provided tools, even though I have combed through their documentation multiple times.
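For reference, this is the check I plan to use to confirm whether OpenCL is actually being picked up for UMat on the board (just the standard cv2.ocl queries, nothing board-specific):

    import cv2

    # Check whether OpenCV was built with OpenCL support and whether the T-API will use it.
    print("haveOpenCL:", cv2.ocl.haveOpenCL())
    print("useOpenCL :", cv2.ocl.useOpenCL())

    # Explicitly opt in; UMat operations only go through OpenCL when this is enabled.
    cv2.ocl.setUseOpenCL(True)
    print("useOpenCL after setUseOpenCL(True):", cv2.ocl.useOpenCL())

If haveOpenCL() comes back False here, I assume the UMat conversions are silently falling back to the CPU, which would explain why they don't help.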