From c14b699673d9ba83b3ec1771c5d22bec5ff7ea6a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 9 Aug 2024 16:55:33 +0800 Subject: [PATCH] [fix][broker] Fix shadow topics cannot be consumed when the entry is not cached ### Motivation For shadow topics, a `ReadOnlyLedgerHandle` is created to read messages from the source topic when the entry is not cached. However, it leverages the `readAsync` API that validates the `lastAddConfirmed` field (LAC). In `ReadOnlyLedgerHandle`, this field could never be updated, so `readAsync` could fail immediately. See `LedgerHandle#readAsync`: ```java if (lastEntry > lastAddConfirmed) { LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastEntry:{} lastAddConfirmed:{}", ledgerId, firstEntry, lastEntry, lastAddConfirmed); return FutureUtils.exception(new BKReadException()); } ``` This bug is not exposed because: 1. `PulsarMockReadHandle` does not maintain a LAC field. 2. The case for cache miss is never tested. ### Modifications Replace `readAsync` with `readUnconfirmedAsync`. The managed ledger already maintains a `lastConfirmedEntry` to limit the last entry. See `ManagedLedgerImpl#internalReadFromLedger`: ```java Position lastPosition = lastConfirmedEntry; if (ledger.getId() == lastPosition.getLedgerId()) { lastEntryInLedger = lastPosition.getEntryId(); ``` Add `ShadowTopicRealBkTest` to cover two code changes `RangeEntryCacheImpl#readFromStorage` and `EntryCache#asyncReadEntry`. --- .../impl/cache/RangeEntryCacheImpl.java | 4 +- .../persistent/ShadowTopicRealBkTest.java | 109 ++++++++++++++++++ 2 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 254a517786a55..e60348b39f30f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -249,7 +249,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength()); callback.readEntryComplete(cachedEntry, ctx); } else { - lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync( + lh.readUnconfirmedAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync( ledgerEntries -> { try { Iterator iterator = ledgerEntries.iterator(); @@ -429,7 +429,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; - CompletableFuture> readResult = lh.readAsync(firstEntry, lastEntry) + CompletableFuture> readResult = lh.readUnconfirmedAsync(firstEntry, lastEntry) .thenApply( ledgerEntries -> { requireNonNull(ml.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java new file mode 100644 index 0000000000000..9d810b06a7c7b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import com.google.common.collect.Lists; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.PortManager; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ShadowTopicRealBkTest { + + private static final String cluster = "test"; + private final int zkPort = PortManager.nextLockedFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private PulsarService pulsar; + private PulsarAdmin admin; + + @BeforeClass + public void setup() throws Exception { + bk.start(); + final var config = new ServiceConfiguration(); + config.setClusterName(cluster); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:localhost:" + zkPort); + pulsar = new PulsarService(config); + pulsar.start(); + admin = pulsar.getAdminClient(); + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()) + .brokerServiceUrl(pulsar.getBrokerServiceUrl()).build()); + admin.tenants().createTenant("public", TenantInfo.builder().allowedClusters(Set.of(cluster)).build()); + admin.namespaces().createNamespace("public/default"); + } + + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + if (pulsar != null) { + pulsar.close(); + } + bk.stop(); + } + + @Test + public void testReadFromStorage() throws Exception { + final var sourceTopic = TopicName.get("test-read-from-source").toString(); + final var shadowTopic = sourceTopic + "-shadow"; + + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()->{ + final var sourcePersistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopicIfExists(sourceTopic).get().orElseThrow(); + final var replicator = (ShadowReplicator) sourcePersistentTopic.getShadowReplicators().get(shadowTopic); + Assert.assertNotNull(replicator); + Assert.assertEquals(String.valueOf(replicator.getState()), "Started"); + }); + + final var client = pulsar.getClient(); + // When the message was sent, there is no cursor, so it will read from the cache + final var producer = client.newProducer().topic(sourceTopic).create(); + producer.send("message".getBytes()); + // 1. Verify RangeEntryCacheImpl#readFromStorage + final var consumer = client.newConsumer().topic(shadowTopic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + final var msg = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertEquals(msg.getValue(), "message".getBytes()); + + // 2. Verify EntryCache#asyncReadEntry + final var shadowManagedLedger = ((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopic).get() + .orElseThrow()).getManagedLedger(); + Assert.assertTrue(shadowManagedLedger instanceof ShadowManagedLedgerImpl); + shadowManagedLedger.getEarliestMessagePublishTimeInBacklog().get(3, TimeUnit.SECONDS); + } +}