diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index bc6a1e9a782d6..dc731b8bfe57a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -866,4 +866,11 @@ default void skipNonRecoverableLedger(long ledgerId){} * @return whether this cursor is closed. */ boolean isClosed(); + + /** + * Called by the system to trigger perdiodic rollover in absence of activity. + */ + default boolean periodicRollover() { + return false; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index f91d9ec3f5a02..955a0d7850275 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -631,6 +631,11 @@ void asyncSetProperties(Map properties, AsyncCallbacks.UpdatePro */ void trimConsumedLedgersInBackground(CompletableFuture promise); + /** + * Rollover cursors in background if needed. + */ + default void rolloverCursorsInBackground() {} + /** * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is * used to delete information about this ledger in the ManagedCursor. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f67f534f86d45..fa51f6d367f64 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3093,12 +3093,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin lh1.getId()); } - if (shouldCloseLedger(lh1)) { - if (log.isDebugEnabled()) { - log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name); - } - startCreatingNewMetadataLedger(); - } + rolloverLedgerIfNeeded(lh1); mbean.persistToLedger(true); mbean.addWriteCursorLedgerSize(data.length); @@ -3116,6 +3111,35 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin }, null); } + public boolean periodicRollover() { + LedgerHandle lh = cursorLedger; + if (State.Open.equals(STATE_UPDATER.get(this)) + && lh != null && lh.getLength() > 0) { + boolean triggered = rolloverLedgerIfNeeded(lh); + if (triggered) { + log.info("[{}] Periodic rollover triggered for cursor {} (length={} bytes)", + ledger.getName(), name, lh.getLength()); + } else { + log.debug("[{}] Periodic rollover skipped for cursor {} (length={} bytes)", + ledger.getName(), name, lh.getLength()); + + } + return triggered; + } + return false; + } + + boolean rolloverLedgerIfNeeded(LedgerHandle lh1) { + if (shouldCloseLedger(lh1)) { + if (log.isDebugEnabled()) { + log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name); + } + startCreatingNewMetadataLedger(); + return true; + } + return false; + } + void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) { final PositionImpl newPosition = mdEntry.newPosition; STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 10405134be939..1835b4aecd1bc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2828,6 +2828,15 @@ public void operationFailed(MetaStoreException e) { } } + @Override + public void rolloverCursorsInBackground() { + if (cursors.hasDurableCursors()) { + executor.execute(() -> { + cursors.forEach(ManagedCursor::periodicRollover); + }); + } + } + protected void doDeleteLedgers(List ledgersToDelete) { PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; // Update metadata @@ -4530,4 +4539,4 @@ public Position getTheSlowestNonDurationReadPosition() { } return theSlowestNonDurableReadPosition; } -} \ No newline at end of file +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 7b59c3903d5bc..a79ba3fb5e23b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -209,8 +209,8 @@ public void recycle() { entries = null; nextReadPosition = null; maxPosition = null; - recyclerHandle.recycle(this); skipCondition = null; + recyclerHandle.recycle(this); } private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 0281c8cdd88e3..bb505200ba75e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -53,6 +53,7 @@ import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class ManagedLedgerBkTest extends BookKeeperClusterTestCase { @@ -548,6 +549,42 @@ public void testChangeCrcType() throws Exception { } } + @Test + public void testPeriodicRollover() throws Exception { + ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); + factoryConf.setMaxCacheSize(0); + + int rolloverTimeForCursorInSeconds = 5; + + @Cleanup("shutdown") + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1) + .setMetadataAckQuorumSize(1) + .setLedgerRolloverTimeout(rolloverTimeForCursorInSeconds); + ManagedLedger ledger = factory.open("my-ledger" + testName, config); + ManagedCursor cursor = ledger.openCursor("c1"); + + Position pos = ledger.addEntry("entry-0".getBytes()); + ledger.addEntry("entry-1".getBytes()); + + List entries = cursor.readEntries(2); + assertEquals(2, entries.size()); + entries.forEach(Entry::release); + ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor; + assertEquals(ManagedCursorImpl.State.NoLedger, cursorImpl.state); + + // this creates the ledger + cursor.delete(pos); + + Awaitility.await().until(() -> cursorImpl.state == ManagedCursorImpl.State.Open); + + Thread.sleep(rolloverTimeForCursorInSeconds * 1000 + 1000); + + long currentLedgerId = cursorImpl.getCursorLedger(); + assertTrue(cursor.periodicRollover()); + Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId); + } } diff --git a/pom.xml b/pom.xml index 80a330f88a6db..bb045018eb0b2 100644 --- a/pom.xml +++ b/pom.xml @@ -136,7 +136,7 @@ flexible messaging model and an intuitive client API. 1.26.0 - 4.16.4 + 4.16.5-fx-f4db1a24ab 3.9.2 1.5.0 1.10.0 @@ -2749,26 +2749,26 @@ flexible messaging model and an intuitive client API. false - - datastax-releases - https://repo.aws.dsinternal.org/artifactory/datastax-releases-local - - false - - - true - - - - datastax-snapshots-local - https://repo.aws.dsinternal.org/artifactory/datastax-snapshots-local - - true - - - false - - + + + + + + + + + + + + + + + + + + + + public-datastax-releases https://repo.datastax.com/datastax-public-releases-local diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 4011a48207512..00e381e07292f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -197,22 +197,21 @@ protected CompletableFuture deleteAsync(String path) { } protected CompletableFuture deleteIfExistsAsync(String path) { - return cache.exists(path).thenCompose(exists -> { - if (!exists) { - return CompletableFuture.completedFuture(null); + log.info("Deleting path: {}", path); + CompletableFuture future = new CompletableFuture<>(); + cache.delete(path).whenComplete((ignore, ex) -> { + if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) { + log.info("Path {} did not exist in metadata store", path); + future.complete(null); + } else if (ex != null) { + log.info("Failed to delete path from metadata store: {}", path, ex); + future.completeExceptionally(ex); + } else { + log.info("Deleted path from metadata store: {}", path); + future.complete(null); } - CompletableFuture future = new CompletableFuture<>(); - cache.delete(path).whenComplete((ignore, ex) -> { - if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) { - future.complete(null); - } else if (ex != null) { - future.completeExceptionally(ex); - } else { - future.complete(null); - } - }); - return future; }); + return future; } protected boolean exists(String path) throws MetadataStoreException { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index c6b658c3bd025..ae3479fde59b8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -79,7 +79,7 @@ public void deleteLocalPolicies(NamespaceName ns) throws MetadataStoreException } public CompletableFuture deleteLocalPoliciesAsync(NamespaceName ns) { - return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); + return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); } public CompletableFuture deleteLocalPoliciesTenantAsync(String tenant) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index b5ccc9a5a9077..897658907a14b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -112,7 +112,7 @@ public void deletePolicies(NamespaceName ns) throws MetadataStoreException{ } public CompletableFuture deletePoliciesAsync(NamespaceName ns){ - return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString())); + return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, ns.toString())); } public Optional getPolicies(NamespaceName ns) throws MetadataStoreException{ @@ -152,10 +152,18 @@ public static boolean pathIsNamespaceLocalPolicies(String path) { && path.substring(LOCAL_POLICIES_ROOT.length() + 1).contains("/"); } - // clear resource of `/namespace/{namespaceName}` for zk-node + /** + * Clear resource of `/namespace/{namespaceName}` for zk-node. + * @param ns the namespace name + * @return a handle to the results of the operation + * */ + // public CompletableFuture deleteNamespaceAsync(NamespaceName ns) { final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString()); - return deleteIfExistsAsync(namespacePath); + // please beware that this will delete all the children of the namespace + // including the ownership nodes (ephemeral nodes) + // see ServiceUnitUtils.path(ns) for the ownership node path + return getStore().deleteRecursive(namespacePath); } // clear resource of `/namespace/{tenant}` for zk-node @@ -298,11 +306,14 @@ public CompletableFuture deletePartitionedTopicAsync(TopicName tn) { public CompletableFuture clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) { final String globalPartitionedPath = joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString()); + log.info("Clearing partitioned topic metadata for namespace {}, path is {}", + namespaceName, globalPartitionedPath); return getStore().deleteRecursive(globalPartitionedPath); } public CompletableFuture clearPartitionedTopicTenantAsync(String tenant) { final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant); + log.info("Clearing partitioned topic metadata for tenant {}, path is {}", tenant, partitionedTopicPath); return deleteIfExistsAsync(partitionedTopicPath); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index 0963f25c3d31f..d1bfce2cba21a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -75,11 +75,6 @@ public CompletableFuture> getExistingPartitions(NamespaceName ns, T ); } - public CompletableFuture deletePersistentTopicAsync(TopicName topic) { - String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding(); - return store.delete(path, Optional.of(-1L)); - } - public CompletableFuture createPersistentTopicAsync(TopicName topic) { String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding(); return store.put(path, new byte[0], Optional.of(-1L)) @@ -93,38 +88,20 @@ public CompletableFuture persistentTopicExists(TopicName topic) { public CompletableFuture clearNamespacePersistence(NamespaceName ns) { String path = MANAGED_LEDGER_PATH + "/" + ns; - return store.exists(path) - .thenCompose(exists -> { - if (exists) { - return store.delete(path, Optional.empty()); - } else { - return CompletableFuture.completedFuture(null); - } - }); + log.info("Clearing namespace persistence for namespace: {}, path {}", ns, path); + return store.deleteIfExists(path, Optional.empty()); } public CompletableFuture clearDomainPersistence(NamespaceName ns) { String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent"; - return store.exists(path) - .thenCompose(exists -> { - if (exists) { - return store.delete(path, Optional.empty()); - } else { - return CompletableFuture.completedFuture(null); - } - }); + log.info("Clearing domain persistence for namespace: {}, path {}", ns, path); + return store.deleteIfExists(path, Optional.empty()); } public CompletableFuture clearTenantPersistence(String tenant) { String path = MANAGED_LEDGER_PATH + "/" + tenant; - return store.exists(path) - .thenCompose(exists -> { - if (exists) { - return store.delete(path, Optional.empty()); - } else { - return CompletableFuture.completedFuture(null); - } - }); + log.info("Clearing tenant persistence for tenant: {}, path {}", tenant, path); + return store.deleteIfExists(path, Optional.empty()); } void handleNotification(Notification notification) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index f4732cad38040..616e874ca9da2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -309,8 +309,14 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime clientAppId(), ex); return FutureUtil.failedFuture(ex); } + log.info("[{}] Deleting namespace bundle {}/{}", clientAppId(), + namespaceName, bundle.getBundleRange()); return admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), force); + } else { + log.warn("[{}] Skipping deleting namespace bundle {}/{} " + + "as it's not owned by any broker", + clientAppId(), namespaceName, bundle.getBundleRange()); } return CompletableFuture.completedFuture(null); }) @@ -321,8 +327,11 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime final Throwable rc = FutureUtil.unwrapCompletionException(error); if (rc instanceof MetadataStoreException) { if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { + KeeperException.NotEmptyException ne = + (KeeperException.NotEmptyException) rc.getCause(); log.info("[{}] There are in-flight topics created during the namespace deletion, " - + "retry to delete the namespace again.", namespaceName); + + "retry to delete the namespace again. (path {} is not empty on metadata)", + namespaceName, ne.getPath()); final int next = retryTimes - 1; if (next > 0) { // async recursive @@ -330,7 +339,8 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime } else { callback.completeExceptionally( new RestException(Status.CONFLICT, "The broker still have in-flight topics" - + " created during namespace deletion, please try again.")); + + " created during namespace deletion (path " + ne.getPath() + ") " + + "is not empty on metadata store, please try again.")); // drop out recursive } return; @@ -475,6 +485,8 @@ protected CompletableFuture internalClearZkSources() { @SuppressWarnings("deprecation") protected CompletableFuture internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative, boolean force) { + log.info("[{}] Deleting namespace bundle {}/{} authoritative:{} force:{}", + clientAppId(), namespaceName, bundleRange, authoritative, force); return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8d289446b7b42..4dfadce19ef6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2114,6 +2114,7 @@ private void checkConsumedLedgers() { Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent( managedLedger -> { managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE); + managedLedger.rolloverCursorsInBackground(); } ); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 9485931304b0c..fd5af3a158b0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -289,16 +289,29 @@ public Future sendMessages(final List entries, EntryBatch totalChunkedMessages, redeliveryTracker, DEFAULT_CONSUMER_EPOCH); } + public Future sendMessages(final List entries, EntryBatchSizes batchSizes, + EntryBatchIndexesAcks batchIndexesAcks, + int totalMessages, long totalBytes, long totalChunkedMessages, + RedeliveryTracker redeliveryTracker, long epoch) { + return sendMessages(entries, null, batchSizes, batchIndexesAcks, totalMessages, totalBytes, + totalChunkedMessages, redeliveryTracker, epoch); + } + /** * Dispatch a list of entries to the consumer.
* It is also responsible to release entries data and recycle entries object. * * @return a SendMessageInfo object that contains the detail of what was sent to consumer */ - public Future sendMessages(final List entries, EntryBatchSizes batchSizes, + public Future sendMessages(final List entries, + final List stickyKeyHashes, + EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, - int totalMessages, long totalBytes, long totalChunkedMessages, - RedeliveryTracker redeliveryTracker, long epoch) { + int totalMessages, + long totalBytes, + long totalChunkedMessages, + RedeliveryTracker redeliveryTracker, + long epoch) { this.lastConsumedTimestamp = System.currentTimeMillis(); if (entries.isEmpty() || totalMessages == 0) { @@ -326,7 +339,7 @@ public Future sendMessages(final List entries, EntryBatch // because this consumer is possible to disconnect at this time. if (pendingAcks != null) { int batchSize = batchSizes.getBatchSize(i); - int stickyKeyHash = getStickyKeyHash(entry); + int stickyKeyHash = stickyKeyHashes == null ? getStickyKeyHash(entry) : stickyKeyHashes.get(i); long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i); if (ackSet != null) { unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 71f78e21f938f..9e18e8f8dfb81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -496,7 +496,7 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) } else { Throwable cause = FutureUtil.unwrapCompletionException(ex); if (cause instanceof PulsarClientException.AlreadyClosedException) { - log.warn("Read more topic policies exception, close the read now!", ex); + log.info("Closing the topic policies reader for {}", reader.getSystemTopic().getTopicName()); cleanCacheAndCloseReader( reader.getSystemTopic().getTopicName().getNamespaceObject(), false); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index 2cad253f96ee2..fb7bd22de94a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -126,6 +126,14 @@ protected Map> initialValue() throws Exception { } }; + private static final FastThreadLocal>> localGroupedStickyKeyHashes = + new FastThreadLocal>>() { + @Override + protected Map> initialValue() throws Exception { + return new HashMap<>(); + } + }; + @Override public void sendMessages(List entries) { if (entries.isEmpty()) { @@ -139,28 +147,38 @@ public void sendMessages(List entries) { final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); + final Map> consumerStickyKeyHashesMap = localGroupedStickyKeyHashes.get(); + consumerStickyKeyHashesMap.clear(); for (Entry entry : entries) { - Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer())); + byte[] stickyKey = peekStickyKey(entry.getDataBuffer()); + int stickyKeyHash = StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); + + Consumer consumer = selector.select(stickyKeyHash); if (consumer != null) { - groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry); + int startingSize = Math.max(10, entries.size() / (2 * consumerSet.size())); + groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(entry); + consumerStickyKeyHashesMap + .computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(stickyKeyHash); } else { entry.release(); } } for (Map.Entry> entriesByConsumer : groupedEntries.entrySet()) { - Consumer consumer = entriesByConsumer.getKey(); - List entriesForConsumer = entriesByConsumer.getValue(); + final Consumer consumer = entriesByConsumer.getKey(); + final List entriesForConsumer = entriesByConsumer.getValue(); + final List stickyKeysForConsumer = consumerStickyKeyHashesMap.get(consumer); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size()); filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, false, consumer); if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) { - consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(), + consumer.sendMessages(entriesForConsumer, stickyKeysForConsumer, batchSizes, + null, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), - getRedeliveryTracker()); + getRedeliveryTracker(), Commands.DEFAULT_CONSUMER_EPOCH); TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); } else { entriesForConsumer.forEach(e -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java index b2638d53ab1c3..6b0f48a57cfe3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -128,15 +128,15 @@ public void testSendMessage() throws BrokerServiceException { assertEquals(byteBuf.toString(UTF_8), "message" + index); }; return mockPromise; - }).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(), - anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + }).when(consumerMock).sendMessages(any(List.class), any(List.class), any(EntryBatchSizes.class), any(), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), anyLong()); try { nonpersistentDispatcher.sendMessages(entries); } catch (Exception e) { fail("Failed to sendMessages.", e); } - verify(consumerMock, times(1)).sendMessages(any(List.class), any(EntryBatchSizes.class), - eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + verify(consumerMock, times(1)).sendMessages(any(List.class), any(List.class), any(EntryBatchSizes.class), + eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), anyLong()); } @Test(timeOut = 10000) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index 33942c19520a3..89b0e7a6fe1c0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -23,9 +23,12 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.Consumer; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Metadata store client interface. @@ -36,6 +39,8 @@ @Beta public interface MetadataStore extends AutoCloseable { + Logger LOGGER = LoggerFactory.getLogger(MetadataStore.class); + /** * Read the value of one key, identified by the path * @@ -121,6 +126,23 @@ default CompletableFuture sync(String path) { */ CompletableFuture delete(String path, Optional expectedVersion); + default CompletableFuture deleteIfExists(String path, Optional expectedVersion) { + return delete(path, expectedVersion) + .exceptionally(e -> { + if (e.getCause() instanceof NotFoundException) { + LOGGER.info("Path {} not found while deleting (this is not a problem)", path); + return null; + } else { + if (expectedVersion.isEmpty()) { + LOGGER.info("Failed to delete path {}", path, e); + } else { + LOGGER.info("Failed to delete path {} with expected version {}", path, expectedVersion, e); + } + throw new CompletionException(e); + } + }); + } + /** * Delete a key-value pair and all the children nodes. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 4cadf2397a7fa..78e9980367210 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -349,6 +349,7 @@ public void accept(Notification n) { @Override public final CompletableFuture delete(String path, Optional expectedVersion) { + log.info("Deleting path: {} (v. {})", path, expectedVersion); if (isClosed()) { return FutureUtil.failedFuture( new MetadataStoreException.AlreadyClosedException()); @@ -394,11 +395,13 @@ private CompletableFuture deleteInternal(String path, Optional expec } metadataCaches.forEach(c -> c.invalidate(path)); + log.info("Deleted path: {} (v. {})", path, expectedVersion); }); } @Override public CompletableFuture deleteRecursive(String path) { + log.info("Deleting recursively path: {}", path); if (isClosed()) { return FutureUtil.failedFuture( new MetadataStoreException.AlreadyClosedException()); @@ -408,13 +411,9 @@ public CompletableFuture deleteRecursive(String path) { children.stream() .map(child -> deleteRecursive(path + "/" + child)) .collect(Collectors.toList()))) - .thenCompose(__ -> exists(path)) - .thenCompose(exists -> { - if (exists) { - return delete(path, Optional.empty()); - } else { - return CompletableFuture.completedFuture(null); - } + .thenCompose(__ -> { + log.info("After deleting all children, now deleting path: {}", path); + return deleteIfExists(path, Optional.empty()); }); }