diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 18615babaa64..04af4e450c77 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -62,6 +62,7 @@ import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent; import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent; +import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; @@ -1871,16 +1872,23 @@ private void subscribeInternal(Pattern pattern, Optional listener) { - maybeThrowInvalidGroupIdException(); - throwIfSubscriptionPatternIsInvalid(pattern); - log.info("Subscribing to regular expression {}", pattern); - - // TODO: generate event to update subscribed regex so it's included in the next HB. + acquireAndEnsureOpen(); + try { + maybeThrowInvalidGroupIdException(); + throwIfSubscriptionPatternIsInvalid(pattern); + log.info("Subscribing to regular expression {}", pattern); + applicationEventHandler.addAndGet(new TopicRe2JPatternSubscriptionChangeEvent( + pattern, + listener, + calculateDeadlineMs(time.timer(defaultApiTimeoutMs)))); + } finally { + release(); + } } private void throwIfSubscriptionPatternIsInvalid(SubscriptionPattern subscriptionPattern) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 661fc0a198e8..a51383d39fa5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.PartitionStates; @@ -75,7 +76,7 @@ public class SubscriptionState { private final Logger log; private enum SubscriptionType { - NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED, AUTO_TOPICS_SHARE + NONE, AUTO_TOPICS, AUTO_PATTERN, AUTO_PATTERN_RE2J, USER_ASSIGNED, AUTO_TOPICS_SHARE } /* the type of subscription */ @@ -84,6 +85,9 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; + /* the Re2J pattern user has requested */ + private SubscriptionPattern subscribedRe2JPattern; + /* the list of topics the user has requested */ private Set subscription; @@ -107,13 +111,21 @@ private enum SubscriptionType { public synchronized String toString() { return "SubscriptionState{" + "type=" + subscriptionType + - ", subscribedPattern=" + subscribedPattern + + ", subscribedPattern=" + subscribedPatternInUse() + ", subscription=" + String.join(",", subscription) + ", groupSubscription=" + String.join(",", groupSubscription) + ", defaultResetStrategy=" + defaultResetStrategy + ", assignment=" + assignment.partitionStateValues() + " (id=" + assignmentId + ")}"; } + private Object subscribedPatternInUse() { + if (subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J) + return subscribedRe2JPattern; + if (subscriptionType == SubscriptionType.AUTO_PATTERN) + return subscribedPattern; + return null; + } + public synchronized String prettyString() { switch (subscriptionType) { case NONE: @@ -122,6 +134,8 @@ public synchronized String prettyString() { return "Subscribe(" + String.join(",", subscription) + ")"; case AUTO_PATTERN: return "Subscribe(" + subscribedPattern + ")"; + case AUTO_PATTERN_RE2J: + return "Subscribe(" + subscribedRe2JPattern + ")"; case USER_ASSIGNED: return "Assign(" + assignedPartitions() + " , id=" + assignmentId + ")"; case AUTO_TOPICS_SHARE: @@ -138,6 +152,7 @@ public SubscriptionState(LogContext logContext, AutoOffsetResetStrategy defaultR this.assignment = new PartitionStates<>(); this.groupSubscription = new HashSet<>(); this.subscribedPattern = null; + this.subscribedRe2JPattern = null; this.subscriptionType = SubscriptionType.NONE; } @@ -176,6 +191,12 @@ public synchronized void subscribe(Pattern pattern, Optional listener) { + registerRebalanceListener(listener); + setSubscriptionType(SubscriptionType.AUTO_PATTERN_RE2J); + this.subscribedRe2JPattern = pattern; + } + public synchronized boolean subscribeFromPattern(Set topics) { if (subscriptionType != SubscriptionType.AUTO_PATTERN) throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " + @@ -249,7 +270,11 @@ public synchronized boolean assignFromUser(Set partitions) { } /** - * @return true if assignments matches subscription, otherwise false + * Check if an assignment received while using the classic group protocol matches the subscription. + * Note that this only considers the subscribedPattern because this functionality is only used under the + * classic protocol, where subscribedRe2JPattern is not supported. + * + * @return true if assignments matches subscription, otherwise false. */ public synchronized boolean checkAssignmentMatchedSubscription(Collection assignments) { for (TopicPartition topicPartition : assignments) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 9bd229a3fe80..406ca55b79b4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -31,8 +31,9 @@ public abstract class ApplicationEvent { public enum Type { COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA, - TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, UPDATE_SUBSCRIPTION_METADATA, - UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, + TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE, + UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE, + CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index c9735617abbb..2a84c18e234b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -120,6 +120,10 @@ public void process(ApplicationEvent event) { process((TopicPatternSubscriptionChangeEvent) event); return; + case TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE: + process((TopicRe2JPatternSubscriptionChangeEvent) event); + return; + case UPDATE_SUBSCRIPTION_METADATA: process((UpdatePatternSubscriptionEvent) event); return; @@ -300,9 +304,11 @@ private void process(final TopicSubscriptionChangeEvent event) { } /** - * Process event that indicates that the subscription topic pattern changed. This will make the - * consumer join the group if it is not part of it yet, or send the updated subscription if - * it is already a member on the next poll. + * Process event that indicates that the subscription java pattern changed. + * This will update the subscription state in the client to persist the new pattern. + * It will also evaluate the pattern against the latest metadata to find the matching topics, + * and send an updated subscription to the broker on the next poll + * (joining the group if it's not already part of it). */ private void process(final TopicPatternSubscriptionChangeEvent event) { try { @@ -315,6 +321,28 @@ private void process(final TopicPatternSubscriptionChangeEvent event) { } } + /** + * Process event that indicates that the subscription RE2J pattern changed. + * This will update the subscription state in the client to persist the new pattern. + * It will also make the consumer send the updated pattern on the next poll, + * joining the group if it's not already part of it. + * Note that this does not evaluate the pattern, it just passes it to the broker. + */ + private void process(final TopicRe2JPatternSubscriptionChangeEvent event) { + if (requestManagers.consumerMembershipManager.isEmpty()) { + event.future().completeExceptionally( + new KafkaException("MembershipManager is not available when processing a subscribe event")); + return; + } + try { + subscriptions.subscribe(event.pattern(), event.listener()); + requestManagers.consumerMembershipManager.get().onSubscriptionUpdated(); + event.future().complete(null); + } catch (Exception e) { + event.future().completeExceptionally(e); + } + } + /** * Process event that re-evaluates the subscribed regular expression using the latest topics from metadata, only if metadata changed. * This will make the consumer send the updated subscription on the next poll. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicRe2JPatternSubscriptionChangeEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicRe2JPatternSubscriptionChangeEvent.java new file mode 100644 index 000000000000..37e707c7b8b1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicRe2JPatternSubscriptionChangeEvent.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.SubscriptionPattern; + +import java.util.Optional; + +/** + * Application event indicating triggered by a call to the subscribe API + * providing a {@link SubscriptionPattern} (RE2J-compatible pattern). + * This will make the consumer send the updated subscription to the + * broker on the next poll, joining the group if it is not already part of it. + */ +public class TopicRe2JPatternSubscriptionChangeEvent extends SubscriptionChangeEvent { + private final SubscriptionPattern pattern; + + public TopicRe2JPatternSubscriptionChangeEvent(final SubscriptionPattern pattern, + final Optional listener, + final long deadlineMs) { + super(Type.TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE, listener, deadlineMs); + this.pattern = pattern; + } + + public SubscriptionPattern pattern() { + return pattern; + } + + @Override + public String toStringBase() { + return super.toStringBase() + ", subscriptionPattern=" + pattern; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 93424846b990..62e3f23caf98 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -51,6 +51,7 @@ import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent; +import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; @@ -138,6 +139,7 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -1864,6 +1866,27 @@ public void testSubscribeToRe2JPatternValidation() { assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*"))); } + @Test + public void testSubscribeToRe2JPatternThrowsIfNoGroupId() { + consumer = newConsumer(requiredConsumerConfig()); + assertThrows(InvalidGroupIdException.class, () -> consumer.subscribe(new SubscriptionPattern("t*"))); + assertThrows(InvalidGroupIdException.class, () -> consumer.subscribe(new SubscriptionPattern("t*"), + mock(ConsumerRebalanceListener.class))); + } + + @Test + public void testSubscribeToRe2JPatternGeneratesEvent() { + consumer = newConsumer(); + completeTopicRe2JPatternSubscriptionChangeEventSuccessfully(); + + consumer.subscribe(new SubscriptionPattern("t*")); + verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class)); + + clearInvocations(applicationEventHandler); + consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class)); + verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class)); + } + private Map mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); @@ -1982,6 +2005,15 @@ private void completeTopicPatternSubscriptionChangeEventSuccessfully() { }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class)); } + private void completeTopicRe2JPatternSubscriptionChangeEventSuccessfully() { + doAnswer(invocation -> { + TopicRe2JPatternSubscriptionChangeEvent event = invocation.getArgument(0); + consumer.subscriptions().subscribe(event.pattern(), event.listener()); + event.future().complete(null); + return null; + }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class)); + } + private void completeSeekUnvalidatedEventSuccessfully() { doAnswer(invocation -> { SeekUnvalidatedEvent event = invocation.getArgument(0); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 6854cbdf6ea4..a4a020a22f3d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; @@ -397,6 +398,27 @@ public void patternSubscription() { assertEquals(2, state.subscription().size(), "Expected subscribed topics count is incorrect"); } + @Test + public void testSubscribeToRe2JPattern() { + String pattern = "t*"; + state.subscribe(new SubscriptionPattern(pattern), Optional.of(rebalanceListener)); + assertTrue(state.toString().contains("type=AUTO_PATTERN_RE2J")); + assertTrue(state.toString().contains("subscribedPattern=" + pattern)); + } + + @Test + public void testMixedPatternSubscriptionNotAllowed() { + state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)); + assertThrows(IllegalStateException.class, () -> state.subscribe(new SubscriptionPattern("t*"), + Optional.of(rebalanceListener))); + + state.unsubscribe(); + + state.subscribe(new SubscriptionPattern("t*"), Optional.of(rebalanceListener)); + assertThrows(IllegalStateException.class, () -> state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener))); + } + + @Test public void unsubscribeUserAssignment() { state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index dfd67ce4710a..bdbda721b00d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager; @@ -382,6 +383,40 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV assertDoesNotThrow(() -> event2.future().get()); } + @Test + public void testR2JPatternSubscriptionEventSuccess() { + SubscriptionPattern pattern = new SubscriptionPattern("t*"); + Optional listener = Optional.of(mock(ConsumerRebalanceListener.class)); + TopicRe2JPatternSubscriptionChangeEvent event = + new TopicRe2JPatternSubscriptionChangeEvent(pattern, listener, 12345); + + setupProcessor(true); + processor.process(event); + + verify(subscriptionState).subscribe(pattern, listener); + verify(subscriptionState, never()).subscribeFromPattern(any()); + verify(membershipManager).onSubscriptionUpdated(); + assertDoesNotThrow(() -> event.future().get()); + } + + @Test + public void testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() { + SubscriptionPattern pattern = new SubscriptionPattern("t*"); + Optional listener = Optional.of(mock(ConsumerRebalanceListener.class)); + TopicRe2JPatternSubscriptionChangeEvent event = + new TopicRe2JPatternSubscriptionChangeEvent(pattern, listener, 12345); + Exception mixedSubscriptionError = new IllegalStateException("Subscription to topics, partitions and " + + "pattern are mutually exclusive"); + doThrow(mixedSubscriptionError).when(subscriptionState).subscribe(pattern, listener); + + setupProcessor(true); + processor.process(event); + + verify(subscriptionState).subscribe(pattern, listener); + Exception thrown = assertFutureThrows(event.future(), mixedSubscriptionError.getClass()); + assertEquals(mixedSubscriptionError, thrown); + } + @ParameterizedTest @MethodSource("offsetsGenerator") public void testSyncCommitEvent(Optional> offsets) {