Skip to content

Commit

Permalink
Format with yapf 0.29.0 which beam relys on.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Oct 21, 2024
1 parent ba48a1a commit 1777b04
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 24 deletions.
18 changes: 0 additions & 18 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class SwitchingDirectRunner(PipelineRunner):
which supports streaming execution and certain primitives not yet
implemented in the FnApiRunner.
"""

def is_fnapi_compatible(self):
return BundleBasedDirectRunner.is_fnapi_compatible()

Expand All @@ -79,7 +78,6 @@ def run_pipeline(self, pipeline, options):

class _FnApiRunnerSupportVisitor(PipelineVisitor):
"""Visitor determining if a Pipeline can be run on the FnApiRunner."""

def accept(self, pipeline):
self.supported_by_fnapi_runner = True
pipeline.visit(self)
Expand Down Expand Up @@ -114,7 +112,6 @@ def visit_transform(self, applied_ptransform):

class _PrismRunnerSupportVisitor(PipelineVisitor):
"""Visitor determining if a Pipeline can be run on the PrismRunner."""

def accept(self, pipeline):
self.supported_by_prism_runner = True
pipeline.visit(self)
Expand Down Expand Up @@ -190,7 +187,6 @@ def visit_transform(self, applied_ptransform):
@typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]])
class _GroupByKeyOnly(PTransform):
"""A group by key transform, ignoring windows."""

def infer_output_type(self, input_type):
key_type, value_type = trivial_inference.key_value_types(input_type)
return typehints.KV[key_type, typehints.Iterable[value_type]]
Expand All @@ -204,7 +200,6 @@ def expand(self, pcoll):
@typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]])
class _GroupAlsoByWindow(ParDo):
"""The GroupAlsoByWindow transform."""

def __init__(self, windowing):
super().__init__(_GroupAlsoByWindowDoFn(windowing))
self.windowing = windowing
Expand Down Expand Up @@ -279,7 +274,6 @@ def from_runner_api_parameter(unused_ptransform, payload, context):
@typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]])
class _GroupByKey(PTransform):
"""The DirectRunner GroupByKey implementation."""

def expand(self, pcoll):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
Expand Down Expand Up @@ -342,7 +336,6 @@ def _get_transform_overrides(pipeline_options):
from apache_beam.runners.direct.sdf_direct_runner import SplittableParDoOverride

class CombinePerKeyOverride(PTransformOverride):

def matches(self, applied_ptransform):
if isinstance(applied_ptransform.transform, CombinePerKey):
return applied_ptransform.inputs[0].windowing.is_default()
Expand All @@ -360,7 +353,6 @@ def get_replacement_transform_for_applied_ptransform(
return transform

class StreamingGroupByKeyOverride(PTransformOverride):

def matches(self, applied_ptransform):
# Note: we match the exact class, since we replace it with a subclass.
return applied_ptransform.transform.__class__ == _GroupByKeyOnly
Expand All @@ -372,7 +364,6 @@ def get_replacement_transform_for_applied_ptransform(
return transform

class StreamingGroupAlsoByWindowOverride(PTransformOverride):

def matches(self, applied_ptransform):
# Note: we match the exact class, since we replace it with a subclass.
transform = applied_ptransform.transform
Expand All @@ -389,7 +380,6 @@ def get_replacement_transform_for_applied_ptransform(
return transform

class TestStreamOverride(PTransformOverride):

def matches(self, applied_ptransform):
from apache_beam.testing.test_stream import TestStream
self.applied_ptransform = applied_ptransform
Expand All @@ -405,7 +395,6 @@ class GroupByKeyPTransformOverride(PTransformOverride):
This replaces the Beam implementation as a primitive.
"""

def matches(self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
Expand Down Expand Up @@ -447,7 +436,6 @@ def get_replacement_transform_for_applied_ptransform(


class _DirectReadFromPubSub(PTransform):

def __init__(self, source):
self._source = source

Expand Down Expand Up @@ -523,7 +511,6 @@ def _get_pubsub_transform_overrides(pipeline_options):
from apache_beam.pipeline import PTransformOverride

class ReadFromPubSubOverride(PTransformOverride):

def matches(self, applied_ptransform):
return isinstance(
applied_ptransform.transform, beam_pubsub.ReadFromPubSub)
Expand All @@ -537,7 +524,6 @@ def get_replacement_transform_for_applied_ptransform(
return _DirectReadFromPubSub(applied_ptransform.transform._source)

class WriteToPubSubOverride(PTransformOverride):

def matches(self, applied_ptransform):
return isinstance(applied_ptransform.transform, beam_pubsub.WriteToPubSub)

Expand All @@ -554,7 +540,6 @@ def get_replacement_transform_for_applied_ptransform(

class BundleBasedDirectRunner(PipelineRunner):
"""Executes a single pipeline on the local machine."""

@staticmethod
def is_fnapi_compatible():
return False
Expand All @@ -577,7 +562,6 @@ def run_pipeline(self, pipeline, options):

class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""

def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
Expand All @@ -591,7 +575,6 @@ def visit_transform(self, applied_ptransform):
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""

def __init__(self):
self.uses_test_stream = False

Expand Down Expand Up @@ -642,7 +625,6 @@ def visit_transform(self, applied_ptransform):

class DirectPipelineResult(PipelineResult):
"""A DirectPipelineResult provides access to info about a pipeline."""

def __init__(self, executor, evaluation_context):
super().__init__(PipelineState.RUNNING)
self._executor = executor
Expand Down
10 changes: 4 additions & 6 deletions sdks/python/apache_beam/runners/portability/prism_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class PrismRunner(portable_runner.PortableRunner):
"""A runner for launching jobs on Prism, automatically downloading and
starting a Prism instance if needed.
"""

def default_environment(
self,
options: pipeline_options.PipelineOptions) -> environments.Environment:
Expand Down Expand Up @@ -229,11 +228,11 @@ def path_to_binary(self) -> str:
'Likely Go is not installed, or a local change to Prism did not compile.\n'
'Please install Go (see https://go.dev/doc/install) to enable automatic local builds.\n'
'Alternatively provide a binary with the --prism_location flag.'
'\nCaptured output:\n %s' %
(self._version, output))
'\nCaptured output:\n %s' % (self._version, output))

# Go is installed and claims we're not in a Go module that has access to the Prism package.
# Fallback to using the @latest version of prism, which works everywhere.

# Fallback to using the @latest version of prism, which works everywhere.
process = subprocess.run(["go", "install", PRISMPKG + "@latest"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
Expand All @@ -246,8 +245,7 @@ def path_to_binary(self) -> str:
raise ValueError(
'We were unable to execute the subprocess "%s" to automatically build prism. \n'
'Alternatively provide an alternate binary with the --prism_location flag.'
'\nCaptured output:\n %s' %
(process.args, output))
'\nCaptured output:\n %s' % (process.args, output))

def subprocess_cmd_and_endpoint(
self) -> typing.Tuple[typing.List[typing.Any], str]:
Expand Down

0 comments on commit 1777b04

Please sign in to comment.