From 1777b04d0c88d1388d023f12eb21324f2b9b8c9d Mon Sep 17 00:00:00 2001
From: lostluck <13907733+lostluck@users.noreply.github.com>
Date: Mon, 21 Oct 2024 11:45:47 -0700
Subject: [PATCH] Format with yapf 0.29.0, which Beam relies on.

---
 .../runners/direct/direct_runner.py     | 18 ------------------
 .../runners/portability/prism_runner.py | 10 ++++------
 2 files changed, 4 insertions(+), 24 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index e7c845d311e1..2e7942da3b56 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -66,7 +66,6 @@ class SwitchingDirectRunner(PipelineRunner):
   which supports streaming execution and certain primitives not yet
   implemented in the FnApiRunner.
   """
-
   def is_fnapi_compatible(self):
     return BundleBasedDirectRunner.is_fnapi_compatible()

@@ -79,7 +78,6 @@ def run_pipeline(self, pipeline, options):

     class _FnApiRunnerSupportVisitor(PipelineVisitor):
       """Visitor determining if a Pipeline can be run on the FnApiRunner."""
-
       def accept(self, pipeline):
         self.supported_by_fnapi_runner = True
         pipeline.visit(self)
@@ -114,7 +112,6 @@ def visit_transform(self, applied_ptransform):

     class _PrismRunnerSupportVisitor(PipelineVisitor):
       """Visitor determining if a Pipeline can be run on the PrismRunner."""
-
       def accept(self, pipeline):
         self.supported_by_prism_runner = True
         pipeline.visit(self)
@@ -190,7 +187,6 @@ def visit_transform(self, applied_ptransform):
 @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]])
 class _GroupByKeyOnly(PTransform):
   """A group by key transform, ignoring windows."""
-
   def infer_output_type(self, input_type):
     key_type, value_type = trivial_inference.key_value_types(input_type)
     return typehints.KV[key_type, typehints.Iterable[value_type]]
@@ -204,7 +200,6 @@ def expand(self, pcoll):
 @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]])
 class _GroupAlsoByWindow(ParDo):
   """The GroupAlsoByWindow transform."""
-
   def __init__(self, windowing):
     super().__init__(_GroupAlsoByWindowDoFn(windowing))
     self.windowing = windowing
@@ -279,7 +274,6 @@ def from_runner_api_parameter(unused_ptransform, payload, context):
 @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]])
 class _GroupByKey(PTransform):
   """The DirectRunner GroupByKey implementation."""
-
   def expand(self, pcoll):
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
@@ -342,7 +336,6 @@ def _get_transform_overrides(pipeline_options):
   from apache_beam.runners.direct.sdf_direct_runner import SplittableParDoOverride

   class CombinePerKeyOverride(PTransformOverride):
-
     def matches(self, applied_ptransform):
       if isinstance(applied_ptransform.transform, CombinePerKey):
         return applied_ptransform.inputs[0].windowing.is_default()
@@ -360,7 +353,6 @@ def get_replacement_transform_for_applied_ptransform(
       return transform

   class StreamingGroupByKeyOverride(PTransformOverride):
-
     def matches(self, applied_ptransform):
       # Note: we match the exact class, since we replace it with a subclass.
       return applied_ptransform.transform.__class__ == _GroupByKeyOnly
@@ -372,7 +364,6 @@ def get_replacement_transform_for_applied_ptransform(
       return transform

   class StreamingGroupAlsoByWindowOverride(PTransformOverride):
-
     def matches(self, applied_ptransform):
       # Note: we match the exact class, since we replace it with a subclass.
       transform = applied_ptransform.transform
@@ -389,7 +380,6 @@ def get_replacement_transform_for_applied_ptransform(
       return transform

   class TestStreamOverride(PTransformOverride):
-
     def matches(self, applied_ptransform):
       from apache_beam.testing.test_stream import TestStream
       self.applied_ptransform = applied_ptransform
@@ -405,7 +395,6 @@ class GroupByKeyPTransformOverride(PTransformOverride):

   This replaces the Beam implementation as a primitive.
   """
