Skip to content

Commit

Permalink
Create YAML Join Transform (apache#30734)
Browse files Browse the repository at this point in the history
* basic SQLbacked transform

* add Join-calcite to providers

* translate join conditions to sql

* parse output fields

* validate join yaml entries

* reformat code

* fix pythonformatter issues

* add unit tests

* support equalities with more than 2 inputs

* fix lint formatting & remove networkx dependency

* fix lint

* fix format

* handle disconnect equalities

* correct disconnected inputs error message

* remove test.yaml

* add unit test annotation

* add fields validation

* Minor join fixes.

* Some simple join tests.

---------

Co-authored-by: Robert Bradshaw <[email protected]>
  • Loading branch information
itodotimothy6 and robertwb authored Apr 17, 2024
1 parent d70c253 commit 38a8b5f
Show file tree
Hide file tree
Showing 5 changed files with 698 additions and 6 deletions.
19 changes: 13 additions & 6 deletions sdks/python/apache_beam/yaml/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,19 @@ def provider_sets(spec, require_available=False):
"""For transforms that are vended by multiple providers, yields all possible
combinations of providers to use.
"""
all_transform_types = set.union(
*(
set(
transform_types(
yaml_transform.preprocess(copy.deepcopy(p['pipeline']))))
for p in spec['pipelines']))
try:
for p in spec['pipelines']:
_ = yaml_transform.preprocess(copy.deepcopy(p['pipeline']))
except Exception as exn:
print(exn)
all_transform_types = []
else:
all_transform_types = set.union(
*(
set(
transform_types(
yaml_transform.preprocess(copy.deepcopy(p['pipeline']))))
for p in spec['pipelines']))

def filter_to_available(t, providers):
if require_available:
Expand Down
186 changes: 186 additions & 0 deletions sdks/python/apache_beam/yaml/tests/join.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the# Row(word='License'); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an# Row(word='AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pipelines:

- pipeline:
transforms:
- type: Create
name: A
config:
elements:
- {common: "x", a: 1}
- {common: "y", a: 2}
- {common: "z", a: 3}

- type: Create
name: B
config:
elements:
- {common: "x", b: 10, other: "t"}
- {common: "y", b: 20, other: "u"}
- {common: "z", b: 30, other: "v"}

- type: Create
name: C
config:
elements:
- {other: "t", c: 100}
- {other: "u", c: 200}

- type: Join
input:
A: A
B: B
C: C
config:
type: inner
equalities:
- B: other
C: other
- A: common
B: common
fields:
A: [a]
B: [b]
C: [c]

- type: AssertEqual
input: Join
config:
elements:
- {a: 1, b: 10, c: 100}
- {a: 2, b: 20, c: 200}


- pipeline:
transforms:
- type: Create
name: A
config:
elements:
- {common: "x", a: 1}
- {common: "y", a: 2}
- {common: "z", a: 3}

- type: Create
name: B
config:
elements:
- {common: "x", b: 10}
- {common: "y", b: 20}
- {common: "z", b: 30}

- type: Create
name: C
config:
elements:
- {common: "x", c: 100}
- {common: "y", c: 200}

- type: Join
name: InnerJoin
input:
A: A
B: B
C: C
config:
type: inner
equalities:
- A: common
B: common
C: common
fields:
A: [common, a]
B: [b]
C: {c: c, common_c: common}

- type: AssertEqual
input: InnerJoin
config:
elements:
- {common: "x", a: 1, b: 10, c: 100, common_c: "x"}
- {common: "y", a: 2, b: 20, c: 200, common_c: "y"}

- type: Join
name: OuterJoin
input:
A: A
B: B
C: C
config:
type: outer
equalities:
- A: common
B: common
C: common
fields:
A: [common, a]
B: [b]
C: {c: c, common_c: common}

- type: AssertEqual
input: OuterJoin
config:
elements:
- {common: "x", a: 1, b: 10, c: 100, common_c: "x"}
- {common: "y", a: 2, b: 20, c: 200, common_c: "y"}
- {common: "z", a: 3, b: 30, c: null, common_c: null}

- type: Join
name: LeftJoin
input:
A: A
C: C
config:
type: left
equalities:
- A: common
C: common
fields:
A: [a]
C: [c]

- type: AssertEqual
input: LeftJoin
config:
elements:
- {a: 1, c: 100}
- {a: 2, c: 200}
- {a: 3, c: null}

- type: Join
name: RightJoin
input:
A: A
C: C
config:
type: right
equalities:
- A: common
C: common
fields:
A: [a]
C: [c]

- type: AssertEqual
input: RightJoin
config:
elements:
- {a: 1, c: 100}
- {a: 2, c: 200}

Loading

0 comments on commit 38a8b5f

Please sign in to comment.