From 7ce34fe73114599453a2e58661f8e598c263261e Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 19 Apr 2024 21:51:10 +0800 Subject: [PATCH] [improve][offload] Apply autoSkipNonRecoverableData configuration to tiered storage (#22531) (cherry picked from commit fbf4cb71a3f3ed08786205dc5e60b810f3d62605) (cherry picked from commit ff8d3b7343781fbb874d55635df27dd13f48b68e) --- .../impl/BlobStoreBackedInputStreamImpl.java | 8 +++ .../impl/BlobStoreBackedReadHandleImpl.java | 13 ++++- .../impl/BlobStoreBackedReadHandleImplV2.java | 13 ++++- .../BlobStoreBackedInputStreamTest.java | 5 +- ...reManagedLedgerOffloaderStreamingTest.java | 54 +++++++++++++++++++ .../BlobStoreManagedLedgerOffloaderTest.java | 23 ++++++++ 6 files changed, 110 insertions(+), 6 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java index 0dea46726f50a..6cb60e14984f9 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java @@ -28,6 +28,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.GetOptions; import org.slf4j.Logger; @@ -95,6 +96,9 @@ private boolean refillBufferIfNeeded() throws IOException { try { long startReadTime = System.nanoTime(); Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange)); + if (blob == null) { + throw new KeyNotFoundException(bucket, key, ""); + } versionCheck.check(key, blob); try 
(InputStream stream = blob.getPayload().openStream()) { @@ -121,6 +125,10 @@ private boolean refillBufferIfNeeded() throws IOException { if (null != this.offloaderStats) { this.offloaderStats.recordReadOffloadError(this.topicName); } + // If the blob is not found, the original exception is thrown and handled by the caller. + if (e instanceof KeyNotFoundException) { + throw e; + } throw new IOException("Error reading from BlobStore", e); } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index ef39f7f50d07e..fd7195673fad2 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -206,7 +207,11 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } catch (Throwable t) { log.error("Failed to read entries {} - {} from the offloader in ledger {}", firstEntry, lastEntry, ledgerId, t); - promise.completeExceptionally(t); + if (t instanceof KeyNotFoundException) { + promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException()); + } else { + promise.completeExceptionally(t); + } entries.forEach(LedgerEntry::close); } }); @@ -270,7 +275,7 @@ public static ReadHandle open(ScheduledExecutorService executor, VersionCheck versionCheck, long ledgerId, int readBufferSize, LedgerOffloaderStats offloaderStats, String 
managedLedgerName) - throws IOException { + throws IOException, BKException.BKNoSuchLedgerExistsException { int retryCount = 3; OffloadIndexBlock index = null; IOException lastException = null; @@ -283,6 +288,10 @@ public static ReadHandle open(ScheduledExecutorService executor, while (retryCount-- > 0) { long readIndexStartTime = System.nanoTime(); Blob blob = blobStore.getBlob(bucket, indexKey); + if (blob == null) { + log.error("{} not found in container {}", indexKey, bucket); + throw new BKException.BKNoSuchLedgerExistsException(); + } offloaderStats.recordReadOffloadIndexLatency(topicName, System.nanoTime() - readIndexStartTime, TimeUnit.NANOSECONDS); versionCheck.check(indexKey, blob); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index 53d96e08abf5e..502f475174cee 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -49,6 +49,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -224,7 +225,11 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } } } catch (Throwable t) { - promise.completeExceptionally(t); + if (t instanceof KeyNotFoundException) { + promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException()); + } else { + promise.completeExceptionally(t); + } entries.forEach(LedgerEntry::close); } @@ -303,7 +308,7 @@ public static ReadHandle 
open(ScheduledExecutorService executor, VersionCheck versionCheck, long ledgerId, int readBufferSize, LedgerOffloaderStats offloaderStats, String managedLedgerName) - throws IOException { + throws IOException, BKException.BKNoSuchLedgerExistsException { List inputStreams = new LinkedList<>(); List indice = new LinkedList<>(); String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName); @@ -313,6 +318,10 @@ public static ReadHandle open(ScheduledExecutorService executor, log.debug("open bucket: {} index key: {}", bucket, indexKey); long startTime = System.nanoTime(); Blob blob = blobStore.getBlob(bucket, indexKey); + if (blob == null) { + log.error("{} not found in container {}", indexKey, bucket); + throw new BKException.BKNoSuchLedgerExistsException(); + } offloaderStats.recordReadOffloadIndexLatency(topicName, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); log.debug("indexKey blob: {} {}", indexKey, blob); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java index 775310925a1a3..3e5c4b609dfec 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java @@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedInputStreamImpl; import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; import org.jclouds.io.Payload; import org.jclouds.io.Payloads; @@ -142,8 +143,8 @@ public void testReadingFullObjectByBytes() throws Exception { assertStreamsMatchByBytes(toTest, toCompare); } - @Test(expectedExceptions = IOException.class) 
- public void testErrorOnRead() throws Exception { + @Test(expectedExceptions = KeyNotFoundException.class) + public void testNotFoundOnRead() throws Exception { BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist", (key, md) -> {}, 1234, 1000); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java index 9056281a308f2..ad1529072f813 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java @@ -18,16 +18,19 @@ */ package org.apache.bookkeeper.mledger.offload.jcloud.impl; +import static org.apache.bookkeeper.client.api.BKException.Code.NoSuchLedgerExistsException; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.UUID; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -445,4 +448,55 @@ public void testInvalidEntryIds() throws Exception { } catch (Exception e) { } } + + @Test + public void testReadNotExistLedger() throws Exception { + LedgerOffloader offloader = getOffloader(new HashMap() {{ + put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000"); + 
put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880"); + put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600"); + }}); + ManagedLedger ml = createMockManagedLedger(); + UUID uuid = UUID.randomUUID(); + long beginLedger = 0; + long beginEntry = 0; + + Map driverMeta = new HashMap() {{ + put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET); + }}; + OffloadHandle offloadHandle = offloader + .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get(); + + // The segment should be closed because its size in bytes has reached the limit + final LinkedList entries = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + final byte[] data = new byte[100]; + random.nextBytes(data); + final EntryImpl entry = EntryImpl.create(0, i, data); + offloadHandle.offerEntry(entry); + entries.add(entry); + } + + final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get(); + assertEquals(offloadResult.endLedger, 0); + assertEquals(offloadResult.endEntry, 9); + final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder(); + contextBuilder.addOffloadSegment( + MLDataFormats.OffloadSegment.newBuilder() + .setUidLsb(uuid.getLeastSignificantBits()) + .setUidMsb(uuid.getMostSignificantBits()) + .setComplete(true).setEndEntryId(9).build()); + + final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get(); + + // delete blob(ledger) + blobStore.removeBlob(BUCKET, uuid.toString()); + + try { + readHandle.read(0, 9); + fail("Should be read fail"); + } catch (BKException e) { + assertEquals(e.getCode(), NoSuchLedgerExistsException); + } + } } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index ac87a8e424038..6a66c19f68aeb 100644 ---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.offload.jcloud.impl; +import static org.apache.bookkeeper.client.api.BKException.Code.NoSuchLedgerExistsException; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -26,6 +27,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -591,4 +593,25 @@ public void testReadWithAClosedLedgerHandler() throws Exception { throw e; } } + + @Test + public void testReadNotExistLedger() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); + LedgerOffloader offloader = getOffloader(); + + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + ReadHandle offloadRead = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); + assertEquals(offloadRead.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); + + // delete blob(ledger) + blobStore.removeBlob(BUCKET, DataBlockUtils.dataBlockOffloadKey(toWrite.getId(), uuid)); + + try { + offloadRead.read(0, offloadRead.getLastAddConfirmed()); + fail("Should be read fail"); + } catch (BKException e) { + assertEquals(e.getCode(), NoSuchLedgerExistsException); + } + } }