Skip to content

Commit

Permalink
one more test, asserting it gets used as expected
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Mar 29, 2024
1 parent 80a7d94 commit f5d3cb5
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_provider_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,38 @@
import unittest

import apache_beam as beam
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.yaml import yaml_provider
from apache_beam.yaml import yaml_transform


class FakeTransform(beam.PTransform):
def __init__(self, creator, urn):
self.creator = creator
self.urn = urn

def expand(self, pcoll):
return pcoll | beam.Map(lambda x: x + ((self.urn, self.creator), ))


class FakeExternalProvider(yaml_provider.ExternalProvider):
def __init__(self, id, known_transforms, extra_transform_urns):
def __init__(
self, id, known_transforms, extra_transform_urns, error_on_use=False):
super().__init__(known_transforms, None)
self._id = id
self._schema_transforms = {urn: None for urn in extra_transform_urns}
self._error_on_use = error_on_use

def create_transform(self, type, *unused_args, **unused_kwargs):
if self._error_on_use:
raise RuntimeError(f'Provider {self._id} should not be used.')
return FakeTransform(self._id, self._urns[type])

def available(self):
# Claim we're available even if we error on use.
return True


class YamlProvidersTest(unittest.TestCase):
def test_external_with_underlying_provider(self):
Expand Down Expand Up @@ -69,6 +83,37 @@ def test_renaming_with_underlying_provider(self):
self.assertEqual('A', t.creator)
self.assertEqual('b:urn', t.urn)

def test_extended_providers_reused(self):
providerA = FakeExternalProvider("A", {'A': 'a:urn'}, ['b:urn'])
providerB = FakeExternalProvider(
"B", {
'B': 'b:urn', 'C': 'c:urn'
}, [], error_on_use=True)
providerR = yaml_provider.RenamingProvider( # keep wrapping
{'RenamedB': 'B', 'RenamedC': 'C' },
{'RenamedB': {}, 'RenamedC': {}},
providerB)

with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
result = p | 'Yaml1' >> yaml_transform.YamlTransform(
'''
type: chain
transforms:
- type: Create
config:
elements: [0]
- type: A
- type: B
- type: RenamedB
''',
providers=[providerA, providerB, providerR])
# All of these transforms should be serviced by providerA,
# negating the need to invoke providerB.
assert_that(
result,
equal_to([(0, ('a:urn', 'A'), ('b:urn', 'A'), ('b:urn', 'A'))]))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down

0 comments on commit f5d3cb5

Please sign in to comment.