Fix Kafka with Redistribute #32344

Changes from 4 commits
@@ -88,6 +88,7 @@
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -381,7 +382,13 @@ public Consumer<byte[], byte[]> apply(Map<String, Object> config) {

   static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       int numElements, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
-    return mkKafkaReadTransform(numElements, numElements, timestampFn, false, 0);
+    return mkKafkaReadTransform(
+        numElements,
+        numElements,
+        timestampFn,
+        false, /*redistribute*/
+        false, /*allowDuplicates*/
+        0);
   }

   /**
@@ -393,6 +400,7 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       @Nullable Integer maxNumRecords,
       @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn,
       @Nullable Boolean redistribute,
+      @Nullable Boolean withAllowDuplicates,
       @Nullable Integer numKeys) {

     KafkaIO.Read<Integer, Long> reader =
@@ -408,13 +416,21 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       reader = reader.withMaxNumRecords(maxNumRecords);
     }

+    if (withAllowDuplicates == null) {
+      withAllowDuplicates = false;
+    }
+
     if (timestampFn != null) {
       reader = reader.withTimestampFn(timestampFn);
     }

     if (redistribute) {
       if (numKeys != null) {
-        reader = reader.withRedistribute().withRedistributeNumKeys(numKeys);
+        reader =
+            reader
+                .withRedistribute()
+                .withAllowDuplicates(withAllowDuplicates)
+                .withRedistributeNumKeys(numKeys);
       }
       reader = reader.withRedistribute();
     }
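For orientation, a hedged sketch of the public KafkaIO.Read surface this test helper now plumbs through. The broker address, topic name, and key count below are illustrative values, not taken from the PR; the withRedistribute/withAllowDuplicates/withRedistributeNumKeys calls are the ones the diff itself exercises.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.LongDeserializer;

    public class RedistributeReadExample {
      static PCollection<KV<Integer, Long>> read(Pipeline p) {
        return p.apply(
            KafkaIO.<Integer, Long>read()
                .withBootstrapServers("broker:9092") // made-up address
                .withTopic("my_topic") // made-up topic
                .withKeyDeserializer(IntegerDeserializer.class)
                .withValueDeserializer(LongDeserializer.class)
                .withRedistribute() // decouple downstream parallelism from partition count
                .withAllowDuplicates(true) // opt in to at-least-once delivery on redistribute
                .withRedistributeNumKeys(32) // made-up key count
                .withoutMetadata());
      }
    }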
@@ -628,17 +644,47 @@ public void testRiskyConfigurationWarnsProperly() {
   }

   @Test
-  public void testCommitOffsetsInFinalizeAndRedistributeErrors() {
-    thrown.expect(Exception.class);
-    thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with isRedistributed");
+  public void testCommitOffsetsInFinalizeAndRedistributeWarningsWithAllowDuplicates() {
+    int numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransform(
+                        numElements,
+                        numElements,
+                        new ValueAsTimestampFn(),
+                        true, /*redistribute*/
+                        true, /*allowDuplicates*/
+                        0)
+                    .commitOffsetsInFinalize()
+                    .withConsumerConfigUpdates(
+                        ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+
+    kafkaIOExpectedLogs.verifyWarn(
+        "Offsets committed due to usage of commitOffsetsInFinalize() may not capture all work processed due to use of withRedistribute()");
+  }
+
+  @Test
+  public void testCommitOffsetsInFinalizeAndRedistributeNoWarningsWithAllowDuplicates() {
Review comment (on the test name above): seems like it should be NoWarningsWithNoAllowDuplicates

Author reply: I do want to mention commits are enabled, since if we do not commit offsets, enabling allow duplicates also has no warning. (See the sketch after this hunk.)
     int numElements = 1000;

     PCollection<Long> input =
         p.apply(
-                mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
+                mkKafkaReadTransform(
+                        numElements,
+                        numElements,
+                        new ValueAsTimestampFn(),
+                        true, /*redistribute*/
+                        false, /*allowDuplicates*/
+                        0)
                     .commitOffsetsInFinalize()
                     .withConsumerConfigUpdates(
-                        ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+                        ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
                     .withoutMetadata())
             .apply(Values.create());
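Taken together, the two tests above (plus the no-commit case mentioned in the reply) pin down the warning rule. A minimal sketch of that rule follows; it is not the actual KafkaIO source, and the flag names are assumptions.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class RedistributeWarningSketch {
      private static final Logger LOG = LoggerFactory.getLogger(RedistributeWarningSketch.class);

      // Warn only when all three flags line up. With allowDuplicates == false
      // (or with no offset commits at all), nothing is logged, which is what
      // the NoWarnings test asserts.
      static void maybeWarn(
          boolean redistribute, boolean allowDuplicates, boolean commitOffsetsInFinalize) {
        if (redistribute && allowDuplicates && commitOffsetsInFinalize) {
          LOG.warn(
              "Offsets committed due to usage of commitOffsetsInFinalize() may not capture all"
                  + " work processed due to use of withRedistribute()");
        }
      }
    }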
@@ -648,13 +694,25 @@ public void testCommitOffsetsInFinalizeAndRedistributeErrors() {

   @Test
   public void testNumKeysIgnoredWithRedistributeNotEnabled() {
     thrown.expect(Exception.class);
     thrown.expectMessage(
         "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform");

     int numElements = 1000;

     PCollection<Long> input =
         p.apply(
-                mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0)
+                mkKafkaReadTransform(
+                        numElements,
+                        numElements,
+                        new ValueAsTimestampFn(),
+                        false, /*redistribute*/
+                        false, /*allowDuplicates*/
+                        0)
                     .withRedistributeNumKeys(100)
                     .commitOffsetsInFinalize()
                     .withConsumerConfigUpdates(
-                        ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+                        ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
                     .withoutMetadata())
             .apply(Values.create());
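A hedged sketch of the precondition this test asserts; the parameter names are assumptions based only on the expected message, not the actual KafkaIO internals.

    final class RedistributeNumKeysCheckSketch {
      // Reject a configured key count when redistribute itself was never enabled.
      static void check(int redistributeNumKeys, boolean redistributeEnabled) {
        if (redistributeNumKeys != 0 && !redistributeEnabled) {
          throw new IllegalStateException(
              "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the"
                  + " transform");
        }
      }
    }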
@@ -663,6 +721,32 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() {
     p.run();
   }

+  @Test
+  public void testDisableRedistributeKafkaOffsetLegacy() {
+    thrown.expect(Exception.class);
+    thrown.expectMessage(
+        "Can not enable isRedistribute() while committing offsets prior to 2.60.0");
+    p.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.59.0");
+
+    p.apply(
+            Create.of(
+                KafkaSourceDescriptor.of(
+                    new TopicPartition("topic", 1),
+                    null,
+                    null,
+                    null,
+                    null,
+                    ImmutableList.of("8.8.8.8:9092"))))
+        .apply(
+            KafkaIO.<Long, Long>readSourceDescriptors()
+                .withKeyDeserializer(LongDeserializer.class)
+                .withValueDeserializer(LongDeserializer.class)
+                .withRedistribute()
+                .withProcessingTime()
+                .commitOffsets());
+    p.run();
+  }
+
   @Test
   public void testUnreachableKafkaBrokers() {
     // Expect an exception when the Kafka brokers are not reachable on the workers.
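The new legacy test drives a version gate keyed off StreamingOptions (the setter appears in the diff; the matching getter is part of the same Beam interface). Here is a hedged sketch of such a gate, assuming a hand-rolled dotted-version comparison; the class and helper names are not the actual KafkaIO code.

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.StreamingOptions;

    final class RedistributeCompatGateSketch {
      static void checkRedistributeWithCommits(
          PipelineOptions options, boolean redistribute, boolean commitOffsets) {
        String compat = options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
        // Reject redistribute + offset commits when the pipeline must remain
        // update-compatible with a release older than 2.60.0.
        if (redistribute && commitOffsets && compat != null && isBefore(compat, "2.60.0")) {
          throw new IllegalStateException(
              "Can not enable isRedistribute() while committing offsets prior to 2.60.0");
        }
      }

      // Naive "a < b" check over dotted numeric versions, e.g. "2.59.0" < "2.60.0".
      private static boolean isBefore(String a, String b) {
        String[] as = a.split("\\.");
        String[] bs = b.split("\\.");
        for (int i = 0; i < Math.max(as.length, bs.length); i++) {
          int ai = i < as.length ? Integer.parseInt(as[i]) : 0;
          int bi = i < bs.length ? Integer.parseInt(bs[i]) : 0;
          if (ai != bi) {
            return ai < bi;
          }
        }
        return false;
      }
    }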
@@ -1982,7 +2066,13 @@ public void testUnboundedSourceStartReadTime() {

     PCollection<Long> input =
         p.apply(
-                mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn(), false, 0)
+                mkKafkaReadTransform(
+                        numElements,
+                        maxNumRecords,
+                        new ValueAsTimestampFn(),
+                        false, /*redistribute*/
+                        false, /*allowDuplicates*/
+                        0)
                     .withStartReadTime(new Instant(startTime))
                     .withoutMetadata())
             .apply(Values.create());
@@ -2006,7 +2096,13 @@ public void testUnboundedSourceStartReadTimeException() {
     int startTime = numElements / 20;

     p.apply(
-            mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0)
+            mkKafkaReadTransform(
+                    numElements,
+                    numElements,
+                    new ValueAsTimestampFn(),
+                    false, /*redistribute*/
+                    false, /*allowDuplicates*/
+                    0)
                 .withStartReadTime(new Instant(startTime))
                 .withoutMetadata())
         .apply(Values.create());
Review comment: update the log to reflect allow duplicates.

Author reply: Done.