Skip to content

Commit

Permalink
Filename pattern fixes (#867)
Browse files (browse the repository at this point in the history)
* changed JSON structure and improved adapter docs.

* Increased chunk spacer to 8 digits from 4.

* Increased chunk spacer to 8 digits from 4.

* Implemented pattern replacement in image files sink.

* Implemented pattern replacement in video files sink.

* Implemented pattern replacement in video files sink.

* Refactored get_location function.

* Refactored get_location function.

* doc fix

* Update adapters/python/sinks/metadata_json.py

Co-authored-by: Pavel Tomskikh <[email protected]>

* removed indentation.

* improved last writer management.

---------

Co-authored-by: Pavel Tomskikh <[email protected]>
  • Loading branch information
bwsw and tomskikh authored Sep 27, 2024
1 parent b4147b0 commit bb5a043
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 61 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Makefile
cache/
var/
data/
downloads/
logs/
.venv/
env/
Expand Down
57 changes: 47 additions & 10 deletions adapters/gst/sinks/video_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
from savant_rs.primitives import EndOfStream, VideoFrame

from adapters.python.sinks.chunk_writer import ChunkWriter, CompositeChunkWriter
from adapters.python.sinks.metadata_json import MetadataJsonWriter, Patterns
from adapters.python.sinks.metadata_json import (
MetadataJsonSink,
MetadataJsonWriter,
Patterns,
get_location,
get_tag_location,
)
from gst_plugins.python.savant_rs_video_demux_common import FrameParams, build_caps
from savant.api.parser import convert_ts
from savant.gstreamer import GLib, Gst, GstApp
Expand Down Expand Up @@ -286,10 +292,11 @@ def _open(self):
self.appsrc.set_caps(self.caps)

filesink: Gst.Element = self.pipeline.get_by_name(filesink_name)
os.makedirs(self.base_location, exist_ok=True)
dst_location = os.path.join(
self.base_location, f'{self.chunk_idx:04}.{file_ext}'
dst_location = self.base_location.replace(
Patterns.CHUNK_IDX, f'{self.chunk_idx:0{self.chunk_size_digits}}'
)
os.makedirs(dst_location, exist_ok=True)
dst_location = os.path.join(dst_location, f'video.{file_ext}')
self.logger.info(
'Writing video from source %s to file %s', self.source_id, dst_location
)
Expand Down Expand Up @@ -329,6 +336,7 @@ def __init__(
self.location = location
self.chunk_size = chunk_size
self.writers: Dict[str, ChunkWriter] = {}
self.last_writer_per_source: Dict[str, (str, ChunkWriter)] = {}

def write(self, zmq_message: ZeroMQMessage):
message = zmq_message.message
Expand Down Expand Up @@ -362,12 +370,20 @@ def _write_video_frame(
)
return False

writer = self.writers.get(video_frame.source_id)
src_file_location = get_tag_location(video_frame) or 'unknown'
location = get_location(self.location, video_frame.source_id, src_file_location)
if self.chunk_size > 0 and Patterns.CHUNK_IDX not in location:
location = os.path.join(location, Patterns.CHUNK_IDX)

writer = self.writers.get(location)
last_source_location, last_source_writer = self.last_writer_per_source.get(
video_frame.source_id
) or (None, None)

if writer is None:
base_location = os.path.join(self.location, video_frame.source_id)
json_filename_pattern = f'{Patterns.CHUNK_IDX}.json'

video_writer = VideoFilesWriter(
base_location,
location,
video_frame.source_id,
self.chunk_size,
frame_params,
Expand All @@ -376,13 +392,34 @@ def _write_video_frame(
[
video_writer,
MetadataJsonWriter(
os.path.join(base_location, json_filename_pattern),
os.path.join(location, 'metadata.json'),
self.chunk_size,
),
],
self.chunk_size,
)
self.writers[video_frame.source_id] = writer
self.writers[location] = writer
if writer is not last_source_writer:
if last_source_writer is not None:
self.logger.info(
'Flushing previous writer for source=%s, location=%s',
video_frame.source_id,
last_source_location,
)
last_source_writer.flush()
self.logger.info(
'Removing previous writer for source=%s, location=%s',
video_frame.source_id,
last_source_location,
)
del self.writers[last_source_location]
self.logger.info(
'New writer for source=%s, location=%s is initialized, amount of resident writers is %d',
video_frame.source_id,
location,
len(self.writers),
)
self.last_writer_per_source[video_frame.source_id] = (location, writer)

return writer.write_video_frame(video_frame, content, video_frame.keyframe)

Expand Down
5 changes: 1 addition & 4 deletions adapters/python/sinks/chunk_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ class ChunkWriter:
def __init__(self, chunk_size: int, logger_prefix: str = __name__):
self.logger = get_logger(f'{logger_prefix}.{self.__class__.__name__}')
self.chunk_size = chunk_size
if chunk_size > 0:
self.chunk_size_digits = int(math.log10(chunk_size)) + 1
else:
self.chunk_size_digits = 6
self.chunk_size_digits = 8
self.chunk_idx = -1
self.frames_in_chunk = 0
self.opened = False
Expand Down
59 changes: 42 additions & 17 deletions adapters/python/sinks/image_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

from adapters.python.sinks.chunk_writer import ChunkWriter, CompositeChunkWriter
from adapters.python.sinks.metadata_json import (
MetadataJsonSink,
MetadataJsonWriter,
Patterns,
frame_has_objects,
get_location,
get_tag_location,
)
from savant.api.enums import ExternalFrameType
from savant.utils.config import opt_config, req_config, strtobool
Expand Down Expand Up @@ -67,13 +70,9 @@ def _write_eos(self, eos: EndOfStream) -> bool:
return True

def _open(self):
if self.chunk_size > 0:
self.chunk_location = os.path.join(
self.base_location,
f'{self.chunk_idx:04}',
)
else:
self.chunk_location = self.base_location
self.chunk_location = self.base_location.replace(
Patterns.CHUNK_IDX, f'{self.chunk_idx:0{self.chunk_size_digits}}'
)
self.logger.info('Creating directory %s', self.chunk_location)
os.makedirs(self.chunk_location, exist_ok=True)

Expand All @@ -90,6 +89,7 @@ def __init__(
self.chunk_size = chunk_size
self.skip_frames_without_objects = skip_frames_without_objects
self.writers: Dict[str, ChunkWriter] = {}
self.last_writer_per_source: Dict[str, (str, ChunkWriter)] = {}

def write(self, zmq_message: ZeroMQMessage):
message = zmq_message.message
Expand All @@ -111,24 +111,49 @@ def _write_video_frame(self, video_frame: VideoFrame, content: bytes) -> bool:
video_frame.pts,
)
return False
writer = self.writers.get(video_frame.source_id)
src_file_location = get_tag_location(video_frame) or 'unknown'
location = get_location(self.location, video_frame.source_id, src_file_location)
if self.chunk_size > 0 and Patterns.CHUNK_IDX not in location:
location = os.path.join(location, Patterns.CHUNK_IDX)
writer = self.writers.get(location)
last_source_location, last_source_writer = self.last_writer_per_source.get(
video_frame.source_id
) or (None, None)

if writer is None:
base_location = os.path.join(self.location, video_frame.source_id)
if self.chunk_size > 0:
json_filename_pattern = f'{Patterns.CHUNK_IDX}.json'
else:
json_filename_pattern = 'meta.json'
writer = CompositeChunkWriter(
[
ImageFilesWriter(base_location, self.chunk_size),
ImageFilesWriter(os.path.join(location, 'images'), self.chunk_size),
MetadataJsonWriter(
os.path.join(base_location, json_filename_pattern),
os.path.join(location, 'metadata.json'),
self.chunk_size,
),
],
self.chunk_size,
)
self.writers[video_frame.source_id] = writer
self.writers[location] = writer
if writer is not last_source_writer:
if last_source_writer is not None:
self.logger.info(
'Flushing previous writer for source=%s, location=%s',
video_frame.source_id,
last_source_location,
)
last_source_writer.flush()
self.logger.info(
'Removing previous writer for source=%s, location=%s',
video_frame.source_id,
last_source_location,
)
del self.writers[last_source_location]
self.logger.info(
'New writer for source=%s, location=%s is initialized, amount of resident writers is %d',
video_frame.source_id,
location,
len(self.writers),
)
self.last_writer_per_source[video_frame.source_id] = (location, writer)

return writer.write_video_frame(video_frame, content, video_frame.keyframe)

def _write_eos(self, eos: EndOfStream):
Expand All @@ -147,7 +172,7 @@ def terminate(self):

def main():
init_logging()
# To gracefully shutdown the adapter on SIGTERM (raise KeyboardInterrupt)
# To gracefully shut down the adapter on SIGTERM (raise KeyboardInterrupt)
signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT))

logger = get_logger(LOGGER_NAME)
Expand Down
78 changes: 52 additions & 26 deletions adapters/python/sinks/metadata_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ class Patterns:

class MetadataJsonWriter(ChunkWriter):
def __init__(self, pattern: str, chunk_size: int):
self.pattern = pattern
super().__init__(chunk_size, logger_prefix=LOGGER_NAME)
self.pattern = pattern
self.logger.info('File name pattern is %s', self.pattern)

def _write_video_frame(
self,
Expand Down Expand Up @@ -77,7 +78,9 @@ def _close(self):
self.file.close()

def _open(self):
self.location = self.pattern.replace(Patterns.CHUNK_IDX, f'{self.chunk_idx:04}')
self.location = self.pattern.replace(
Patterns.CHUNK_IDX, f'{self.chunk_idx:0{self.chunk_size_digits}}'
)
self.lines = 0
self.logger.info('Opening file %s', self.location)
os.makedirs(os.path.dirname(self.location), exist_ok=True)
Expand All @@ -97,7 +100,7 @@ def __init__(
self.skip_frames_without_objects = skip_frames_without_objects
self.chunk_size = chunk_size
self.writers: Dict[str, MetadataJsonWriter] = {}
self.last_writer_per_source: Dict[str, MetadataJsonWriter] = {}
self.last_writer_per_source: Dict[str, (str, MetadataJsonWriter)] = {}

path, ext = os.path.splitext(location)
ext = ext or '.json'
Expand All @@ -118,28 +121,49 @@ def write(self, zmq_message: ZeroMQMessage):
return self._write_eos(message.as_end_of_stream())
self.logger.debug('Unsupported message type for message %r', message)

def _write_video_frame(self, frame: VideoFrame):
if self.skip_frames_without_objects and not frame_has_objects(frame):
def _write_video_frame(self, video_frame: VideoFrame):
if self.skip_frames_without_objects and not frame_has_objects(video_frame):
self.logger.debug(
'Frame %s from source %s does not have objects. Skipping it.',
frame.source_id,
frame.pts,
video_frame.source_id,
video_frame.pts,
)
return False

src_file_location = get_tag_location(frame) or ''
location = self.get_location(frame.source_id, src_file_location)
src_file_location = get_tag_location(video_frame) or 'unknown'
location = get_location(self.location, video_frame.source_id, src_file_location)
writer = self.writers.get(location)
last_writer = self.last_writer_per_source.get(frame.source_id)
last_source_location, last_source_writer = self.last_writer_per_source.get(
video_frame.source_id
) or (None, None)

if writer is None:
writer = MetadataJsonWriter(location, self.chunk_size)
self.writers[location] = writer
if writer is not last_writer:
if last_writer is not None:
last_writer.flush()
self.last_writer_per_source[frame.source_id] = writer
if writer is not last_source_writer:
if last_source_writer is not None:
self.logger.info(
'Flushing previous writer for source=%s, location=%s',
video_frame.source_id,
last_source_location,
)
last_source_writer.flush()
self.logger.info(
'Removing previous writer for source=%s, location=%s',
video_frame.source_id,
last_source_location,
)
del self.writers[last_source_location]

self.logger.info(
'New writer for source=%s, location=%s is initialized, amount of resident writers is %d',
video_frame.source_id,
location,
len(self.writers),
)
self.last_writer_per_source[video_frame.source_id] = (location, writer)

return writer.write_video_frame(frame, None, frame.keyframe)
return writer.write_video_frame(video_frame, None, video_frame.keyframe)

def _write_eos(self, eos: EndOfStream):
self.logger.info('Received EOS from source %s.', eos.source_id)
Expand All @@ -150,15 +174,17 @@ def _write_eos(self, eos: EndOfStream):
writer.flush()
return result

def get_location(
self,
source_id: str,
src_file_location: str,
):
location = self.location.replace(Patterns.SOURCE_ID, source_id)
src_filename = os.path.splitext(os.path.basename(src_file_location))[0]
location = location.replace(Patterns.SRC_FILENAME, src_filename)
return location

def get_location(
    location_pattern,
    source_id: str,
    src_file_location: str,
):
    """Fill the source-id and source-filename placeholders of a pattern.

    The source file name is reduced to its stem first: the directory part
    and the last extension of ``src_file_location`` are both dropped.

    :param location_pattern: Pattern that may contain the
        ``Patterns.SOURCE_ID`` and ``Patterns.SRC_FILENAME`` placeholders.
    :param source_id: Text substituted for ``Patterns.SOURCE_ID``.
    :param src_file_location: Path of the source file; only its base name
        without the extension is substituted for ``Patterns.SRC_FILENAME``.
    :return: The pattern with both placeholders expanded. Any other
        placeholder is left untouched by this function.
    """
    file_name = os.path.basename(src_file_location)
    stem, _extension = os.path.splitext(file_name)

    # The source id is substituted before the file name, so a file-name
    # placeholder introduced by ``source_id`` itself is expanded as well.
    with_source_id = location_pattern.replace(Patterns.SOURCE_ID, source_id)
    return with_source_id.replace(Patterns.SRC_FILENAME, stem)


def frame_has_objects(frame: VideoFrame):
Expand All @@ -177,13 +203,13 @@ def get_tag_location(frame: VideoFrame):

def main():
init_logging()
# To gracefully shutdown the adapter on SIGTERM (raise KeyboardInterrupt)
# To gracefully shut down the adapter on SIGTERM (raise KeyboardInterrupt)
signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT))

logger = get_logger(LOGGER_NAME)
logger.info(get_starting_message('metadata sink adapter'))

location = req_config('LOCATION')
location = req_config('FILENAME_PATTERN')
zmq_endpoint = req_config('ZMQ_ENDPOINT')
zmq_socket_type = opt_config('ZMQ_TYPE', 'SUB')
zmq_bind = opt_config('ZMQ_BIND', False, strtobool)
Expand Down
Loading

0 comments on commit bb5a043

Please sign in to comment.