-
   def matches(self, applied_ptransform):
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
@@ -447,7 +436,6 @@ def get_replacement_transform_for_applied_ptransform(


 class _DirectReadFromPubSub(PTransform):
-
   def __init__(self, source):
     self._source = source

@@ -523,7 +511,6 @@ def _get_pubsub_transform_overrides(pipeline_options):
   from apache_beam.pipeline import PTransformOverride

   class ReadFromPubSubOverride(PTransformOverride):
-
     def matches(self, applied_ptransform):
       return isinstance(
           applied_ptransform.transform, beam_pubsub.ReadFromPubSub)
@@ -537,7 +524,6 @@ def get_replacement_transform_for_applied_ptransform(
       return _DirectReadFromPubSub(applied_ptransform.transform._source)

   class WriteToPubSubOverride(PTransformOverride):
-
     def matches(self, applied_ptransform):
       return isinstance(applied_ptransform.transform, beam_pubsub.WriteToPubSub)

@@ -554,7 +540,6 @@ def get_replacement_transform_for_applied_ptransform(

 class BundleBasedDirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
-
   @staticmethod
   def is_fnapi_compatible():
     return False
@@ -577,7 +562,6 @@ def run_pipeline(self, pipeline, options):

     class VerifyNoCrossLanguageTransforms(PipelineVisitor):
       """Visitor determining whether a Pipeline uses a TestStream."""
-
       def visit_transform(self, applied_ptransform):
         if isinstance(applied_ptransform.transform, ExternalTransform):
           raise RuntimeError(
@@ -591,7 +575,6 @@ def visit_transform(self, applied_ptransform):
     # If the TestStream I/O is used, use a mock test clock.
     class TestStreamUsageVisitor(PipelineVisitor):
       """Visitor determining whether a Pipeline uses a TestStream."""
-
       def __init__(self):
         self.uses_test_stream = False

@@ -642,7 +625,6 @@ def visit_transform(self, applied_ptransform):

 class DirectPipelineResult(PipelineResult):
   """A DirectPipelineResult provides access to info about a pipeline."""
-
   def __init__(self, executor, evaluation_context):
     super().__init__(PipelineState.RUNNING)
     self._executor = executor
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 2148ab9827fa..868cfa91b30a 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -56,7 +56,6 @@ class PrismRunner(portable_runner.PortableRunner):
   """A runner for launching jobs on Prism, automatically downloading and
   starting a Prism instance if needed.
   """
-
   def default_environment(
       self,
       options: pipeline_options.PipelineOptions) -> environments.Environment:
@@ -229,11 +228,11 @@ def path_to_binary(self) -> str:
           'Likely Go is not installed, or a local change to Prism did not compile.\n'
           'Please install Go (see https://go.dev/doc/install) to enable automatic local builds.\n'
           'Alternatively provide a binary with the --prism_location flag.'
-          '\nCaptured output:\n %s' %
-          (self._version, output))
+          '\nCaptured output:\n %s' % (self._version, output))

     # Go is installed and claims we're not in a Go module that has access to the Prism package.
-    # Fallback to using the @latest version of prism, which works everywhere.
+
+    # Fallback to using the @latest version of prism, which works everywhere.
     process = subprocess.run(["go", "install", PRISMPKG + "@latest"],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
@@ -246,8 +245,7 @@ def path_to_binary(self) -> str:
     raise ValueError(
         'We were unable to execute the subprocess "%s" to automatically build prism. \n'
        'Alternatively provide an alternate binary with the --prism_location flag.'
-        '\nCaptured output:\n %s' %
-        (process.args, output))
+        '\nCaptured output:\n %s' % (process.args, output))

   def subprocess_cmd_and_endpoint(
       self) -> typing.Tuple[typing.List[typing.Any], str]: