Skip to content

Commit

Permalink
[fix][broker] Fix shadow topics cannot be consumed when the entry is …
Browse files Browse the repository at this point in the history
…not cached

### Motivation

For shadow topics, a `ReadOnlyLedgerHandle` is created to read messages
from the source topic when the entry is not cached. However, the read
path uses the `readAsync` API, which validates the requested range
against the `lastAddConfirmed` field (LAC). In `ReadOnlyLedgerHandle`, this
field is never updated, so `readAsync` can fail immediately. See `LedgerHandle#readAsync`:

```java
if (lastEntry > lastAddConfirmed) {
    LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastEntry:{} lastAddConfirmed:{}",
            ledgerId, firstEntry, lastEntry, lastAddConfirmed);
    return FutureUtils.exception(new BKReadException());
}
```

This bug was not exposed earlier because:
1. `PulsarMockReadHandle` does not maintain a LAC field.
2. The cache-miss case was never tested.

### Modifications

Replace `readAsync` with `readUnconfirmedAsync`. Skipping the LAC check is
safe because the managed ledger already maintains a `lastConfirmedEntry`
that bounds the last entry to read. See
`ManagedLedgerImpl#internalReadFromLedger`:

```java
Position lastPosition = lastConfirmedEntry;

if (ledger.getId() == lastPosition.getLedgerId()) {
    lastEntryInLedger = lastPosition.getEntryId();
```

Add `ShadowTopicRealBkTest` to cover the two code changes:
`RangeEntryCacheImpl#readFromStorage` and `EntryCache#asyncReadEntry`.
  • Loading branch information
BewareMyPower committed Aug 9, 2024
1 parent f4a8094 commit c14b699
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
callback.readEntryComplete(cachedEntry, ctx);
} else {
lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync(
lh.readUnconfirmedAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync(
ledgerEntries -> {
try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
Expand Down Expand Up @@ -429,7 +429,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)
CompletableFuture<List<EntryImpl>> readResult = lh.readUnconfirmedAsync(firstEntry, lastEntry)
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Exercises shadow-topic reads against a {@link LocalBookkeeperEnsemble} instead of a mocked
 * read handle.
 *
 * <p>The test publishes to a source topic, consumes the message through its shadow topic, and
 * then queries the shadow managed ledger for the earliest publish time in the backlog. The two
 * steps are meant to cover {@code RangeEntryCacheImpl#readFromStorage} and
 * {@code EntryCache#asyncReadEntry} respectively.
 */
public class ShadowTopicRealBkTest {

    private static final String cluster = "test";
    private final int zkPort = PortManager.nextLockedFreePort();
    private final LocalBookkeeperEnsemble bk =
            new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort);
    private PulsarService pulsar;
    private PulsarAdmin admin;

    /**
     * Starts the local ensemble and a broker pointing at it, then creates the cluster, the
     * {@code public} tenant and the {@code public/default} namespace used by the test.
     *
     * @throws Exception if the ensemble or the broker fails to start, or an admin call fails
     */
    @BeforeClass
    public void setup() throws Exception {
        bk.start();
        final var config = new ServiceConfiguration();
        config.setClusterName(cluster);
        config.setAdvertisedAddress("localhost");
        // NOTE(review): port 0 presumably lets the broker bind to any free port - confirm.
        config.setBrokerServicePort(Optional.of(0));
        config.setWebServicePort(Optional.of(0));
        config.setMetadataStoreUrl("zk:localhost:" + zkPort);
        pulsar = new PulsarService(config);
        pulsar.start();
        admin = pulsar.getAdminClient();
        // Register the cluster under the same name the broker was configured with.
        admin.clusters().createCluster(cluster, ClusterData.builder()
                .serviceUrl(pulsar.getWebServiceAddress())
                .brokerServiceUrl(pulsar.getBrokerServiceUrl())
                .build());
        admin.tenants().createTenant("public", TenantInfo.builder().allowedClusters(Set.of(cluster)).build());
        admin.namespaces().createNamespace("public/default");
    }

    /**
     * Closes the broker (if it was created) and always stops the local ensemble.
     *
     * @throws Exception if closing the broker or stopping the ensemble fails
     */
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        try {
            if (pulsar != null) {
                pulsar.close();
            }
        } finally {
            // Stop the ensemble even when closing the broker throws, so it is not leaked.
            bk.stop();
        }
    }

    /**
     * Publishes one message to the source topic and verifies that it can be received from the
     * shadow topic, then verifies that the shadow managed ledger can report the earliest publish
     * time in its backlog.
     *
     * @throws Exception if any admin/client call fails or a wait times out
     */
    @Test
    public void testReadFromStorage() throws Exception {
        final var sourceTopic = TopicName.get("test-read-from-source").toString();
        final var shadowTopic = sourceTopic + "-shadow";

        admin.topics().createNonPartitionedTopic(sourceTopic);
        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
        admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));

        // Wait until the source topic's replicator for the shadow topic reports "Started".
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
            final var sourcePersistentTopic = (PersistentTopic) pulsar.getBrokerService()
                    .getTopicIfExists(sourceTopic).get().orElseThrow();
            final var replicator = (ShadowReplicator) sourcePersistentTopic.getShadowReplicators().get(shadowTopic);
            Assert.assertNotNull(replicator);
            Assert.assertEquals(String.valueOf(replicator.getState()), "Started");
        });

        // Use an explicit charset so the payload does not depend on the platform default.
        final var payload = "message".getBytes(StandardCharsets.UTF_8);
        final var client = pulsar.getClient();
        try (var producer = client.newProducer().topic(sourceTopic).create()) {
            // The message is sent before any subscription exists on the shadow topic.
            // NOTE(review): the intent appears to be that the entry is NOT served from the entry
            // cache, so that the read below goes through readFromStorage - confirm.
            producer.send(payload);

            // 1. Verify RangeEntryCacheImpl#readFromStorage
            try (var consumer = client.newConsumer().topic(shadowTopic).subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
                final var msg = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(msg);
                Assert.assertEquals(msg.getValue(), payload);

                // 2. Verify EntryCache#asyncReadEntry
                final var shadowManagedLedger = ((PersistentTopic) pulsar.getBrokerService()
                        .getTopicIfExists(shadowTopic).get().orElseThrow()).getManagedLedger();
                Assert.assertTrue(shadowManagedLedger instanceof ShadowManagedLedgerImpl);
                // Only successful completion within the timeout is checked; the value is ignored.
                shadowManagedLedger.getEarliestMessagePublishTimeInBacklog().get(3, TimeUnit.SECONDS);
            }
        }
    }
}

0 comments on commit c14b699

Please sign in to comment.