Skip to content

Commit

Permalink
Add outputWindowedValue capability to Java SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Dec 13, 2023
1 parent 8672028 commit db0bb44
Show file tree
Hide file tree
Showing 18 changed files with 923 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -669,6 +670,15 @@ public void output(RestrictionT part) {
public void outputWithTimestamp(RestrictionT part, Instant timestamp) {
throw new UnsupportedOperationException();
}

@Override
public void outputWindowedValue(
RestrictionT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
throw new UnsupportedOperationException();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
Expand Down Expand Up @@ -525,6 +526,15 @@ public void output(OutputT output) {
public void outputWithTimestamp(OutputT output, Instant timestamp) {
outerContext.outputWithTimestamp(output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outerContext.outputWindowedValue(output, timestamp, windows, paneInfo);
}
};
}

Expand All @@ -543,6 +553,15 @@ public void output(T output) {
public void outputWithTimestamp(T output, Instant timestamp) {
outerContext.outputWithTimestamp(tag, output, timestamp);
}

@Override
public void outputWindowedValue(
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo);
}
};
}

Expand Down Expand Up @@ -583,6 +602,15 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outerContext.outputWithTimestamp(output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outerContext.outputWindowedValue(output, timestamp, windows, paneInfo);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
outerContext.output(tag, output);
Expand All @@ -593,6 +621,16 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
outerContext.outputWithTimestamp(tag, output, timestamp);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo);
}

@Override
public InputT element() {
return element;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
Expand All @@ -42,6 +43,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -388,11 +390,20 @@ public void output(OutputT output) {

@Override
public void outputWithTimestamp(OutputT value, Instant timestamp) {
outputWindowedValue(value, timestamp, element.getWindows(), element.getPane());
}

@Override
public void outputWindowedValue(
OutputT value,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
output.outputWindowedValue(value, timestamp, element.getWindows(), element.getPane());
output.outputWindowedValue(value, timestamp, windows, paneInfo);
}

@Override
Expand All @@ -402,11 +413,21 @@ public <T> void output(TupleTag<T> tag, T value) {

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T value,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
output.outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
output.outputWindowedValue(tag, value, timestamp, windows, paneInfo);
}

private void noteOutput() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -413,22 +414,40 @@ public void output(OutputT output) {

@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
checkTimestamp(elem.getTimestamp(), timestamp);
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkNotNull(tag, "Tag passed to output cannot be null");
outputWindowedValue(tag, elem.withValue(output));
SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output));
}

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
checkTimestamp(elem.getTimestamp(), timestamp);
outputWindowedValue(
tag, WindowedValue.of(output, timestamp, elem.getWindows(), elem.getPane()));
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPane());
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, windows, paneInfo));
}

@Override
Expand Down Expand Up @@ -838,16 +857,38 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkTimestamp(timestamp(), timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWithTimestamp(tag, output, timestamp);
}

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkTimestamp(timestamp(), timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
checkTimestamp(timestamp(), timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, windows, paneInfo));
}

@Override
Expand Down Expand Up @@ -1045,16 +1086,38 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkTimestamp(this.timestamp, timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWithTimestamp(tag, output, timestamp);
}

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkTimestamp(this.timestamp, timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
checkTimestamp(this.timestamp, timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, windows, paneInfo));
}

@Override
Expand Down
Loading

0 comments on commit db0bb44

Please sign in to comment.