Skip to content

Commit

Permalink
KAFKA-15561 [2/N]: Background event and subscription state changes fo…
Browse files Browse the repository at this point in the history
…r RE2J pattern (apache#17918)

Reviewers: David Jacot <[email protected]>
  • Loading branch information
lianetm authored and chiacyu committed Nov 30, 2024
1 parent 7072494 commit 2658c6f
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
Expand Down Expand Up @@ -1871,16 +1872,23 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen

/**
* Subscribe to the RE2/J pattern. This will generate an event to update the pattern in the
* subscription, so it's included in a next heartbeat request sent to the broker. No validation of the pattern is
* performed by the client (other than null/empty checks).
* subscription state, so it's included in the next heartbeat request sent to the broker.
* No validation of the pattern is performed by the client (other than null/empty checks).
*/
private void subscribeToRegex(SubscriptionPattern pattern,
Optional<ConsumerRebalanceListener> listener) {
maybeThrowInvalidGroupIdException();
throwIfSubscriptionPatternIsInvalid(pattern);
log.info("Subscribing to regular expression {}", pattern);

// TODO: generate event to update subscribed regex so it's included in the next HB.
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
throwIfSubscriptionPatternIsInvalid(pattern);
log.info("Subscribing to regular expression {}", pattern);
applicationEventHandler.addAndGet(new TopicRe2JPatternSubscriptionChangeEvent(
pattern,
listener,
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
} finally {
release();
}
}

private void throwIfSubscriptionPatternIsInvalid(SubscriptionPattern subscriptionPattern) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.PartitionStates;
Expand Down Expand Up @@ -75,7 +76,7 @@ public class SubscriptionState {
private final Logger log;

private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED, AUTO_TOPICS_SHARE
NONE, AUTO_TOPICS, AUTO_PATTERN, AUTO_PATTERN_RE2J, USER_ASSIGNED, AUTO_TOPICS_SHARE
}

/* the type of subscription */
Expand All @@ -84,6 +85,9 @@ private enum SubscriptionType {
/* the pattern user has requested */
private Pattern subscribedPattern;

/* the Re2J pattern user has requested */
private SubscriptionPattern subscribedRe2JPattern;

/* the list of topics the user has requested */
private Set<String> subscription;

Expand All @@ -107,13 +111,21 @@ private enum SubscriptionType {
public synchronized String toString() {
return "SubscriptionState{" +
"type=" + subscriptionType +
", subscribedPattern=" + subscribedPattern +
", subscribedPattern=" + subscribedPatternInUse() +
", subscription=" + String.join(",", subscription) +
", groupSubscription=" + String.join(",", groupSubscription) +
", defaultResetStrategy=" + defaultResetStrategy +
", assignment=" + assignment.partitionStateValues() + " (id=" + assignmentId + ")}";
}

private Object subscribedPatternInUse() {
if (subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J)
return subscribedRe2JPattern;
if (subscriptionType == SubscriptionType.AUTO_PATTERN)
return subscribedPattern;
return null;
}

public synchronized String prettyString() {
switch (subscriptionType) {
case NONE:
Expand All @@ -122,6 +134,8 @@ public synchronized String prettyString() {
return "Subscribe(" + String.join(",", subscription) + ")";
case AUTO_PATTERN:
return "Subscribe(" + subscribedPattern + ")";
case AUTO_PATTERN_RE2J:
return "Subscribe(" + subscribedRe2JPattern + ")";
case USER_ASSIGNED:
return "Assign(" + assignedPartitions() + " , id=" + assignmentId + ")";
case AUTO_TOPICS_SHARE:
Expand All @@ -138,6 +152,7 @@ public SubscriptionState(LogContext logContext, AutoOffsetResetStrategy defaultR
this.assignment = new PartitionStates<>();
this.groupSubscription = new HashSet<>();
this.subscribedPattern = null;
this.subscribedRe2JPattern = null;
this.subscriptionType = SubscriptionType.NONE;
}

Expand Down Expand Up @@ -176,6 +191,12 @@ public synchronized void subscribe(Pattern pattern, Optional<ConsumerRebalanceLi
this.subscribedPattern = pattern;
}

public synchronized void subscribe(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
registerRebalanceListener(listener);
setSubscriptionType(SubscriptionType.AUTO_PATTERN_RE2J);
this.subscribedRe2JPattern = pattern;
}

public synchronized boolean subscribeFromPattern(Set<String> topics) {
if (subscriptionType != SubscriptionType.AUTO_PATTERN)
throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " +
Expand Down Expand Up @@ -249,7 +270,11 @@ public synchronized boolean assignFromUser(Set<TopicPartition> partitions) {
}

/**
* @return true if assignments matches subscription, otherwise false
* Check if an assignment received while using the classic group protocol matches the subscription.
* Note that this only considers the subscribedPattern because this functionality is only used under the
* classic protocol, where subscribedRe2JPattern is not supported.
*
* @return true if assignments matches subscription, otherwise false.
*/
public synchronized boolean checkAssignmentMatchedSubscription(Collection<TopicPartition> assignments) {
for (TopicPartition topicPartition : assignments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public abstract class ApplicationEvent {
public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA,
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, UPDATE_SUBSCRIPTION_METADATA,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ public void process(ApplicationEvent event) {
process((TopicPatternSubscriptionChangeEvent) event);
return;

case TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE:
process((TopicRe2JPatternSubscriptionChangeEvent) event);
return;

case UPDATE_SUBSCRIPTION_METADATA:
process((UpdatePatternSubscriptionEvent) event);
return;
Expand Down Expand Up @@ -300,9 +304,11 @@ private void process(final TopicSubscriptionChangeEvent event) {
}

/**
* Process event that indicates that the subscription topic pattern changed. This will make the
* consumer join the group if it is not part of it yet, or send the updated subscription if
* it is already a member on the next poll.
* Process event that indicates that the subscription java pattern changed.
* This will update the subscription state in the client to persist the new pattern.
* It will also evaluate the pattern against the latest metadata to find the matching topics,
* and send an updated subscription to the broker on the next poll
* (joining the group if it's not already part of it).
*/
private void process(final TopicPatternSubscriptionChangeEvent event) {
try {
Expand All @@ -315,6 +321,28 @@ private void process(final TopicPatternSubscriptionChangeEvent event) {
}
}

/**
* Process event that indicates that the subscription RE2J pattern changed.
* This will update the subscription state in the client to persist the new pattern.
* It will also make the consumer send the updated pattern on the next poll,
* joining the group if it's not already part of it.
* Note that this does not evaluate the pattern, it just passes it to the broker.
*/
private void process(final TopicRe2JPatternSubscriptionChangeEvent event) {
if (requestManagers.consumerMembershipManager.isEmpty()) {
event.future().completeExceptionally(
new KafkaException("MembershipManager is not available when processing a subscribe event"));
return;
}
try {
subscriptions.subscribe(event.pattern(), event.listener());
requestManagers.consumerMembershipManager.get().onSubscriptionUpdated();
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
}
}

/**
* Process event that re-evaluates the subscribed regular expression using the latest topics from metadata, only if metadata changed.
* This will make the consumer send the updated subscription on the next poll.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.SubscriptionPattern;

import java.util.Optional;

/**
* Application event indicating triggered by a call to the subscribe API
* providing a {@link SubscriptionPattern} (RE2J-compatible pattern).
* This will make the consumer send the updated subscription to the
* broker on the next poll, joining the group if it is not already part of it.
*/
public class TopicRe2JPatternSubscriptionChangeEvent extends SubscriptionChangeEvent {
private final SubscriptionPattern pattern;

public TopicRe2JPatternSubscriptionChangeEvent(final SubscriptionPattern pattern,
final Optional<ConsumerRebalanceListener> listener,
final long deadlineMs) {
super(Type.TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE, listener, deadlineMs);
this.pattern = pattern;
}

public SubscriptionPattern pattern() {
return pattern;
}

@Override
public String toStringBase() {
return super.toStringBase() + ", subscriptionPattern=" + pattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
Expand Down Expand Up @@ -138,6 +139,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -1864,6 +1866,27 @@ public void testSubscribeToRe2JPatternValidation() {
assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*")));
}

@Test
public void testSubscribeToRe2JPatternThrowsIfNoGroupId() {
consumer = newConsumer(requiredConsumerConfig());
assertThrows(InvalidGroupIdException.class, () -> consumer.subscribe(new SubscriptionPattern("t*")));
assertThrows(InvalidGroupIdException.class, () -> consumer.subscribe(new SubscriptionPattern("t*"),
mock(ConsumerRebalanceListener.class)));
}

@Test
public void testSubscribeToRe2JPatternGeneratesEvent() {
consumer = newConsumer();
completeTopicRe2JPatternSubscriptionChangeEventSuccessfully();

consumer.subscribe(new SubscriptionPattern("t*"));
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));

clearInvocations(applicationEventHandler);
consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class));
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));
}

private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
Expand Down Expand Up @@ -1982,6 +2005,15 @@ private void completeTopicPatternSubscriptionChangeEventSuccessfully() {
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
}

private void completeTopicRe2JPatternSubscriptionChangeEventSuccessfully() {
doAnswer(invocation -> {
TopicRe2JPatternSubscriptionChangeEvent event = invocation.getArgument(0);
consumer.subscriptions().subscribe(event.pattern(), event.listener());
event.future().complete(null);
return null;
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));
}

private void completeSeekUnvalidatedEventSuccessfully() {
doAnswer(invocation -> {
SeekUnvalidatedEvent event = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -397,6 +398,27 @@ public void patternSubscription() {
assertEquals(2, state.subscription().size(), "Expected subscribed topics count is incorrect");
}

@Test
public void testSubscribeToRe2JPattern() {
String pattern = "t*";
state.subscribe(new SubscriptionPattern(pattern), Optional.of(rebalanceListener));
assertTrue(state.toString().contains("type=AUTO_PATTERN_RE2J"));
assertTrue(state.toString().contains("subscribedPattern=" + pattern));
}

@Test
public void testMixedPatternSubscriptionNotAllowed() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () -> state.subscribe(new SubscriptionPattern("t*"),
Optional.of(rebalanceListener)));

state.unsubscribe();

state.subscribe(new SubscriptionPattern("t*"), Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () -> state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
}


@Test
public void unsubscribeUserAssignment() {
state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
Expand Down Expand Up @@ -382,6 +383,40 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV
assertDoesNotThrow(() -> event2.future().get());
}

@Test
public void testR2JPatternSubscriptionEventSuccess() {
SubscriptionPattern pattern = new SubscriptionPattern("t*");
Optional<ConsumerRebalanceListener> listener = Optional.of(mock(ConsumerRebalanceListener.class));
TopicRe2JPatternSubscriptionChangeEvent event =
new TopicRe2JPatternSubscriptionChangeEvent(pattern, listener, 12345);

setupProcessor(true);
processor.process(event);

verify(subscriptionState).subscribe(pattern, listener);
verify(subscriptionState, never()).subscribeFromPattern(any());
verify(membershipManager).onSubscriptionUpdated();
assertDoesNotThrow(() -> event.future().get());
}

@Test
public void testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() {
SubscriptionPattern pattern = new SubscriptionPattern("t*");
Optional<ConsumerRebalanceListener> listener = Optional.of(mock(ConsumerRebalanceListener.class));
TopicRe2JPatternSubscriptionChangeEvent event =
new TopicRe2JPatternSubscriptionChangeEvent(pattern, listener, 12345);
Exception mixedSubscriptionError = new IllegalStateException("Subscription to topics, partitions and " +
"pattern are mutually exclusive");
doThrow(mixedSubscriptionError).when(subscriptionState).subscribe(pattern, listener);

setupProcessor(true);
processor.process(event);

verify(subscriptionState).subscribe(pattern, listener);
Exception thrown = assertFutureThrows(event.future(), mixedSubscriptionError.getClass());
assertEquals(mixedSubscriptionError, thrown);
}

@ParameterizedTest
@MethodSource("offsetsGenerator")
public void testSyncCommitEvent(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
Expand Down

0 comments on commit 2658c6f

Please sign in to comment.