Skip to content

Commit

Permalink
preserve non-global windowing
Browse files Browse the repository at this point in the history
Signed-off-by: Jeffrey Kinard <[email protected]>
  • Loading branch information
Polber committed Sep 30, 2024
1 parent f3a5b25 commit 821793e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.commons.csv.CSVFormat;

Expand Down Expand Up @@ -139,7 +139,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
// Preserve input windowing
CsvIO.Write<Row> writeTransform =
CsvIO.writeRows(configuration.getPath(), format).withSuffix("");
if (input.get(INPUT_ROWS_TAG).isBounded() == PCollection.IsBounded.UNBOUNDED) {
if (!input
.get(INPUT_ROWS_TAG)
.getWindowingStrategy()
.equals(WindowingStrategy.globalDefault())) {
writeTransform = writeTransform.withWindowedWrites();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;

/**
Expand Down Expand Up @@ -124,7 +124,10 @@ protected static class JsonWriteTransform extends SchemaTransform {
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// Preserve input windowing
JsonIO.Write<Row> writeTransform = JsonIO.writeRows(configuration.getPath()).withSuffix("");
if (input.get(INPUT_ROWS_TAG).isBounded() == PCollection.IsBounded.UNBOUNDED) {
if (!input
.get(INPUT_ROWS_TAG)
.getWindowingStrategy()
.equals(WindowingStrategy.globalDefault())) {
writeTransform = writeTransform.withWindowedWrites();
}

Expand Down

0 comments on commit 821793e

Please sign in to comment.