diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 530b2e8efd7..3df37db8220 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -706,7 +706,7 @@ def stage_file_with_retry( with open(path, 'rb') as stream: self.stage_file( gcs_or_local_path, file_name, stream, mime_type, total_size) - elif isinstance(stream_or_path, io.BufferedIOBase): + elif isinstance(stream_or_path, io.IOBase): stream = stream_or_path try: self.stage_file( @@ -717,9 +717,12 @@ def stage_file_with_retry( raise exn else: raise retry.PermanentException( - "Skip retrying because we caught exception:", - ''.join(traceback.format_exception_only(exn.__class__, exn)), - ', but the stream is not seekable') + "Skip retrying because we caught exception:" + + ''.join(traceback.format_exception_only(exn.__class__, exn)) + + ', but the stream is not seekable.') + else: + raise retry.PermanentException( + f"Unsupported type {type(stream_or_path)} in stream_or_path") @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 9d3804b0ce3..f2f4fd2d2be 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1678,7 +1678,7 @@ def effect(self, *args, **kwargs): if count <= 2: raise Exception("This exception is raised for testing purpose.") - class Unseekable(io.BufferedIOBase): + class Unseekable(io.IOBase): def seekable(self): return False