Commit

[improve][offload] Apply autoSkipNonRecoverableData configuration to tiered storage (apache#22531)

(cherry picked from commit fbf4cb7)
(cherry picked from commit ff8d3b7)
shibd authored and mukesh-ctds committed Apr 19, 2024
1 parent fbfcfa6 commit 7ce34fe
Showing 6 changed files with 110 additions and 6 deletions.
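
In short: before this change, a read of an offloaded ledger whose blob had been deleted from the bucket failed with a generic IOException, so the broker's autoSkipNonRecoverableData setting had no effect for tiered storage. The diff below translates the missing blob into BKNoSuchLedgerExistsException, the error code that setting treats as non-recoverable. A hedged configuration sketch, assuming the broker.conf property the commit title refers to:

# broker.conf (sketch) -- allow the broker to skip ledgers whose offloaded
# data is no longer in the bucket instead of failing the read.
autoSkipNonRecoverableData=true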
Changed file 1 of 6
@@ -28,6 +28,7 @@
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.TopicName;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.GetOptions;
import org.slf4j.Logger;
@@ -95,6 +96,9 @@ private boolean refillBufferIfNeeded() throws IOException {
try {
long startReadTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
if (blob == null) {
throw new KeyNotFoundException(bucket, key, "");
}
versionCheck.check(key, blob);

try (InputStream stream = blob.getPayload().openStream()) {
@@ -121,6 +125,10 @@ private boolean refillBufferIfNeeded() throws IOException {
if (null != this.offloaderStats) {
this.offloaderStats.recordReadOffloadError(this.topicName);
}
// If the blob is not found, the original exception is thrown and handled by the caller.
if (e instanceof KeyNotFoundException) {
throw e;
}
throw new IOException("Error reading from BlobStore", e);
}
}
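
The null check added above exists because jclouds' BlobStore.getBlob() returns null for a missing key rather than throwing. A minimal, self-contained sketch of that behavior, using the jclouds "transient" in-memory provider; an illustration under that assumption, not part of the commit:

import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;

public class MissingBlobSketch {
    public static void main(String[] args) {
        // In-memory provider, handy for local experiments only.
        BlobStore store = ContextBuilder.newBuilder("transient")
                .buildView(BlobStoreContext.class)
                .getBlobStore();
        store.createContainerInLocation(null, "my-bucket");

        // A missing key is signalled by a null return, not an exception,
        // which is why the patched stream converts null into KeyNotFoundException.
        Blob missing = store.getBlob("my-bucket", "no-such-key");
        System.out.println(missing == null); // expected: true
    }
}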
Changed file 2 of 6
@@ -50,6 +50,7 @@
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.TopicName;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -206,7 +207,11 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
} catch (Throwable t) {
log.error("Failed to read entries {} - {} from the offloader in ledger {}",
firstEntry, lastEntry, ledgerId, t);
promise.completeExceptionally(t);
if (t instanceof KeyNotFoundException) {
promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
} else {
promise.completeExceptionally(t);
}
entries.forEach(LedgerEntry::close);
}
});
@@ -270,7 +275,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
VersionCheck versionCheck,
long ledgerId, int readBufferSize,
LedgerOffloaderStats offloaderStats, String managedLedgerName)
throws IOException {
throws IOException, BKException.BKNoSuchLedgerExistsException {
int retryCount = 3;
OffloadIndexBlock index = null;
IOException lastException = null;
@@ -283,6 +288,10 @@ public static ReadHandle open(ScheduledExecutorService executor,
while (retryCount-- > 0) {
long readIndexStartTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, indexKey);
if (blob == null) {
log.error("{} not found in container {}", indexKey, bucket);
throw new BKException.BKNoSuchLedgerExistsException();
}
offloaderStats.recordReadOffloadIndexLatency(topicName,
System.nanoTime() - readIndexStartTime, TimeUnit.NANOSECONDS);
versionCheck.check(indexKey, blob);
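
Translating the missing blob into BKNoSuchLedgerExistsException matters because the broker-side autoSkipNonRecoverableData logic keys off non-recoverable BookKeeper error codes. A conceptual sketch of that decision; the helper is hypothetical and the broker's actual code may differ:

import org.apache.bookkeeper.client.api.BKException;

final class SkipDecisionSketch {
    // Hypothetical helper: should a failed read be skipped rather than retried?
    static boolean shouldSkip(int bkErrorCode, boolean autoSkipNonRecoverableData) {
        boolean nonRecoverable =
                bkErrorCode == BKException.Code.NoSuchLedgerExistsException
                || bkErrorCode == BKException.Code.NoSuchEntryException;
        return autoSkipNonRecoverableData && nonRecoverable;
    }
}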
Changed file 3 of 6
@@ -49,6 +49,7 @@
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.TopicName;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -224,7 +225,11 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
}
}
} catch (Throwable t) {
promise.completeExceptionally(t);
if (t instanceof KeyNotFoundException) {
promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
} else {
promise.completeExceptionally(t);
}
entries.forEach(LedgerEntry::close);
}

@@ -303,7 +308,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
VersionCheck versionCheck,
long ledgerId, int readBufferSize, LedgerOffloaderStats offloaderStats,
String managedLedgerName)
throws IOException {
throws IOException, BKException.BKNoSuchLedgerExistsException {
List<BackedInputStream> inputStreams = new LinkedList<>();
List<OffloadIndexBlockV2> indice = new LinkedList<>();
String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName);
@@ -313,6 +318,10 @@ public static ReadHandle open(ScheduledExecutorService executor,
log.debug("open bucket: {} index key: {}", bucket, indexKey);
long startTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, indexKey);
if (blob == null) {
log.error("{} not found in container {}", indexKey, bucket);
throw new BKException.BKNoSuchLedgerExistsException();
}
offloaderStats.recordReadOffloadIndexLatency(topicName,
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
log.debug("indexKey blob: {} {}", indexKey, blob);
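
From a caller's point of view (a usage sketch based on the tests further down; readHandle is assumed to come from LedgerOffloader#readOffloaded), a deleted backing blob now surfaces as a BKException whose code is NoSuchLedgerExistsException instead of a wrapped IOException:

import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;

public class OffloadedReadSketch {
    static void readAll(ReadHandle readHandle) throws Exception {
        try (LedgerEntries entries = readHandle.read(0, readHandle.getLastAddConfirmed())) {
            entries.forEach(entry -> { /* process entry */ });
        } catch (BKException e) {
            if (e.getCode() == BKException.Code.NoSuchLedgerExistsException) {
                // The offloaded data is gone from the bucket; with
                // autoSkipNonRecoverableData enabled the broker can skip it.
            }
            throw e;
        }
    }
}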
Changed file 4 of 6
@@ -32,6 +32,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedInputStreamImpl;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
@@ -142,8 +143,8 @@ public void testReadingFullObjectByBytes() throws Exception {
assertStreamsMatchByBytes(toTest, toCompare);
}

@Test(expectedExceptions = IOException.class)
public void testErrorOnRead() throws Exception {
@Test(expectedExceptions = KeyNotFoundException.class)
public void testNotFoundOnRead() throws Exception {
BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist",
(key, md) -> {},
1234, 1000);
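
The expected exception in the test above changes from IOException to KeyNotFoundException; this works because jclouds' KeyNotFoundException is unchecked (a RuntimeException), so it can propagate out of the stream unwrapped. A tiny sketch verifying that assumption:

import org.jclouds.blobstore.KeyNotFoundException;

public class UncheckedExceptionSketch {
    public static void main(String[] args) {
        System.out.println(RuntimeException.class.isAssignableFrom(KeyNotFoundException.class)); // expected: true
    }
}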
Changed file 5 of 6
@@ -18,16 +18,19 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import static org.apache.bookkeeper.client.api.BKException.Code.NoSuchLedgerExistsException;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -445,4 +448,55 @@ public void testInvalidEntryIds() throws Exception {
} catch (Exception e) {
}
}

@Test
public void testReadNotExistLedger() throws Exception {
LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
}});
ManagedLedger ml = createMockManagedLedger();
UUID uuid = UUID.randomUUID();
long beginLedger = 0;
long beginEntry = 0;

Map<String, String> driverMeta = new HashMap<String, String>() {{
put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
}};
OffloadHandle offloadHandle = offloader
.streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();

// Segment should closed because size in bytes full
final LinkedList<Entry> entries = new LinkedList<>();
for (int i = 0; i < 10; i++) {
final byte[] data = new byte[100];
random.nextBytes(data);
final EntryImpl entry = EntryImpl.create(0, i, data);
offloadHandle.offerEntry(entry);
entries.add(entry);
}

final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
assertEquals(offloadResult.endLedger, 0);
assertEquals(offloadResult.endEntry, 9);
final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
contextBuilder.addOffloadSegment(
MLDataFormats.OffloadSegment.newBuilder()
.setUidLsb(uuid.getLeastSignificantBits())
.setUidMsb(uuid.getMostSignificantBits())
.setComplete(true).setEndEntryId(9).build());

final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();

// delete blob(ledger)
blobStore.removeBlob(BUCKET, uuid.toString());

try {
readHandle.read(0, 9);
fail("Should be read fail");
} catch (BKException e) {
assertEquals(e.getCode(), NoSuchLedgerExistsException);
}
}
}
Changed file 6 of 6
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import static org.apache.bookkeeper.client.api.BKException.Code.NoSuchLedgerExistsException;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -26,6 +27,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -591,4 +593,25 @@ public void testReadWithAClosedLedgerHandler() throws Exception {
throw e;
}
}

@Test
public void testReadNotExistLedger() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
LedgerOffloader offloader = getOffloader();

UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
ReadHandle offloadRead = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
assertEquals(offloadRead.getLastAddConfirmed(), toWrite.getLastAddConfirmed());

// delete blob(ledger)
blobStore.removeBlob(BUCKET, DataBlockUtils.dataBlockOffloadKey(toWrite.getId(), uuid));

try {
offloadRead.read(0, offloadRead.getLastAddConfirmed());
fail("Should be read fail");
} catch (BKException e) {
assertEquals(e.getCode(), NoSuchLedgerExistsException);
}
}
}
