Fix test serialization
Address PR comments
scwhittle committed Aug 29, 2024
1 parent d64c400 commit 10d8cd9
Showing 3 changed files with 82 additions and 69 deletions.
KafkaCommitOffset.java
@@ -19,7 +19,6 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -42,6 +41,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -52,12 +52,12 @@ public class KafkaCommitOffset<K, V>
extends PTransform<
PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>>, PCollection<Void>> {
private final KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors;
private final boolean useLegacyImplementation;
private final boolean use259implementation;

KafkaCommitOffset(
KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, boolean useLegacyImplementation) {
KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, boolean use259implementation) {
this.readSourceDescriptors = readSourceDescriptors;
this.useLegacyImplementation = useLegacyImplementation;
this.use259implementation = use259implementation;
}

static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, Void> {
@@ -105,9 +105,9 @@ private Map<String, Object> overrideBootstrapServersConfig(
}
}

static class MaxOffsetFn<K, V>
private static final class MaxOffsetFn<K, V>
extends DoFn<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>, KV<KafkaSourceDescriptor, Long>> {
static class OffsetAndTimestamp {
private static class OffsetAndTimestamp {
OffsetAndTimestamp(long offset, Instant timestamp) {
this.offset = offset;
this.timestamp = timestamp;
@@ -124,15 +124,20 @@ void merge(long offset, Instant timestamp) {
Instant timestamp;
}

private transient Map<KafkaSourceDescriptor, OffsetAndTimestamp> maxObserved = new HashMap<>();
private transient @MonotonicNonNull Map<KafkaSourceDescriptor, OffsetAndTimestamp> maxObserved;

@StartBundle
public void startBundle() {
maxObserved.clear();
if (maxObserved == null) {
maxObserved = new HashMap<>();
} else {
maxObserved.clear();
}
}

@RequiresStableInput
@ProcessElement
@SuppressWarnings("nullness") // startBundle guaranteed to initialize
public void processElement(
@Element KV<KafkaSourceDescriptor, KafkaRecord<K, V>> element,
@Timestamp Instant timestamp) {
@@ -149,23 +154,18 @@ public void processElement(
}

@FinishBundle
@SuppressWarnings("nullness") // startBundle guaranteed to initialize
public void finishBundle(FinishBundleContext context) {
maxObserved.forEach(
(k, v) -> context.output(KV.of(k, v.offset), v.timestamp, GlobalWindow.INSTANCE));
}

private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
maxObserved = new HashMap<>();
}
}

@Override
public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> input) {
try {
PCollection<KV<KafkaSourceDescriptor, Long>> offsets;
if (useLegacyImplementation) {
if (use259implementation) {
offsets =
input.apply(
MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
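
Note on the MaxOffsetFn change above: the removed readObject hook used to re-create the transient maxObserved map after Java deserialization; the new code instead marks the field @MonotonicNonNull and (re)initializes it in @StartBundle, which runs before any @ProcessElement call of a bundle, so the @SuppressWarnings("nullness") annotations are safe. A minimal plain-Java sketch of why a transient field needs this kind of re-initialization (not Beam code; class and field names are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class TransientFieldDemo implements Serializable {
  // transient fields are skipped by Java serialization and come back as null.
  private transient Map<String, Long> maxObserved = new HashMap<>();

  Map<String, Long> maxObserved() {
    // Lazy re-initialization, analogous to the null check added in startBundle().
    if (maxObserved == null) {
      maxObserved = new HashMap<>();
    }
    return maxObserved;
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(new TransientFieldDemo());
    }
    TransientFieldDemo copy;
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      copy = (TransientFieldDemo) in.readObject();
    }
    // Without the null check this would throw NullPointerException.
    copy.maxObserved().put("topic-0", 42L);
    System.out.println(copy.maxObserved());
  }
}
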
KafkaIO.java
@@ -68,6 +68,7 @@
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
@@ -2686,55 +2687,60 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
.getSchemaCoder(KafkaSourceDescriptor.class),
recordCoder));

if (isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute()) {
// Add transform for committing offsets to Kafka with consistency with beam pipeline data
// processing.
boolean useLegacyImplementation = false;
String requestedVersionString =
input
.getPipeline()
.getOptions()
.as(StreamingOptions.class)
.getUpdateCompatibilityVersion();
if (requestedVersionString != null) {
List<String> requestedVersion = Arrays.asList(requestedVersionString.split("\\."));
List<String> targetVersion = Arrays.asList("2", "59", "0");

if (Comparators.lexicographical(Comparator.<String>naturalOrder())
.compare(requestedVersion, targetVersion)
< 0) {
useLegacyImplementation = true;
}
}
// Use expensive reshuffle of payloads for update compatibility and tweak how offsets are
// committed.
if (useLegacyImplementation) {
outputWithDescriptor =
outputWithDescriptor
.apply(Reshuffle.viaRandomKey())
.setCoder(
KvCoder.of(
input
.getPipeline()
.getSchemaRegistry()
.getSchemaCoder(KafkaSourceDescriptor.class),
recordCoder));
boolean applyCommitOffsets =
isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute();
if (!applyCommitOffsets) {
return outputWithDescriptor
.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {}).via(KV::getValue))
.setCoder(recordCoder);
}

// Add transform for committing offsets to Kafka with consistency with beam pipeline data
// processing.
String requestedVersionString =
input
.getPipeline()
.getOptions()
.as(StreamingOptions.class)
.getUpdateCompatibilityVersion();
if (requestedVersionString != null) {
List<String> requestedVersion = Arrays.asList(requestedVersionString.split("\\."));
List<String> targetVersion = Arrays.asList("2", "60", "0");

if (Comparators.lexicographical(Comparator.<String>naturalOrder())
.compare(requestedVersion, targetVersion)
< 0) {
return expand259Commits(
outputWithDescriptor, recordCoder, input.getPipeline().getSchemaRegistry());
}
outputWithDescriptor
.apply(new KafkaCommitOffset<>(this, useLegacyImplementation))
.setCoder(VoidCoder.of());
}
PCollection<KafkaRecord<K, V>> output =
outputWithDescriptor
.apply(
MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {}).via(KV::getValue))
.setCoder(recordCoder);
return output;
outputWithDescriptor.apply(new KafkaCommitOffset<>(this, false)).setCoder(VoidCoder.of());
return outputWithDescriptor
.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {}).via(KV::getValue))
.setCoder(recordCoder);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(e.getMessage());
}
}
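
Aside on the version gate above: getUpdateCompatibilityVersion() reports the SDK version the pipeline must stay update-compatible with, and anything below 2.60.0 falls back to expand259Commits. A standalone sketch of the comparison, using plain Guava's Comparators.lexicographical in place of Beam's vendored copy (class and method names here are illustrative):

import com.google.common.collect.Comparators;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class VersionGateSketch {
  // Mirrors the check in expand(): split on dots and compare element-wise as strings.
  static boolean isBefore260(String requestedVersionString) {
    List<String> requested = Arrays.asList(requestedVersionString.split("\\."));
    List<String> target = Arrays.asList("2", "60", "0");
    return Comparators.lexicographical(Comparator.<String>naturalOrder())
            .compare(requested, target)
        < 0;
  }

  public static void main(String[] args) {
    System.out.println(isBefore260("2.59.0")); // true  -> expand259Commits path
    System.out.println(isBefore260("2.60.0")); // false -> new commit path
  }
}
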

private PCollection<KafkaRecord<K, V>> expand259Commits(
PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputWithDescriptor,
Coder<KafkaRecord<K, V>> recordCoder,
SchemaRegistry schemaRegistry)
throws NoSuchSchemaException {
// Reshuffles the data and then branches off applying commit offsets.
outputWithDescriptor =
outputWithDescriptor
.apply(Reshuffle.viaRandomKey())
.setCoder(
KvCoder.of(
schemaRegistry.getSchemaCoder(KafkaSourceDescriptor.class), recordCoder));
outputWithDescriptor.apply(new KafkaCommitOffset<>(this, true)).setCoder(VoidCoder.of());
return outputWithDescriptor
.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {}).via(KV::getValue))
.setCoder(recordCoder);
}

private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
return (getKeyCoder() != null)
? getKeyCoder()
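
For completeness, a hedged usage sketch of how a pipeline would opt into the pre-2.60.0 path when updating. It assumes the usual Beam PipelineOptions naming convention, under which the StreamingOptions getter getUpdateCompatibilityVersion() corresponds to an --updateCompatibilityVersion flag; the flag spelling is an assumption, not taken from this PR:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

public class UpdateCompatibilitySketch {
  public static void main(String[] args) {
    // Pinning compatibility to 2.59.0 should make expand() above take the
    // expand259Commits() branch (reshuffle before committing offsets).
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs("--updateCompatibilityVersion=2.59.0").create();
    System.out.println(options.as(StreamingOptions.class).getUpdateCompatibilityVersion());
  }
}
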
KafkaCommitOffsetTest.java
@@ -59,10 +59,12 @@ public class KafkaCommitOffsetTest {
new KafkaCommitOffsetMockConsumer(null, false);
private final KafkaCommitOffsetMockConsumer errorConsumer =
new KafkaCommitOffsetMockConsumer(null, true);
private final KafkaCommitOffsetMockConsumer compositeConsumerDefaultBootstrap =

private static final KafkaCommitOffsetMockConsumer COMPOSITE_CONSUMER =
new KafkaCommitOffsetMockConsumer(null, false);
private final KafkaCommitOffsetMockConsumer compositeConsumerBootstrapOverridden =
private static final KafkaCommitOffsetMockConsumer COMPOSITE_CONSUMER_BOOTSTRAP =
new KafkaCommitOffsetMockConsumer(null, false);

private static final Map<String, Object> configMap =
ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group1");

@@ -112,8 +114,11 @@ public void testKafkaOffsetCompositeLegacy() throws CannotProvideCoderException
testKafkaOffsetHelper(true);
}

private void testKafkaOffsetHelper(boolean useLegacyImplementation)
private void testKafkaOffsetHelper(boolean use259Implementation)
throws CannotProvideCoderException {
COMPOSITE_CONSUMER.commitOffsets.clear();
COMPOSITE_CONSUMER_BOOTSTRAP.commitOffsets.clear();

ReadSourceDescriptors<String, String> descriptors =
ReadSourceDescriptors.<String, String>read()
.withBootstrapServers("bootstrap_server")
@@ -125,29 +130,31 @@ private void testKafkaOffsetHelper(boolean useLegacyImplementation)
if (input
.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
.equals("bootstrap_server")) {
return compositeConsumerDefaultBootstrap;
return COMPOSITE_CONSUMER;
}
Assert.assertEquals(
"bootstrap_overridden",
input.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
return compositeConsumerBootstrapOverridden;
return COMPOSITE_CONSUMER_BOOTSTRAP;
});

String topic0 = "topic0_" + (use259Implementation ? "259" : "260");
String topic1 = "topic1_" + (use259Implementation ? "259" : "260");
KafkaSourceDescriptor d1 =
KafkaSourceDescriptor.of(new TopicPartition("topic", 0), null, null, null, null, null);
KafkaSourceDescriptor.of(new TopicPartition(topic0, 0), null, null, null, null, null);
KafkaSourceDescriptor d2 =
KafkaSourceDescriptor.of(new TopicPartition("topic", 1), null, null, null, null, null);
KafkaSourceDescriptor.of(new TopicPartition(topic0, 1), null, null, null, null, null);
KafkaSourceDescriptor d3 =
KafkaSourceDescriptor.of(
new TopicPartition("topic", 0),
new TopicPartition(topic1, 0),
null,
null,
null,
null,
ImmutableList.of("bootstrap_overridden"));
KafkaSourceDescriptor d4 =
KafkaSourceDescriptor.of(
new TopicPartition("topic1", 0),
new TopicPartition(topic1, 1),
null,
null,
null,
@@ -168,17 +175,17 @@ private void testKafkaOffsetHelper(boolean useLegacyImplementation)
KvCoder.of(
pipeline.getCoderRegistry().getCoder(KafkaSourceDescriptor.class),
KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
input.apply(new KafkaCommitOffset<>(descriptors, useLegacyImplementation));
input.apply(new KafkaCommitOffset<>(descriptors, use259Implementation));
pipeline.run();

HashMap<TopicPartition, Long> expectedOffsets = new HashMap<>();
expectedOffsets.put(d1.getTopicPartition(), 101L);
expectedOffsets.put(d2.getTopicPartition(), 21L);
Assert.assertEquals(expectedOffsets, compositeConsumerDefaultBootstrap.commitOffsets);
Assert.assertEquals(expectedOffsets, COMPOSITE_CONSUMER.commitOffsets);
expectedOffsets.clear();
expectedOffsets.put(d3.getTopicPartition(), 31L);
expectedOffsets.put(d4.getTopicPartition(), 41L);
Assert.assertEquals(expectedOffsets, compositeConsumerBootstrapOverridden.commitOffsets);
Assert.assertEquals(expectedOffsets, COMPOSITE_CONSUMER_BOOTSTRAP.commitOffsets);
}

@Test
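
Background on why the mock consumers were promoted to static fields (and cleared at the start of the helper): the consumer factory lambda configured on the ReadSourceDescriptors is serialized with the pipeline, and a lambda that reads an instance field captures the enclosing test object, which is typically not serializable; reading a static field avoids the capture entirely. A minimal plain-Java sketch of the difference (not Beam code; names are illustrative):

import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class CaptureSketch {
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  static final List<String> STATIC_COMMITS = new ArrayList<>();
  final List<String> instanceCommits = new ArrayList<>();

  public static void main(String[] args) throws Exception {
    CaptureSketch test = new CaptureSketch();

    // Reads an instance field, so the lambda captures the (non-serializable)
    // enclosing object and serialization fails.
    SerializableSupplier<List<String>> viaInstance = () -> test.instanceCommits;
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(viaInstance);
    } catch (NotSerializableException expected) {
      System.out.println("instance capture fails: " + expected);
    }

    // Reads only a static field: nothing is captured, the lambda serializes, and a
    // deserialized copy sees the same static field in this JVM.
    SerializableSupplier<List<String>> viaStatic = () -> STATIC_COMMITS;
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(viaStatic);
    }
    System.out.println("static reference serializes fine");
  }
}
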
