Skip to content

Commit

Permalink
Add support to use side inputs with Combine.PerKeyWithHotKeyFanout
Browse files Browse the repository at this point in the history
  • Loading branch information
marc7806 committed Oct 7, 2023
1 parent 7531501 commit 413ea75
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ public PerKey<K, InputT, OutputT> withSideInputs(
*/
public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(
    SerializableFunction<? super K, Integer> hotKeyFanout) {
  // Hand the configured side inputs to the fanout transform together with the combine function,
  // its display data, the fanout function and the fewKeys flag. Only the five-argument
  // constructor call is kept: a second return statement here would be unreachable code.
  return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData, hotKeyFanout, fewKeys, sideInputs);
}

/**
Expand All @@ -1578,7 +1578,8 @@ public Integer apply(K unused) {
return hotKeyFanout;
}
},
fewKeys);
fewKeys,
sideInputs);
}

/** Returns the {@link GlobalCombineFn} used by this Combine operation. */
Expand Down Expand Up @@ -1624,18 +1625,20 @@ public static class PerKeyWithHotKeyFanout<K, InputT, OutputT>
private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final SerializableFunction<? super K, Integer> hotKeyFanout;

private final boolean fewKeys;
private final List<PCollectionView<?>> sideInputs;

/**
 * Creates the hot-key-fanout variant of a per-key combine.
 *
 * @param fn the combine function to apply
 * @param fnDisplayData display data item describing {@code fn}
 * @param hotKeyFanout function mapping a key to its fanout value
 * @param fewKeys selects {@code Combine.fewKeys} instead of {@code Combine.perKey} when the
 *     internal combine steps are built
 * @param sideInputs side inputs attached to the internal combine steps when non-empty; stored
 *     as given, without copying
 */
private PerKeyWithHotKeyFanout(
    GlobalCombineFn<? super InputT, ?, OutputT> fn,
    DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
    SerializableFunction<? super K, Integer> hotKeyFanout,
    boolean fewKeys,
    List<PCollectionView<?>> sideInputs) {
  this.fn = fn;
  this.fnDisplayData = fnDisplayData;
  this.hotKeyFanout = hotKeyFanout;
  this.fewKeys = fewKeys;
  this.sideInputs = sideInputs;
}

@Override
Expand Down Expand Up @@ -1928,6 +1931,10 @@ public void processElement(
fewKeys
? Combine.fewKeys(hotPreCombine, fnDisplayData)
: Combine.perKey(hotPreCombine, fnDisplayData);
if (!sideInputs.isEmpty()) {
hotPreCombineTransform = hotPreCombineTransform.withSideInputs(sideInputs);
}

PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot =
split
.get(hot)
Expand Down Expand Up @@ -1975,6 +1982,10 @@ public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<K, InputT> element) {
fewKeys
? Combine.fewKeys(postCombine, fnDisplayData)
: Combine.perKey(postCombine, fnDisplayData);
if (!sideInputs.isEmpty()) {
postCombineTransform = postCombineTransform.withSideInputs(sideInputs);
}

return PCollectionList.of(precombinedHot)
.and(preprocessedCold)
.apply(Flatten.pCollections())
Expand All @@ -1993,6 +2004,11 @@ public void populateDisplayData(DisplayData.Builder builder) {
DisplayData.item("fanoutFn", hotKeyFanout.getClass()).withLabel("Fanout Function"));
}

/**
 * Returns the side inputs used by this Combine operation.
 *
 * @return the list held by this transform; it is returned directly, not as a copy
 */
public List<PCollectionView<?>> getSideInputs() {
  return this.sideInputs;
}

/**
* Used to store either an input or accumulator value, for flattening the hot and cold key
* paths.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,30 @@ public void testWithFanoutPreservesSideInputs() {

assertEquals(Collections.singletonList(view), combine.getSideInputs());
}

/**
 * Runs a per-key combine that is configured with both a side input and a hot key fanout, and
 * checks the combined output for each key.
 */
@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
public void testHotKeyCombineWithSideInputs() {
  // Two keys: "a" with values 1, 1, 4 and "b" with values 1, 13.
  PCollection<KV<String, Integer>> keyedInts =
      createInput(
          pipeline,
          Arrays.asList(
              KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)));

  // Globally combine all values and expose the result as a singleton side input.
  PCollection<Integer> valuesOnly = keyedInts.apply(Values.create());
  PCollection<Integer> total = valuesOnly.apply("Sum", Combine.globally(new SumInts()));
  PCollectionView<Integer> totalView = total.apply(View.asSingleton());

  // The combine under test: per-key, reading the side input, with a hot key fanout of 1.
  Combine.PerKeyWithHotKeyFanout<String, Integer, String> fanoutCombine =
      Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(totalView))
          .withSideInputs(totalView)
          .withHotKeyFanout(1);
  PCollection<KV<String, String>> combined = keyedInts.apply(fanoutCombine);

  PAssert.that(combined)
      .containsInAnyOrder(Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")));

  pipeline.run();
}
}

/** Tests validating windowing behaviors. */
Expand Down

0 comments on commit 413ea75

Please sign in to comment.