diff --git a/airbyte/_writers/file_writers.py b/airbyte/_writers/file_writers.py index b82b4a74..d8d03454 100644 --- a/airbyte/_writers/file_writers.py +++ b/airbyte/_writers/file_writers.py @@ -13,6 +13,7 @@ from airbyte import exceptions as exc from airbyte import progress from airbyte._batch_handles import BatchHandle +from airbyte._util.name_normalizers import LowerCaseNormalizer from airbyte._writers.base import AirbyteWriterInterface from airbyte.records import StreamRecord, StreamRecordHandler @@ -61,7 +62,14 @@ def _get_new_cache_file_path( batch_id = batch_id or str(ulid.ULID()) target_dir = Path(self._cache_dir) target_dir.mkdir(parents=True, exist_ok=True) - return target_dir / f"{stream_name}_{batch_id}{self.default_cache_file_suffix}" + # If a stream name contains a special character, the temporary jsonl.gz + # file can't be created, because of OS restrictions. Therefore, we + # remove the special characters, using the `LowerCaseNormalizer`. + # Specifically: we remove any of these characters: `<>:"/\|?*` + # and we remove characters in the ASCII range from 0 to 31. + normalizer = LowerCaseNormalizer() + normalized_stream_name = normalizer.normalize(stream_name) + return target_dir / f"{normalized_stream_name}_{batch_id}{self.default_cache_file_suffix}" def _open_new_file( self,