diff --git a/docs/performance.md b/docs/performance.md index 033fdd55f..6eeea9da7 100644 --- a/docs/performance.md +++ b/docs/performance.md @@ -14,6 +14,7 @@ | Savant ver. | A4000 | Jetson NX | |----------------------------------------------------------------------------------|--------|-----------| | [#407](https://github.com/insight-platform/Savant/issues/407) (no queues) | 174.10 | 36.34 | +| [#456](https://github.com/insight-platform/Savant/issues/456) (no queues) | 171.82 | 35.73 | ### conditional_video_processing @@ -26,6 +27,7 @@ | [#341](https://github.com/insight-platform/Savant/issues/341) (no queues) | 311.62 | 61.46 | | [#347](https://github.com/insight-platform/Savant/issues/347) (no queues) | 263.44 | 59.86 | | [#407](https://github.com/insight-platform/Savant/issues/407) (no queues) | 263.89 | 63.52 | +| [#456](https://github.com/insight-platform/Savant/issues/456) (no queues) | 257.63 | 62.77 | ### face_reid @@ -35,6 +37,7 @@ | [#341](https://github.com/insight-platform/Savant/issues/341) (no queues) | 121.11 | 25.4 | | [#347](https://github.com/insight-platform/Savant/issues/347) (no queues) | 118.79 | 25.99 | | [#407](https://github.com/insight-platform/Savant/issues/407) (no queues) | 127.37 | 27.61 | +| [#456](https://github.com/insight-platform/Savant/issues/456) (no queues) | 127.23 | 28.71 | ### nvidia_car_classification @@ -47,6 +50,7 @@ | [#341](https://github.com/insight-platform/Savant/issues/341) (no queues) | 156.73 | 42.44 | | [#347](https://github.com/insight-platform/Savant/issues/347) (no queues) | 151.44 | 42.97 | | [#407](https://github.com/insight-platform/Savant/issues/407) (no queues) | 149.66 | 41.11 | +| [#456](https://github.com/insight-platform/Savant/issues/456) (no queues) | 150.07 | 38.76 | ### opencv_cuda_bg_remover_mog2 @@ -59,6 +63,7 @@ | [#341](https://github.com/insight-platform/Savant/issues/341) (no queues) | 671.34 | 92.24 | | [#347](https://github.com/insight-platform/Savant/issues/347) (no queues) | 608.54 | 91.69 | | [#407](https://github.com/insight-platform/Savant/issues/407) (no queues) | 607.48 | 90.92 | +| [#456](https://github.com/insight-platform/Savant/issues/456) (no queues) | 606.74 | 95.05 | ### opencv_cuda_bg_remover_mog2 (multi-stream) @@ -78,6 +83,7 @@ | [#341](https://github.com/insight-platform/Savant/issues/341) (no queues) | 117.22 | 29.27 | | [#347](https://github.com/insight-platform/Savant/issues/347) (no queues) | 116.43 | 28.05 | | [#407](https://github.com/insight-platform/Savant/issues/407) (no queues) | 116.61 | 28.54 | +| [#456](https://github.com/insight-platform/Savant/issues/456) (no queues) | 116.44 | 26.03 | ### traffic_meter (yolov8m) @@ -90,6 +96,7 @@ | [#341](https://github.com/insight-platform/Savant/issues/341) (no queues) | 135.19 | 24.67 | | [#347](https://github.com/insight-platform/Savant/issues/347) (no queues) | 136.49 | 24.40 | | [#407](https://github.com/insight-platform/Savant/issues/407) (no queues) | 136.16 | 24.66 | +| [#456](https://github.com/insight-platform/Savant/issues/456) (no queues) | 136.80 | 23.29 | ### yolov8_seg @@ -102,3 +109,4 @@ Note: `yolov8_seg` always has a buffer length of 10. `BUFFER_QUEUES` env doesn't | [#341](https://github.com/insight-platform/Savant/issues/341) | 45.21 | 14.02 | | [#347](https://github.com/insight-platform/Savant/issues/347) | 44.34 | 13.07 | | [#407](https://github.com/insight-platform/Savant/issues/407) | 67.73 | 21.57 | +| [#456](https://github.com/insight-platform/Savant/issues/456) | 68.48 | 21.71 | diff --git a/gst_plugins/python/pyfunc.py b/gst_plugins/python/pyfunc.py index eb2c58985..5fb68181e 100644 --- a/gst_plugins/python/pyfunc.py +++ b/gst_plugins/python/pyfunc.py @@ -70,6 +70,15 @@ class GstPluginPyFunc(LoggerMixin, GstBase.BaseTransform): 'VideoPipeline object from savant-rs.', GObject.ParamFlags.READWRITE, ), + 'stream-pool-size': ( + int, + 'Max stream pool size', + 'Max stream pool size', + 1, + GLib.MAXINT, + 1, + GObject.ParamFlags.READWRITE, + ), 'dev-mode': ( bool, 'Dev mode flag', @@ -90,6 +99,7 @@ def __init__(self): self.kwargs: Optional[str] = None self.video_pipeline: Optional[VideoPipeline] = None self.dev_mode: bool = False + self.max_stream_pool_size: int = 1 # pyfunc object self.pyfunc: Optional[PyFunc] = None @@ -106,6 +116,8 @@ def do_get_property(self, prop: GObject.GParamSpec) -> Any: return self.kwargs if prop.name == 'pipeline': return self.video_pipeline + if prop.name == 'stream-pool-size': + return self.max_stream_pool_size if prop.name == 'dev-mode': return self.dev_mode raise AttributeError(f'Unknown property {prop.name}.') @@ -124,6 +136,8 @@ def do_set_property(self, prop: GObject.GParamSpec, value: Any): self.kwargs = value elif prop.name == 'pipeline': self.video_pipeline = value + elif prop.name == 'stream-pool-size': + self.max_stream_pool_size = value elif prop.name == 'dev-mode': self.dev_mode = value else: diff --git a/samples/telemetry/blur.py b/samples/telemetry/blur.py index 861edc3b3..2e721f8e4 100644 --- a/samples/telemetry/blur.py +++ b/samples/telemetry/blur.py @@ -25,9 +25,8 @@ def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta): # logger messages will be added to span automatically self.logger.info('Try to blur frame #%d.', frame_meta.frame_num) + stream = self.get_cuda_stream(frame_meta) with nvds_to_gpu_mat(buffer, frame_meta.frame_meta) as frame_mat: - stream = self.get_cuda_stream(frame_meta) - # create a new span for an important code section # to track the time spent on its execution with frame_meta.telemetry_span.nested_span('blur-filter'): diff --git a/samples/yolov8_seg/module/overlay.py b/samples/yolov8_seg/module/overlay.py index 8d787f839..f6447af7c 100644 --- a/samples/yolov8_seg/module/overlay.py +++ b/samples/yolov8_seg/module/overlay.py @@ -17,8 +17,8 @@ def __init__(self, **kwargs): self.bg_color = np.array([0, 0, 0, 0], dtype=np.uint8) def draw(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta): + stream = self.get_cuda_stream(frame_meta) with nvds_to_gpu_mat(buffer, frame_meta.frame_meta) as frame_mat: - stream = self.get_cuda_stream(frame_meta) for obj_meta in frame_meta.objects: if obj_meta.is_primary: continue diff --git a/savant/base/input_preproc.py b/savant/base/input_preproc.py index b1ce7fca2..e160112fa 100644 --- a/savant/base/input_preproc.py +++ b/savant/base/input_preproc.py @@ -8,7 +8,6 @@ from savant.base.model import OutputImage from savant.base.pyfunc import BasePyFuncCallableImpl, PyFuncNoopCallException -from savant.deepstream.cudastream import CudaStreams from savant.deepstream.meta.object import _NvDsObjectMetaImpl from savant.deepstream.opencv_utils import nvds_to_gpu_mat from savant.deepstream.utils import nvds_frame_meta_iterator, nvds_obj_meta_iterator @@ -52,9 +51,10 @@ def __call__( class ObjectsPreprocessing: - def __init__(self): + def __init__(self, batch_size: int): self._preprocessing_functions = {} self._frames_map = {} + self._stream_pool = [cv2.cuda.Stream() for _ in range(batch_size)] self.logger = get_logger(__name__) def add_preprocessing_function( @@ -95,77 +95,78 @@ def preprocessing( self._frames_map[buffer] = {} nvds_batch_meta = pyds.gst_buffer_get_nvds_batch_meta(buffer) - with CudaStreams() as cuda_streams: - for nvds_frame_meta in nvds_frame_meta_iterator(nvds_batch_meta): - left = 0 - top = 0 - row_height = 0 - cuda_stream = cuda_streams.get_cuda_stream(nvds_frame_meta) - with nvds_to_gpu_mat(buffer, nvds_frame_meta) as frame_mat: - frame_image = GPUImage(image=frame_mat, cuda_stream=cuda_stream) - copy_frame_image = GPUImage( - image=frame_mat.clone(), cuda_stream=cuda_stream + + for nvds_frame_meta in nvds_frame_meta_iterator(nvds_batch_meta): + left = 0 + top = 0 + row_height = 0 + cuda_stream = self._stream_pool[nvds_frame_meta.batch_id] + with nvds_to_gpu_mat(buffer, nvds_frame_meta) as frame_mat: + frame_image = GPUImage(image=frame_mat, cuda_stream=cuda_stream) + copy_frame_image = GPUImage( + image=frame_mat.clone(), cuda_stream=cuda_stream + ) + self._frames_map[buffer][nvds_frame_meta.batch_id] = copy_frame_image + for nvds_obj_meta in nvds_obj_meta_iterator(nvds_frame_meta): + if nvds_obj_meta.class_id != class_id: + continue + if nvds_obj_meta.unique_component_id != model_uid: + continue + object_meta = _NvDsObjectMetaImpl.from_nv_ds_object_meta( + object_meta=nvds_obj_meta, frame_meta=nvds_frame_meta ) - self._frames_map[buffer][ - nvds_frame_meta.batch_id - ] = copy_frame_image - for nvds_obj_meta in nvds_obj_meta_iterator(nvds_frame_meta): - if nvds_obj_meta.class_id != class_id: - continue - if nvds_obj_meta.unique_component_id != model_uid: - continue - object_meta = _NvDsObjectMetaImpl.from_nv_ds_object_meta( - object_meta=nvds_obj_meta, frame_meta=nvds_frame_meta + + try: + preprocess_image = preprocessing_func( + object_meta=object_meta, + frame_image=copy_frame_image, + cuda_stream=cuda_stream, ) + except Exception as exc: + if dev_mode: + if not isinstance(exc, PyFuncNoopCallException): + self.logger.exception( + 'Error in input image preprocessing.' + ) + continue + raise exc - try: - preprocess_image = preprocessing_func( - object_meta=object_meta, - frame_image=copy_frame_image, - cuda_stream=cuda_stream, - ) - except Exception as exc: - if dev_mode: - if not isinstance(exc, PyFuncNoopCallException): - self.logger.exception( - 'Error in input image preprocessing.' - ) - continue - raise exc - - if not isinstance(preprocess_image, GPUImage): - raise ValueError( - 'Preprocessing function must return Image object.' - ) - if output_image is not None: - preprocess_image = preprocess_image.resize( - resolution=(output_image.width, output_image.height), - method=output_image.method, - interpolation=output_image.cv2_interpolation, - ) - if left + preprocess_image.width > frame_image.width: - left = 0 - if row_height == 0: - row_height = preprocess_image.height - top += row_height - row_height = 0 - if top >= frame_image.height: - raise ValueError( - 'There is no place on frame ' 'to put object image.' - ) - if top + preprocess_image.height > frame_image.height: - raise ValueError( - 'There is no place on frame ' 'to put object image.' - ) - if preprocess_image.height > row_height: + if not isinstance(preprocess_image, GPUImage): + raise ValueError( + 'Preprocessing function must return Image object.' + ) + if output_image is not None: + preprocess_image = preprocess_image.resize( + resolution=(output_image.width, output_image.height), + method=output_image.method, + interpolation=output_image.cv2_interpolation, + ) + if left + preprocess_image.width > frame_image.width: + left = 0 + if row_height == 0: row_height = preprocess_image.height - - frame_image.paste(preprocess_image, (left, top)) - nvds_obj_meta.rect_params.top = top - nvds_obj_meta.rect_params.left = left - nvds_obj_meta.rect_params.width = preprocess_image.width - nvds_obj_meta.rect_params.height = preprocess_image.height - left += preprocess_image.width + top += row_height + row_height = 0 + if top >= frame_image.height: + raise ValueError( + 'There is no place on frame ' 'to put object image.' + ) + if top + preprocess_image.height > frame_image.height: + raise ValueError( + 'There is no place on frame ' 'to put object image.' + ) + if preprocess_image.height > row_height: + row_height = preprocess_image.height + + frame_image.paste(preprocess_image, (left, top)) + nvds_obj_meta.rect_params.top = top + nvds_obj_meta.rect_params.left = left + nvds_obj_meta.rect_params.width = preprocess_image.width + nvds_obj_meta.rect_params.height = preprocess_image.height + left += preprocess_image.width + + for stream in self._stream_pool: + stream.waitForCompletion() def restore_frame(self, buffer: Gst.Buffer): nvds_batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buffer)) diff --git a/savant/deepstream/cudastream.py b/savant/deepstream/cudastream.py deleted file mode 100644 index dbd96af53..000000000 --- a/savant/deepstream/cudastream.py +++ /dev/null @@ -1,43 +0,0 @@ -import cv2 - -from savant.deepstream.meta.frame import NvDsFrameMeta -from savant.utils.logging import LoggerMixin - - -class CudaStreams(LoggerMixin): - """Class for managing CUDA streams for asynchronous frame processing.""" - - def __init__(self): - super().__init__() - self.frame_streams = {} - - def get_cuda_stream(self, frame_meta: NvDsFrameMeta): - """Get a CUDA stream that can be used to - asynchronously process a frame in a batch. - """ - self.logger.debug( - 'Getting CUDA stream for frame with batch_id=%d', frame_meta.batch_id - ) - if frame_meta.batch_id not in self.frame_streams: - self.logger.debug( - 'No existing CUDA stream for frame with batch_id=%d, init new', - frame_meta.batch_id, - ) - self.frame_streams[frame_meta.batch_id] = cv2.cuda.Stream() - - return self.frame_streams[frame_meta.batch_id] - - def sync_cuda_streams(self): - """ - Wait for all CUDA streams to complete. - :return: - """ - for stream in self.frame_streams.values(): - stream.waitForCompletion() - self.frame_streams.clear() - - def __enter__(self): - return self - - def __exit__(self, *exc_details): - self.sync_cuda_streams() diff --git a/savant/deepstream/drawfunc.py b/savant/deepstream/drawfunc.py index d7c30a3c8..ff7610566 100644 --- a/savant/deepstream/drawfunc.py +++ b/savant/deepstream/drawfunc.py @@ -43,8 +43,8 @@ def __init__(self, **kwargs): self.default_spec_no_track_id = get_default_draw_spec(track_id=False) def draw(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta): + stream = self.get_cuda_stream(frame_meta) with nvds_to_gpu_mat(buffer, frame_meta.frame_meta) as frame_mat: - stream = self.get_cuda_stream(frame_meta) with Artist(frame_mat, stream) as artist: self.draw_on_frame(frame_meta, artist) diff --git a/savant/deepstream/pipeline.py b/savant/deepstream/pipeline.py index 2e9fe7981..d7323000a 100644 --- a/savant/deepstream/pipeline.py +++ b/savant/deepstream/pipeline.py @@ -98,7 +98,7 @@ def __init__( self._sources = SourceInfoRegistry() # c++ preprocessing class - self._objects_preprocessing = ObjectsPreprocessing() + self._objects_preprocessing = ObjectsPreprocessing(self._batch_size) self._internal_attrs = set() telemetry: TelemetryParameters = kwargs['telemetry'] @@ -203,6 +203,7 @@ def add_element( if element_idx is not None: if isinstance(element, PyFuncElement): gst_element.set_property('pipeline', self._video_pipeline) + gst_element.set_property('stream-pool-size', self._batch_size) # TODO: add stage names to element config? if isinstance(element_idx, int): stage = self._element_stages[element_idx] @@ -814,8 +815,8 @@ def _update_meta_for_single_frame( else: self._logger.debug('Skipping empty primary object.') continue - if self._logger.isEnabledFor(logging.DEBUG): - self._logger.debug( + if self._logger.isEnabledFor(logging.TRACE): + self._logger.trace( 'Collecting object (frame src %s, IDX %s, PTS %s): %s', video_frame.source_id, frame_idx, diff --git a/savant/deepstream/pyfunc.py b/savant/deepstream/pyfunc.py index 8e045be40..83e8738c2 100644 --- a/savant/deepstream/pyfunc.py +++ b/savant/deepstream/pyfunc.py @@ -43,11 +43,14 @@ def __init__(self, **kwargs): GST_NVEVENT_STREAM_EOS, ] } - self.frame_streams = {} + self._stream_pool_size = None + self._stream_pool = [] def on_start(self) -> bool: """Do on plugin start.""" self._video_pipeline = self.gst_element.get_property('pipeline') + # the prop is set to pipeline batch size during init + self._stream_pool_size = self.gst_element.get_property('stream-pool-size') return True def on_event(self, event: Gst.Event): @@ -88,21 +91,20 @@ def on_source_delete(self, source_id: str): # self.logger.debug('Source %s deleted.', source_id) def get_cuda_stream(self, frame_meta: NvDsFrameMeta): - """Get a CUDA stream that can be used to - asynchronously process a frame in a batch. + """Get a CUDA stream that can be used to asynchronously process + a frame in a batch. + All frame CUDA streams will be waited for at the end of batch processing. """ - self.logger.debug( - 'Getting CUDA stream for frame with batch_id=%d', frame_meta.batch_id - ) - if frame_meta.batch_id not in self.frame_streams: + if not self._stream_pool: self.logger.debug( - 'No existing CUDA stream for frame with batch_id=%d, init new', - frame_meta.batch_id, + 'Creating CUDA stream pool of size %d.', self._stream_pool_size ) - self.frame_streams[frame_meta.batch_id] = cv2.cuda.Stream() - - return self.frame_streams[frame_meta.batch_id] + self._stream_pool = [ + cv2.cuda.Stream() for _ in range(self._stream_pool_size) + ] + self.logger.debug('Using CUDA stream %d.', frame_meta.batch_id) + return self._stream_pool[frame_meta.batch_id] def process_buffer(self, buffer: Gst.Buffer): """Process gstreamer buffer directly. Throws an exception if fatal @@ -156,9 +158,8 @@ def process_buffer(self, buffer: Gst.Buffer): ) as frame_meta: self.process_frame(buffer, frame_meta) - for stream in self.frame_streams.values(): + for stream in self._stream_pool: stream.waitForCompletion() - self.frame_streams.clear() def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta): """Process gstreamer buffer and frame metadata. Throws an exception if fatal diff --git a/savant/utils/artist/artist_gpumat.py b/savant/utils/artist/artist_gpumat.py index 26a9464e1..0b668b35f 100644 --- a/savant/utils/artist/artist_gpumat.py +++ b/savant/utils/artist/artist_gpumat.py @@ -30,9 +30,10 @@ def __init__(self, frame: cv2.cuda.GpuMat, stream: cv2.cuda.Stream) -> None: def __exit__(self, *exc_details): # apply alpha comp if overlay is not null if self.overlay is not None: - overlay = cv2.cuda.GpuMat(self.overlay) + overlay_gpu = cv2.cuda.GpuMat(self.height, self.width, cv2.CV_8UC4) + overlay_gpu.upload(self.overlay, self.stream) cv2.cuda.alphaComp( - overlay, self.frame, self.alpha_op, self.frame, stream=self.stream + overlay_gpu, self.frame, self.alpha_op, self.frame, stream=self.stream ) @property