diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LightProtoHelper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LightProtoHelper.java index 75b72cf2ff94f..23db046e8b83b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LightProtoHelper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LightProtoHelper.java @@ -24,23 +24,6 @@ public abstract class LightProtoHelper { public static MLDataFormats.ManagedLedgerInfo.LedgerInfo createLedgerInfo() { MLDataFormats.ManagedLedgerInfo.LedgerInfo li = new MLDataFormats.ManagedLedgerInfo.LedgerInfo(); - - // light proto doesn't return MLDataFormats.OffloadContext.getDefaultInstance() - // if it wasn't explicitly set and a lot of code expects that behavior - li.setOffloadContext() - .setBookkeeperDeleted(false) - .setComplete(false) - .setTimestamp(-1L); // like protobuf default instance - li.setOffloadContext() - .setDriverMetadata() - .setName(""); // like protobuf default instance - -// stuff like this seems to be a bad idea, -// unlike protobuf that just creates empty collection, -// these will add empty object to that collection. -// so this is something we can't mimic from protobuf -// li.setOffloadContext().setDriverMetadata().addProperty(); -// li.setOffloadContext().addOffloadSegment(); return li; } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 840881763b676..e52dd0d5cfb96 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -959,7 +959,7 @@ public void operationFailed(MetaStoreException e) { MLDataFormats.ManagedLedgerInfo.LedgerInfo ls = ledgerInfos.get(li.ledgerId); - if (ls.getOffloadContext().hasUidMsb()) { + if (ls.hasOffloadContext() && ls.getOffloadContext().hasUidMsb()) { MLDataFormats.ManagedLedgerInfo.LedgerInfo newInfoBuilder = new MLDataFormats.ManagedLedgerInfo.LedgerInfo().copyFrom(ls); newInfoBuilder.setOffloadContext().setBookkeeperDeleted(true); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 64cd7cb3c5fe2..d8877732f1352 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2774,7 +2774,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { } for (LedgerInfo ls : ledgers.values()) { - if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies) + if (ls.hasOffloadContext() && isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies) && !ledgersToDelete.contains(ls)) { log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name, ls.getLedgerId()); @@ -3064,12 +3064,12 @@ private void asyncDeleteLedgerFromBookKeeper(long ledgerId) { } private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { - if (!info.getOffloadContext().isBookkeeperDeleted()) { + if (!info.hasOffloadContext() || !info.getOffloadContext().isBookkeeperDeleted()) { // only delete if it hasn't been previously deleted for offload asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); } - if (info.getOffloadContext().hasUidMsb()) { + if (info.hasOffloadContext() && info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), @@ -3234,7 +3234,7 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct for (LedgerInfo ls : ledgers.headMap(current).values()) { if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) { // don't offload if ledger has already been offloaded, or is empty - if (!ls.getOffloadContext().isComplete() && ls.getSize() > 0) { + if (ls.hasOffloadContext() && !ls.getOffloadContext().isComplete() && ls.getSize() > 0) { ledgersToOffload.add(ls); } } else { @@ -3442,6 +3442,10 @@ private CompletableFuture prepareLedgerInfoForOffloaded(long ledgerId, UUI log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", name, ledgerId, uuid); return transformLedgerInfo(ledgerId, (oldInfo) -> { + if (!oldInfo.hasOffloadContext()) { + log.warn("oldInfo does not have offloadContext {}", oldInfo); + return oldInfo; + } if (oldInfo.getOffloadContext().hasUidMsb()) { UUID oldUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(), oldInfo.getOffloadContext().getUidLsb()); @@ -3482,6 +3486,11 @@ private CompletableFuture completeLedgerInfoForOffloaded(long ledgerId, UU log.info("[{}] Completing metadata for offload of ledger {} with uuid {}", name, ledgerId, uuid); return transformLedgerInfo(ledgerId, (oldInfo) -> { + if (!oldInfo.hasOffloadContext()) { + throw new OffloadConflict( + "Ledger info for ledgerId=" + ledgerId + + ") does not have offload context"); + } UUID existingUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(), oldInfo.getOffloadContext().getUidLsb()); if (existingUuid.equals(uuid)) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java index a74f2266b41e3..49f2be91b9135 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java @@ -65,6 +65,7 @@ public void testEvictUnusedLedgers() throws Exception { assertEquals(ledger.getLedgersInfoAsList().size(), 3); assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(l -> l.hasOffloadContext()) .filter(e -> e.getOffloadContext().isComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers()); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java index 6a9ca9982c791..6f931444c30c5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -172,6 +172,7 @@ public void testLaggedDelete() throws Exception { Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.hasOffloadContext()) .filter(e -> e.getOffloadContext().isComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers()); @@ -193,6 +194,7 @@ public void testLaggedDelete() throws Exception { // ledger still exists in list Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.hasOffloadContext()) .filter(e -> e.getOffloadContext().isComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers()); @@ -234,6 +236,7 @@ public void testFileSystemOffloadDeletePath() throws Exception { Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.hasOffloadContext()) .filter(e -> e.getOffloadContext().isComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers()); @@ -241,6 +244,7 @@ public void testFileSystemOffloadDeletePath() throws Exception { // ledger still exists in list Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.hasOffloadContext()) .filter(e -> e.getOffloadContext().isComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers()); @@ -282,6 +286,7 @@ public void testLaggedDeleteRetentionSetLower() throws Exception { Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.hasOffloadContext()) .filter(e -> e.getOffloadContext().isComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers()); @@ -330,6 +335,7 @@ public void testLaggedDeleteSlowConsumer() throws Exception { Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.hasOffloadContext()) .filter(e -> e.getOffloadContext().isComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers()); @@ -352,6 +358,7 @@ public void testLaggedDeleteSlowConsumer() throws Exception { // ledger still exists in list Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.hasOffloadContext()) .filter(e -> e.getOffloadContext().isComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());