Skip to content

Commit

Permalink
Add some error handling on unsupported arg type.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Dec 21, 2024
1 parent 6d105e5 commit bbaf640
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
11 changes: 7 additions & 4 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ def stage_file_with_retry(
with open(path, 'rb') as stream:
self.stage_file(
gcs_or_local_path, file_name, stream, mime_type, total_size)
elif isinstance(stream_or_path, io.BufferedIOBase):
elif isinstance(stream_or_path, io.IOBase):
stream = stream_or_path
try:
self.stage_file(
Expand All @@ -717,9 +717,12 @@ def stage_file_with_retry(
raise exn
else:
raise retry.PermanentException(
"Skip retrying because we caught exception:",
''.join(traceback.format_exception_only(exn.__class__, exn)),
', but the stream is not seekable')
"Skip retrying because we caught exception:" +
''.join(traceback.format_exception_only(exn.__class__, exn)) +
', but the stream is not seekable.')
else:
raise retry.PermanentException(
f"Unsupported type {type(stream_or_path)} in stream_or_path")

@retry.no_retries # Using no_retries marks this as an integration point.
def create_job(self, job):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1678,7 +1678,7 @@ def effect(self, *args, **kwargs):
if count <= 2:
raise Exception("This exception is raised for testing purpose.")

class Unseekable(io.BufferedIOBase):
class Unseekable(io.IOBase):
def seekable(self):
return False

Expand Down

0 comments on commit bbaf640

Please sign in to comment.