Skip to content

Commit

Permalink
learning+website: add FlattenWith to java catalo
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Awnallah <[email protected]>
  • Loading branch information
mohamedawnallah committed Dec 10, 2024
1 parent fa580e9 commit 292aeff
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.learning.katas.coretransforms.flattenWith;

// beam-playground:
// name: Flatten
// description: Task from katas that merges two PCollections of words into a single PCollection.
// multifile: false
// context_line: 47
// categories:
// - Combiners
// - Flatten
// - Core Transforms
// complexity: BASIC
// tags:
// - transforms
// - join
// - strings

import org.apache.beam.learning.katas.util.Log;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;


public class Task {

public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

PCollection<String> wordsStartingWithA =
pipeline.apply("Words starting with A",
Create.of("apple", "ant", "arrow")
);

PCollection<String> wordsStartingWithB =
pipeline.apply("Words starting with B",
Create.of("ball", "book", "bow")
);

PCollection<String> output = applyTransform(wordsStartingWithA, wordsStartingWithB);

output.apply(Log.ofElements());

pipeline.run();
}

static PCollection<String> applyTransform(
PCollection<String> words1, PCollection<String> words2) {

PTransform<PCollection<String>, PCollection<String>> flattenTransform = Flatten.with(words2);

return words1
.apply("Transform A to Uppercase",
MapElements.into(TypeDescriptors.strings())
.via((String word) -> word.toUpperCase()))
.apply("Flatten with words2", flattenTransform);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

type: edu
files:
- name: src/org/apache/beam/learning/katas/coretransforms/flattenWith/Task.java
visible: true
placeholders:
- offset: 2294
length: 85
placeholder_text: TODO()
- name: test/org/apache/beam/learning/katas/coretransforms/flattenWith/TaskTest.java
visible: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

FlattenWith
-------

FlattenWith is a Beam transform that merges multiple PCollection objects into
a single logical PCollection. It allows for the combination of both root
PCollection-producing transforms (like Create and Read) and existing PCollections.

**Kata:** Implement a
[FlattenWith](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Flatten.html#with-org.apache.beam.sdk.values.PCollection-)
transform that merges two PCollection of words into a single PCollection,
optimized for chaining operations.

<div class="hint">
Refer to
<a href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Flatten.html#with-org.apache.beam.sdk.values.PCollection-">
FlattenWith</a> to solve this problem.
</div>

<div class="hint">
Refer to the Beam Programming Guide
<a href="https://beam.apache.org/documentation/programming-guide/#flattenWith">
"FlattenWith"</a> section for more information.
</div>
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.learning.katas.coretransforms.flattenWith;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class TaskTest {

@Rule
public final transient TestPipeline testPipeline = TestPipeline.create();

@Test
public void flattenWith() {
PCollection<String> wordsStartingWithA =
testPipeline.apply("Words starting with A",
Create.of("apple", "ant", "arrow"));
PCollection<String> wordsStartingWithB =
testPipeline.apply("Words starting with B",
Create.of("ball", "book", "bow"));

PCollection<String> results = Task.applyTransform(wordsStartingWithA, wordsStartingWithB);

PAssert.that(results)
.containsInAnyOrder("APPLE", "ANT", "ARROW", "ball", "book", "bow");

testPipeline.run().waitUntilFinish();
}

}
21 changes: 21 additions & 0 deletions learning/katas/java/Core Transforms/FlattenWith/lesson-info.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

content:
- FlattenWith
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
---
title: "FlattenWith"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# FlattenWith
<table align="left">
<a target="_blank" class="button"
href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Flatten.html#with-org.apache.beam.sdk.values.PCollection-">
<img src="/images/logos/sdks/java.png" width="20px" height="20px"
alt="Javadoc" />
Javadoc
</a>
</table>
<br><br>


Merges multiple `PCollection` objects into a single logical
`PCollection`. It allows for the combination of both root
`PCollection`-producing transforms (like `Create` and `Read`) and existing
PCollections.

See more information in the [Beam Programming Guide](/documentation/programming-guide/#flattenwith).

## Examples

{{< playground height="700px" >}}
{{< playground_snippet language="java" path="PG_BEAMDOC_SDK_JAVA_FlattenWith" show="main_section" >}}
{{< /playground >}}

## Related transforms
* [Flatten](/documentation/transforms/java/other/flatten) merges multiple
`PCollection` objects into a single logical `PCollection`. This is useful when
dealing with multiple collections of the same data type.
* [FlatMap](/documentation/transforms/java/elementwise/flatmap) applies a
simple 1-to-many mapping function over each element in the collection. This
transform might produce zero or more outputs.

0 comments on commit 292aeff

Please sign in to comment.