diff --git a/.dockerignore b/.dockerignore index 81686199..ff908cc3 100644 --- a/.dockerignore +++ b/.dockerignore @@ -15,6 +15,7 @@ Makefile cache/ var/ data/ +downloads/ logs/ .venv/ env/ diff --git a/adapters/gst/sinks/video_files.py b/adapters/gst/sinks/video_files.py index dcceb8a2..c5deb915 100755 --- a/adapters/gst/sinks/video_files.py +++ b/adapters/gst/sinks/video_files.py @@ -9,7 +9,13 @@ from savant_rs.primitives import EndOfStream, VideoFrame from adapters.python.sinks.chunk_writer import ChunkWriter, CompositeChunkWriter -from adapters.python.sinks.metadata_json import MetadataJsonWriter, Patterns +from adapters.python.sinks.metadata_json import ( + MetadataJsonSink, + MetadataJsonWriter, + Patterns, + get_location, + get_tag_location, +) from gst_plugins.python.savant_rs_video_demux_common import FrameParams, build_caps from savant.api.parser import convert_ts from savant.gstreamer import GLib, Gst, GstApp @@ -286,10 +292,11 @@ def _open(self): self.appsrc.set_caps(self.caps) filesink: Gst.Element = self.pipeline.get_by_name(filesink_name) - os.makedirs(self.base_location, exist_ok=True) - dst_location = os.path.join( - self.base_location, f'{self.chunk_idx:04}.{file_ext}' + dst_location = self.base_location.replace( + Patterns.CHUNK_IDX, f'{self.chunk_idx:0{self.chunk_size_digits}}' ) + os.makedirs(dst_location, exist_ok=True) + dst_location = os.path.join(dst_location, f'video.{file_ext}') self.logger.info( 'Writing video from source %s to file %s', self.source_id, dst_location ) @@ -329,6 +336,7 @@ def __init__( self.location = location self.chunk_size = chunk_size self.writers: Dict[str, ChunkWriter] = {} + self.last_writer_per_source: Dict[str, (str, ChunkWriter)] = {} def write(self, zmq_message: ZeroMQMessage): message = zmq_message.message @@ -362,12 +370,20 @@ def _write_video_frame( ) return False - writer = self.writers.get(video_frame.source_id) + src_file_location = get_tag_location(video_frame) or 'unknown' + location = 
get_location(self.location, video_frame.source_id, src_file_location) + if self.chunk_size > 0 and Patterns.CHUNK_IDX not in location: + location = os.path.join(location, Patterns.CHUNK_IDX) + + writer = self.writers.get(location) + last_source_location, last_source_writer = self.last_writer_per_source.get( + video_frame.source_id + ) or (None, None) + if writer is None: - base_location = os.path.join(self.location, video_frame.source_id) - json_filename_pattern = f'{Patterns.CHUNK_IDX}.json' + video_writer = VideoFilesWriter( - base_location, + location, video_frame.source_id, self.chunk_size, frame_params, @@ -376,13 +392,34 @@ def _write_video_frame( [ video_writer, MetadataJsonWriter( - os.path.join(base_location, json_filename_pattern), + os.path.join(location, 'metadata.json'), self.chunk_size, ), ], self.chunk_size, ) - self.writers[video_frame.source_id] = writer + self.writers[location] = writer + if writer is not last_source_writer: + if last_source_writer is not None: + self.logger.info( + 'Flushing previous writer for source=%s, location=%s', + video_frame.source_id, + last_source_location, + ) + last_source_writer.flush() + self.logger.info( + 'Removing previous writer for source=%s, location=%s', + video_frame.source_id, + last_source_location, + ) + del self.writers[last_source_location] + self.logger.info( + 'New writer for source=%s, location=%s is initialized, amount of resident writers is %d', + video_frame.source_id, + location, + len(self.writers), + ) + self.last_writer_per_source[video_frame.source_id] = (location, writer) return writer.write_video_frame(video_frame, content, video_frame.keyframe) diff --git a/adapters/python/sinks/chunk_writer.py b/adapters/python/sinks/chunk_writer.py index c51c9a05..a8ab950e 100644 --- a/adapters/python/sinks/chunk_writer.py +++ b/adapters/python/sinks/chunk_writer.py @@ -12,10 +12,7 @@ class ChunkWriter: def __init__(self, chunk_size: int, logger_prefix: str = __name__): self.logger = 
get_logger(f'{logger_prefix}.{self.__class__.__name__}') self.chunk_size = chunk_size - if chunk_size > 0: - self.chunk_size_digits = int(math.log10(chunk_size)) + 1 - else: - self.chunk_size_digits = 6 + self.chunk_size_digits = 8 self.chunk_idx = -1 self.frames_in_chunk = 0 self.opened = False diff --git a/adapters/python/sinks/image_files.py b/adapters/python/sinks/image_files.py index 78b4281f..df6c1d1a 100755 --- a/adapters/python/sinks/image_files.py +++ b/adapters/python/sinks/image_files.py @@ -8,9 +8,12 @@ from adapters.python.sinks.chunk_writer import ChunkWriter, CompositeChunkWriter from adapters.python.sinks.metadata_json import ( + MetadataJsonSink, MetadataJsonWriter, Patterns, frame_has_objects, + get_location, + get_tag_location, ) from savant.api.enums import ExternalFrameType from savant.utils.config import opt_config, req_config, strtobool @@ -67,13 +70,9 @@ def _write_eos(self, eos: EndOfStream) -> bool: return True def _open(self): - if self.chunk_size > 0: - self.chunk_location = os.path.join( - self.base_location, - f'{self.chunk_idx:04}', - ) - else: - self.chunk_location = self.base_location + self.chunk_location = self.base_location.replace( + Patterns.CHUNK_IDX, f'{self.chunk_idx:0{self.chunk_size_digits}}' + ) self.logger.info('Creating directory %s', self.chunk_location) os.makedirs(self.chunk_location, exist_ok=True) @@ -90,6 +89,7 @@ def __init__( self.chunk_size = chunk_size self.skip_frames_without_objects = skip_frames_without_objects self.writers: Dict[str, ChunkWriter] = {} + self.last_writer_per_source: Dict[str, (str, ChunkWriter)] = {} def write(self, zmq_message: ZeroMQMessage): message = zmq_message.message @@ -111,24 +111,49 @@ def _write_video_frame(self, video_frame: VideoFrame, content: bytes) -> bool: video_frame.pts, ) return False - writer = self.writers.get(video_frame.source_id) + src_file_location = get_tag_location(video_frame) or 'unknown' + location = get_location(self.location, video_frame.source_id, 
src_file_location) + if self.chunk_size > 0 and Patterns.CHUNK_IDX not in location: + location = os.path.join(location, Patterns.CHUNK_IDX) + writer = self.writers.get(location) + last_source_location, last_source_writer = self.last_writer_per_source.get( + video_frame.source_id + ) or (None, None) + if writer is None: - base_location = os.path.join(self.location, video_frame.source_id) - if self.chunk_size > 0: - json_filename_pattern = f'{Patterns.CHUNK_IDX}.json' - else: - json_filename_pattern = 'meta.json' writer = CompositeChunkWriter( [ - ImageFilesWriter(base_location, self.chunk_size), + ImageFilesWriter(os.path.join(location, 'images'), self.chunk_size), MetadataJsonWriter( - os.path.join(base_location, json_filename_pattern), + os.path.join(location, 'metadata.json'), self.chunk_size, ), ], self.chunk_size, ) - self.writers[video_frame.source_id] = writer + self.writers[location] = writer + if writer is not last_source_writer: + if last_source_writer is not None: + self.logger.info( + 'Flushing previous writer for source=%s, location=%s', + video_frame.source_id, + last_source_location, + ) + last_source_writer.flush() + self.logger.info( + 'Removing previous writer for source=%s, location=%s', + video_frame.source_id, + last_source_location, + ) + del self.writers[last_source_location] + self.logger.info( + 'New writer for source=%s, location=%s is initialized, amount of resident writers is %d', + video_frame.source_id, + location, + len(self.writers), + ) + self.last_writer_per_source[video_frame.source_id] = (location, writer) + return writer.write_video_frame(video_frame, content, video_frame.keyframe) def _write_eos(self, eos: EndOfStream): @@ -147,7 +172,7 @@ def terminate(self): def main(): init_logging() - # To gracefully shutdown the adapter on SIGTERM (raise KeyboardInterrupt) + # To gracefully shut down the adapter on SIGTERM (raise KeyboardInterrupt) signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) logger = 
get_logger(LOGGER_NAME) diff --git a/adapters/python/sinks/metadata_json.py b/adapters/python/sinks/metadata_json.py index f98933f0..4250fc03 100755 --- a/adapters/python/sinks/metadata_json.py +++ b/adapters/python/sinks/metadata_json.py @@ -34,8 +34,9 @@ class Patterns: class MetadataJsonWriter(ChunkWriter): def __init__(self, pattern: str, chunk_size: int): - self.pattern = pattern super().__init__(chunk_size, logger_prefix=LOGGER_NAME) + self.pattern = pattern + self.logger.info('File name pattern is %s', self.pattern) def _write_video_frame( self, @@ -77,7 +78,9 @@ def _close(self): self.file.close() def _open(self): - self.location = self.pattern.replace(Patterns.CHUNK_IDX, f'{self.chunk_idx:04}') + self.location = self.pattern.replace( + Patterns.CHUNK_IDX, f'{self.chunk_idx:0{self.chunk_size_digits}}' + ) self.lines = 0 self.logger.info('Opening file %s', self.location) os.makedirs(os.path.dirname(self.location), exist_ok=True) @@ -97,7 +100,7 @@ def __init__( self.skip_frames_without_objects = skip_frames_without_objects self.chunk_size = chunk_size self.writers: Dict[str, MetadataJsonWriter] = {} - self.last_writer_per_source: Dict[str, MetadataJsonWriter] = {} + self.last_writer_per_source: Dict[str, (str, MetadataJsonWriter)] = {} path, ext = os.path.splitext(location) ext = ext or '.json' @@ -118,28 +121,49 @@ def write(self, zmq_message: ZeroMQMessage): return self._write_eos(message.as_end_of_stream()) self.logger.debug('Unsupported message type for message %r', message) - def _write_video_frame(self, frame: VideoFrame): - if self.skip_frames_without_objects and not frame_has_objects(frame): + def _write_video_frame(self, video_frame: VideoFrame): + if self.skip_frames_without_objects and not frame_has_objects(video_frame): self.logger.debug( 'Frame %s from source %s does not have objects. 
Skipping it.', - frame.source_id, - frame.pts, + video_frame.source_id, + video_frame.pts, ) return False - src_file_location = get_tag_location(frame) or '' - location = self.get_location(frame.source_id, src_file_location) + src_file_location = get_tag_location(video_frame) or 'unknown' + location = get_location(self.location, video_frame.source_id, src_file_location) writer = self.writers.get(location) - last_writer = self.last_writer_per_source.get(frame.source_id) + last_source_location, last_source_writer = self.last_writer_per_source.get( + video_frame.source_id + ) or (None, None) + if writer is None: writer = MetadataJsonWriter(location, self.chunk_size) self.writers[location] = writer - if writer is not last_writer: - if last_writer is not None: - last_writer.flush() - self.last_writer_per_source[frame.source_id] = writer + if writer is not last_source_writer: + if last_source_writer is not None: + self.logger.info( + 'Flushing previous writer for source=%s, location=%s', + video_frame.source_id, + last_source_location, + ) + last_source_writer.flush() + self.logger.info( + 'Removing previous writer for source=%s, location=%s', + video_frame.source_id, + last_source_location, + ) + del self.writers[last_source_location] + + self.logger.info( + 'New writer for source=%s, location=%s is initialized, amount of resident writers is %d', + video_frame.source_id, + location, + len(self.writers), + ) + self.last_writer_per_source[video_frame.source_id] = (location, writer) - return writer.write_video_frame(frame, None, frame.keyframe) + return writer.write_video_frame(video_frame, None, video_frame.keyframe) def _write_eos(self, eos: EndOfStream): self.logger.info('Received EOS from source %s.', eos.source_id) @@ -150,15 +174,17 @@ def _write_eos(self, eos: EndOfStream): writer.flush() return result - def get_location( - self, - source_id: str, - src_file_location: str, - ): - location = self.location.replace(Patterns.SOURCE_ID, source_id) - src_filename = 
os.path.splitext(os.path.basename(src_file_location))[0] - location = location.replace(Patterns.SRC_FILENAME, src_filename) - return location + +def get_location( + location_pattern, + source_id: str, + src_file_location: str, +): + src_filename = os.path.splitext(os.path.basename(src_file_location))[0] + location = location_pattern.replace(Patterns.SOURCE_ID, source_id).replace( + Patterns.SRC_FILENAME, src_filename + ) + return location def frame_has_objects(frame: VideoFrame): @@ -177,13 +203,13 @@ def get_tag_location(frame: VideoFrame): def main(): init_logging() - # To gracefully shutdown the adapter on SIGTERM (raise KeyboardInterrupt) + # To gracefully shut down the adapter on SIGTERM (raise KeyboardInterrupt) signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) logger = get_logger(LOGGER_NAME) logger.info(get_starting_message('metadata sink adapter')) - location = req_config('LOCATION') + location = req_config('FILENAME_PATTERN') zmq_endpoint = req_config('ZMQ_ENDPOINT') zmq_socket_type = opt_config('ZMQ_TYPE', 'SUB') zmq_bind = opt_config('ZMQ_BIND', False, strtobool) diff --git a/docs/source/savant_101/10_adapters.rst b/docs/source/savant_101/10_adapters.rst index 1ffd7669..984d85c2 100644 --- a/docs/source/savant_101/10_adapters.rst +++ b/docs/source/savant_101/10_adapters.rst @@ -942,12 +942,51 @@ The JSON Metadata Sink Adapter writes received messages as newline-delimited JSO **Parameters**: -- ``DIR_LOCATION``: a location to write files to; can be a plain location or a pattern; supported substitution parameters are ``%source_id`` and ``%src_filename``; +- ``FILENAME_PATTERN``: a filesystem pattern to write files to; can be a plain location or a pattern; supported substitution parameters are ``%source_id``, ``%src_filename``, and ``%chunk_idx``; - ``CHUNK_SIZE``: a chunk size in a number of frames; the stream is split into chunks and is written to separate folders with consecutive numbering; default is ``10000``; a value of ``0`` disables 
chunking, resulting in a continuous stream of frames by ``source_id``; - ``SKIP_FRAMES_WITHOUT_OBJECTS``: a flag indicating whether frames without detected objects are ignored in output; the default value is ``False``; - ``SOURCE_ID``: an optional filter to filter out frames with a specific ``source_id`` only; - ``SOURCE_ID_PREFIX`` an optional filter to filter out frames with a matching ``source_id`` prefix only. +If the ``FILENAME_PATTERN`` contains an extension (e.g., ``.json-stream``) it is extracted and used in the final file name. If the extension is missing, the system will add ``.json``. + +When the ``FILENAME_PATTERN`` contains ``%chunk_idx`` the pattern will be used as is, for example: + +.. code-block:: + + /out/%source_id/%chunk_idx/metadata.json + + +or + + +.. code-block:: + + /out/%source_id/%chunk_idx/metadata # .json will be added + + +If the ``FILENAME_PATTERN`` does not contain ``%chunk_idx`` and the ``CHUNK_SIZE`` is set to a value greater than ``0``, the filename is constructed as: + +.. code-block:: + + {FILENAME_PATTERN}_{CHUNK_IDX}.{EXTENSION} + + e.g. + + /out/camera1/metadata_00000001.json + + +When the ``CHUNK_SIZE`` is set to ``0``, the name is built as: + +.. code-block:: + + {FILENAME_PATTERN}.json + + e.g. + + /out/camera1/metadata.json + + Running the adapter with Docker: .. 
code-block:: bash @@ -955,7 +994,7 @@ Running the adapter with Docker: docker run --rm -it --name sink-meta-json \ --entrypoint /opt/savant/adapters/python/sinks/metadata_json.py \ -e ZMQ_ENDPOINT=sub+connect:ipc:///tmp/zmq-sockets/output-video.ipc \ - -e LOCATION=/path/to/output/%source_id-%src_filename \ + -e FILENAME_PATTERN=/path/to/output/%source_id-%src_filename \ -e CHUNK_SIZE=0 \ -v /path/to/output/:/path/to/output/ \ -v /tmp/zmq-sockets:/tmp/zmq-sockets \ @@ -976,12 +1015,16 @@ The image file sink adapter extends the JSON metadata adapter by writing image f **Parameters**: -- ``DIR_LOCATION``: a location to write files to; can be a regular path or a path template; supported substitution parameters are ``%source_id`` and ``%src_filename``; +- ``DIR_LOCATION``: a location to write files to; can be a regular path or a path template; supported substitution parameters are ``%source_id``, ``%src_filename``, and ``%chunk_idx``; - ``CHUNK_SIZE``: a chunk size in a number of frames; the stream is split into chunks and is written to separate directories with consecutive numbering; default is ``10000``; A value of ``0`` disables chunking, resulting in a continuous stream of frames by ``source_id``; - ``SKIP_FRAMES_WITHOUT_OBJECTS``: a flag indicating whether frames without objects are ignored in output; the default value is ``False``; - ``SOURCE_ID``: an optional filter to filter out frames with a specific ``source_id`` only; - ``SOURCE_ID_PREFIX`` an optional filter to filter out frames with a matching ``source_id`` prefix only. + +If ``DIR_LOCATION`` does not contain ``%chunk_idx`` it is created as a subdirectory containing: ``metadata.json`` file and ``images`` directory with images. Otherwise, extra directory is not created. + + Running the adapter with Docker: .. 
code-block:: bash @@ -1009,11 +1052,15 @@ The video file sink adapter extends the JSON metadata adapter by writing video f **Parameters**: -- ``DIR_LOCATION``: a location to write files to; can be a regular path or a path template; supported substitution parameters are ``%source_id`` and ``%src_filename``; +- ``DIR_LOCATION``: a location to write files to; can be a regular path or a path template; supported substitution parameters are ``%source_id``, ``%src_filename``, and ``%chunk_idx``; - ``CHUNK_SIZE``: a chunk size in a number of frames; the stream is split into chunks and is written to separate folders with consecutive numbering; default is ``10000``; A value of ``0`` disables limit for number of frames in a chunk: the stream will be split into chunks only by EOS messages; - ``SOURCE_ID``: an optional filter to filter out frames with a specific ``source_id`` only; - ``SOURCE_ID_PREFIX`` an optional filter to filter out frames with a matching ``source_id`` prefix only. + +If ``DIR_LOCATION`` does not contain ``%chunk_idx`` it is created as a subdirectory containing: ``metadata.json`` and ``video.{mov, webm}`` files. Otherwise, extra directory is not created. + + Running the adapter with Docker: .. code-block:: bash