Skip to content

Commit

Permalink
trying to remove lightproto helper's hack, offloaders break
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed May 15, 2024
1 parent 9873904 commit a4d04d2
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,6 @@ public abstract class LightProtoHelper {

public static MLDataFormats.ManagedLedgerInfo.LedgerInfo createLedgerInfo() {
MLDataFormats.ManagedLedgerInfo.LedgerInfo li = new MLDataFormats.ManagedLedgerInfo.LedgerInfo();

// light proto doesn't return MLDataFormats.OffloadContext.getDefaultInstance()
// if it wasn't explicitly set and a lot of code expects that behavior
li.setOffloadContext()
.setBookkeeperDeleted(false)
.setComplete(false)
.setTimestamp(-1L); // like protobuf default instance
li.setOffloadContext()
.setDriverMetadata()
.setName(""); // like protobuf default instance

// stuff like this seems to be a bad idea,
// unlike protobuf that just creates empty collection,
// these will add empty object to that collection.
// so this is something we can't mimic from protobuf
// li.setOffloadContext().setDriverMetadata().addProperty();
// li.setOffloadContext().addOffloadSegment();
return li;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ public void operationFailed(MetaStoreException e) {

MLDataFormats.ManagedLedgerInfo.LedgerInfo ls = ledgerInfos.get(li.ledgerId);

if (ls.getOffloadContext().hasUidMsb()) {
if (ls.hasOffloadContext() && ls.getOffloadContext().hasUidMsb()) {
MLDataFormats.ManagedLedgerInfo.LedgerInfo newInfoBuilder =
new MLDataFormats.ManagedLedgerInfo.LedgerInfo().copyFrom(ls);
newInfoBuilder.setOffloadContext().setBookkeeperDeleted(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2774,7 +2774,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
}

for (LedgerInfo ls : ledgers.values()) {
if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies)
if (ls.hasOffloadContext() && isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies)
&& !ledgersToDelete.contains(ls)) {
log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
ls.getLedgerId());
Expand Down Expand Up @@ -3064,12 +3064,12 @@ private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
}

private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
if (!info.getOffloadContext().isBookkeeperDeleted()) {
if (!info.hasOffloadContext() || !info.getOffloadContext().isBookkeeperDeleted()) {
// only delete if it hasn't been previously deleted for offload
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
}

if (info.getOffloadContext().hasUidMsb()) {
if (info.hasOffloadContext() && info.getOffloadContext().hasUidMsb()) {
UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
OffloadUtils.cleanupOffloaded(ledgerId, uuid, config,
OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
Expand Down Expand Up @@ -3234,7 +3234,7 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
for (LedgerInfo ls : ledgers.headMap(current).values()) {
if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
// don't offload if ledger has already been offloaded, or is empty
if (!ls.getOffloadContext().isComplete() && ls.getSize() > 0) {
if (ls.hasOffloadContext() && !ls.getOffloadContext().isComplete() && ls.getSize() > 0) {
ledgersToOffload.add(ls);
}
} else {
Expand Down Expand Up @@ -3442,6 +3442,10 @@ private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUI
log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", name, ledgerId, uuid);
return transformLedgerInfo(ledgerId,
(oldInfo) -> {
if (!oldInfo.hasOffloadContext()) {
log.warn("oldInfo does not have offloadContext {}", oldInfo);
return oldInfo;
}
if (oldInfo.getOffloadContext().hasUidMsb()) {
UUID oldUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(),
oldInfo.getOffloadContext().getUidLsb());
Expand Down Expand Up @@ -3482,6 +3486,11 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
log.info("[{}] Completing metadata for offload of ledger {} with uuid {}", name, ledgerId, uuid);
return transformLedgerInfo(ledgerId,
(oldInfo) -> {
if (!oldInfo.hasOffloadContext()) {
throw new OffloadConflict(
"Ledger info for ledgerId=" + ledgerId
+ ") does not have offload context");
}
UUID existingUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(),
oldInfo.getOffloadContext().getUidLsb());
if (existingUuid.equals(uuid)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void testEvictUnusedLedgers() throws Exception {

assertEquals(ledger.getLedgersInfoAsList().size(), 3);
assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(l -> l.hasOffloadContext())
.filter(e -> e.getOffloadContext().isComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public void testLaggedDelete() throws Exception {

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.hasOffloadContext())
.filter(e -> e.getOffloadContext().isComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Expand All @@ -193,6 +194,7 @@ public void testLaggedDelete() throws Exception {

// ledger still exists in list
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.hasOffloadContext())
.filter(e -> e.getOffloadContext().isComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Expand Down Expand Up @@ -234,13 +236,15 @@ public void testFileSystemOffloadDeletePath() throws Exception {

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.hasOffloadContext())
.filter(e -> e.getOffloadContext().isComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId));

// ledger still exists in list
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.hasOffloadContext())
.filter(e -> e.getOffloadContext().isComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Expand Down Expand Up @@ -282,6 +286,7 @@ public void testLaggedDeleteRetentionSetLower() throws Exception {

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.hasOffloadContext())
.filter(e -> e.getOffloadContext().isComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Expand Down Expand Up @@ -330,6 +335,7 @@ public void testLaggedDeleteSlowConsumer() throws Exception {

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.hasOffloadContext())
.filter(e -> e.getOffloadContext().isComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Expand All @@ -352,6 +358,7 @@ public void testLaggedDeleteSlowConsumer() throws Exception {

// ledger still exists in list
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.hasOffloadContext())
.filter(e -> e.getOffloadContext().isComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Expand Down

0 comments on commit a4d04d2

Please sign in to comment.