Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11658. Skip known tombstones when scanning rocksdb deletedtable in KeyDeletingService #7436

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void testKeysPurgingByKeyDeletingService() throws Exception {
GenericTestUtils.waitFor(
() -> {
try {
return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, "")
.getKeyBlocksList().size() == 0;
} catch (IOException e) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ private KeyDeletingService getMockedKeyDeletingService(AtomicBoolean keyDeletion
keyDeletingService.shutdown();
GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0, 1000,
100000);
when(keyManager.getPendingDeletionKeys(anyInt())).thenAnswer(i -> {
when(keyManager.getPendingDeletionKeys(anyInt(), "")).thenAnswer(i -> {
// wait for SDS to reach the KDS wait block before processing any key.
GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000);
keyDeletionStarted.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ ListKeysResult listKeys(String volumeName, String bucketName, String startKey,
* and a hashmap for key-value pair to be updated in the deletedTable.
* @throws IOException
*/
PendingKeysDeletion getPendingDeletionKeys(int count) throws IOException;
PendingKeysDeletion getPendingDeletionKeys(int count, String startKey)
throws IOException;

/**
* Returns a list rename entries from the snapshotRenamedTable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,13 @@ public ListKeysResult listKeys(String volumeName, String bucketName,
}

@Override
public PendingKeysDeletion getPendingDeletionKeys(final int count)
public PendingKeysDeletion getPendingDeletionKeys(final int count,
final String startKey)
throws IOException {
OmMetadataManagerImpl omMetadataManager =
(OmMetadataManagerImpl) metadataManager;
return omMetadataManager
.getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager());
return omMetadataManager.getPendingDeletionKeys(count, startKey,
ozoneManager.getOmSnapshotManager());
}

private <V, R> List<Table.KeyValue<String, R>> getTableEntries(String startKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1592,17 +1592,22 @@ private PersistedUserVolumeInfo getVolumesByUser(String userNameKey)
* @throws IOException
*/
public PendingKeysDeletion getPendingDeletionKeys(final int keyCount,
OmSnapshotManager omSnapshotManager)
final String startKey, OmSnapshotManager omSnapshotManager)
throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
HashMap<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
String lastKey = "";
try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
keyIter = getDeletedTable().iterator()) {
if (!Strings.isNullOrEmpty(startKey)) {
keyIter.seek(startKey);
}
int currentCount = 0;
while (keyIter.hasNext() && currentCount < keyCount) {
RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
if (kv != null) {
lastKey = kv.getKey();
List<BlockGroup> blockGroupList = Lists.newArrayList();
// Get volume name and bucket name
String[] keySplit = kv.getKey().split(OM_KEY_PREFIX);
Expand Down Expand Up @@ -1723,7 +1728,7 @@ public PendingKeysDeletion getPendingDeletionKeys(final int keyCount,
}
}
}
return new PendingKeysDeletion(keyBlocksList, keysToModify);
return new PendingKeysDeletion(keyBlocksList, keysToModify, lastKey);
}

private boolean versionExistsInPreviousSnapshot(OmKeyInfo omKeyInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ public class PendingKeysDeletion {

private HashMap<String, RepeatedOmKeyInfo> keysToModify;
private List<BlockGroup> keyBlocksList;
private String lastKey;

public PendingKeysDeletion(List<BlockGroup> keyBlocksList,
HashMap<String, RepeatedOmKeyInfo> keysToModify) {
HashMap<String, RepeatedOmKeyInfo> keysToModify,
String lastKey) {
this.keysToModify = keysToModify;
this.keyBlocksList = keyBlocksList;
this.lastKey = lastKey;
}

public String getLastKey() {
return lastKey;
}

public HashMap<String, RepeatedOmKeyInfo> getKeysToModify() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
private final boolean deepCleanSnapshots;
private final SnapshotChainManager snapshotChainManager;

private String lastScanKey = "";

public KeyDeletingService(OzoneManager ozoneManager,
ScmBlockLocationProtocol scmClient,
KeyManager manager, long serviceInterval,
Expand Down Expand Up @@ -214,18 +216,22 @@ public BackgroundTaskResult call() {
// snapshotId since AOS could process multiple buckets in one iteration.
UUID expectedPreviousSnapshotId = snapshotChainManager.getLatestGlobalSnapshotId();
PendingKeysDeletion pendingKeysDeletion = manager
.getPendingDeletionKeys(getKeyLimitPerTask());
.getPendingDeletionKeys(getKeyLimitPerTask(), lastScanKey);
List<BlockGroup> keyBlocksList = pendingKeysDeletion
.getKeyBlocksList();
if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
lastScanKey = pendingKeysDeletion.getLastKey();
delCount = processKeyDeletes(keyBlocksList,
getOzoneManager().getKeyManager(),
pendingKeysDeletion.getKeysToModify(), null, expectedPreviousSnapshotId);
deletedKeyCount.addAndGet(delCount);
} else {
lastScanKey = "";
}
} catch (IOException e) {
LOG.error("Error while running delete keys background task. Will " +
"retry at next run.", e);
lastScanKey = "";
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -144,6 +145,7 @@ private void createConfig(File testDir) {
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL,
200, TimeUnit.MILLISECONDS);
conf.setBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, true);
conf.setInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, 10);
conf.setQuietMode(false);
}

Expand Down Expand Up @@ -203,7 +205,7 @@ void checkIfDeleteServiceIsDeletingKeys()
() -> getDeletedKeyCount() >= initialDeletedCount + keyCount,
100, 10000);
assertThat(getRunCount()).isGreaterThan(initialRunCount);
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, "").getKeyBlocksList())
.isEmpty();
}

Expand Down Expand Up @@ -232,7 +234,7 @@ void checkDeletionForKeysWithMultipleVersions() throws Exception {
1000, 10000);
assertThat(getRunCount())
.isGreaterThan(initialRunCount);
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, "").getKeyBlocksList())
.isEmpty();

// The 1st version of the key has 1 block and the 2nd version has 2
Expand Down Expand Up @@ -274,7 +276,7 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception {
1000, 10000);
assertThat(getRunCount())
.isGreaterThan(initialRunCount);
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, "").getKeyBlocksList())
.isEmpty();

// deletedTable should have deleted key of the snapshot bucket
Expand Down Expand Up @@ -381,7 +383,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
Assertions.assertNotEquals(deletePathKey[0], group.getGroupID());
}
return pendingKeysDeletion;
}).when(km).getPendingDeletionKeys(anyInt());
}).when(km).getPendingDeletionKeys(anyInt(), "");
service.runPeriodicalTaskNow();
service.runPeriodicalTaskNow();
assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager);
Expand Down Expand Up @@ -882,7 +884,7 @@ private long getRunCount() {

private int countKeysPendingDeletion() {
try {
final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, "")
.getKeyBlocksList().size();
LOG.debug("KeyManager keys pending deletion: {}", count);
return count;
Expand All @@ -893,7 +895,7 @@ private int countKeysPendingDeletion() {

private long countBlocksPendingDeletion() {
try {
return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, "")
.getKeyBlocksList()
.stream()
.map(BlockGroup::getBlockIDList)
Expand Down
Loading