Skip to content

Commit

Permalink
More robust tar file loading from AIStore (#11323)
Browse files: browse the repository at this point in the history.
Signed-off-by: Piotr Żelasko <[email protected]>
  • Loading branch information
pzelasko authored Nov 20, 2024
1 parent 3d8bcc8 commit 4633db6
Showing 1 changed file with 35 additions and 33 deletions.
68 changes: 35 additions & 33 deletions nemo/collections/common/data/lhotse/nemo_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import random
import re
Expand Down Expand Up @@ -398,40 +397,43 @@ def basename(d: dict) -> str:

shard_manifest: dict[str, list[dict]] = groupby(basename, self.shard_id_to_manifest[sid])
tar_path = self.shard_id_to_tar_path[sid]
for data, raw_audio, tar_info in iter_fn(tar_path, shard_manifest, manifest_path):
meta = soundfile.info(BytesIO(raw_audio))
recording = Recording(
id=tar_info.path,
sources=[AudioSource(type="memory", channels=list(range(meta.channels)), source=raw_audio)],
sampling_rate=int(meta.samplerate),
num_samples=meta.frames,
duration=meta.duration,
)
cuts_for_recording = []
for data in sorted(shard_manifest[tar_info.name], key=lambda d: d["audio_filepath"]):
# Cut the recording into corresponding segment and discard audio data outside the segment.
cut = make_cut_with_subset_inmemory_recording(
recording, offset=data.get("offset", 0.0), duration=data.get("duration")
try:
for data, raw_audio, tar_info in iter_fn(tar_path, shard_manifest, manifest_path):
meta = soundfile.info(BytesIO(raw_audio))
recording = Recording(
id=tar_info.path,
sources=[AudioSource(type="memory", channels=list(range(meta.channels)), source=raw_audio)],
sampling_rate=int(meta.samplerate),
num_samples=meta.frames,
duration=meta.duration,
)
cut.supervisions.append(
SupervisionSegment(
id=cut.id,
recording_id=cut.recording_id,
start=0,
duration=cut.duration,
text=data.get(self.text_field),
language=data.get(self.lang_field),
cuts_for_recording = []
for data in sorted(shard_manifest[tar_info.name], key=lambda d: d["audio_filepath"]):
# Cut the recording into corresponding segment and discard audio data outside the segment.
cut = make_cut_with_subset_inmemory_recording(
recording, offset=data.get("offset", 0.0), duration=data.get("duration")
)
)
cut.custom = _to_custom_attr_dict(data)
cut.manifest_origin = manifest_path
cut.tar_origin = tar_path
for extra_field in extra_fields:
extra_field.attach_to(cut)
cuts_for_recording.append(cut)
del recording # free the memory - helps with very large audio files
del raw_audio
yield from cuts_for_recording
cut.supervisions.append(
SupervisionSegment(
id=cut.id,
recording_id=cut.recording_id,
start=0,
duration=cut.duration,
text=data.get(self.text_field),
language=data.get(self.lang_field),
)
)
cut.custom = _to_custom_attr_dict(data)
cut.manifest_origin = manifest_path
cut.tar_origin = tar_path
for extra_field in extra_fields:
extra_field.attach_to(cut)
cuts_for_recording.append(cut)
del recording # free the memory - helps with very large audio files
del raw_audio
yield from cuts_for_recording
except tarfile.ReadError:
logging.warning(f"Skipping tar file due to read errors (unstable storage or bad file?): {tar_path=}")

def __len__(self) -> int:
    """Return the length reported by the wrapped ``self.source`` object."""
    underlying = self.source
    return len(underlying)
Expand Down

0 comments on commit 4633db6

Please sign in to comment.