Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[ASTS] Add getSweepableBucket method
Browse files Browse the repository at this point in the history
  • Loading branch information
mdaudali committed Nov 12, 2024
1 parent 26515f9 commit b9e71ca
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ public Set<SweepableBucket> getSweepableBuckets(Set<Bucket> startBuckets) {
return readSweepableBucketRows(rows);
}

@Override
public Optional<SweepableBucket> getSweepableBucket(Bucket bucket) {
Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(bucket);
return readCell(
cell,
value -> SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue(
cell, value, timestampRangePersister));
}

@Override
public void putTimestampRangeForBucket(
Bucket bucket, Optional<TimestampRange> oldTimestampRange, TimestampRange newTimestampRange) {
Expand Down Expand Up @@ -288,7 +297,7 @@ private Set<SweepableBucket> readSweepableBucketRows(List<SweepAssignedBucketsRo
Long.MAX_VALUE);
return reads.entrySet().stream()
.map(entry -> SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue(
entry.getKey(), entry.getValue(), timestampRangePersister))
entry.getKey(), entry.getValue().getContents(), timestampRangePersister))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.palantir.atlasdb.sweep.asts.bucketingthings;

import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.Value;
import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsColumn;
import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow;
import com.palantir.atlasdb.sweep.asts.Bucket;
Expand Down Expand Up @@ -76,11 +75,11 @@ SweepAssignedBucketsRow nextSweepBucketsRow(Bucket bucket) {
}

SweepableBucket fromSweepBucketCellAndValue(
Cell cell, Value value, ObjectPersister<TimestampRange> timestampRangePersister) {
Cell cell, byte[] value, ObjectPersister<TimestampRange> timestampRangePersister) {
SweepAssignedBucketsRow row = SweepAssignedBucketsRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName());
SweepAssignedBucketsColumn column =
SweepAssignedBucketsColumn.BYTES_HYDRATOR.hydrateFromBytes(cell.getColumnName());
TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value.getContents());
TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value);
int shard = Math.toIntExact(row.getShard()); // throws if invalid shard
return SweepableBucket.of(
Bucket.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface SweepBucketsTable {
*/
Set<SweepableBucket> getSweepableBuckets(Set<Bucket> startBuckets);

Optional<SweepableBucket> getSweepableBucket(Bucket bucket);

void putTimestampRangeForBucket(
Bucket bucket, Optional<TimestampRange> oldTimestampRange, TimestampRange newTimestampRange);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.io.BaseEncoding;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.Value;
import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow;
import com.palantir.atlasdb.sweep.asts.Bucket;
import com.palantir.atlasdb.sweep.asts.SweepableBucket;
Expand Down Expand Up @@ -59,14 +58,11 @@ public final class SweepAssignedBucketStoreKeyPersisterTest {
Bucket.of(ShardAndStrategy.nonSweepable(), Long.MAX_VALUE),
fromBase64("PpljSwhiGMuA/4FHrhR64UeuAg==", "hw=="));

private static final Map<TimestampRange, Value> GOLDEN_TIMESTAMP_RANGES = Map.of(
private static final Map<TimestampRange, byte[]> GOLDEN_TIMESTAMP_RANGES = Map.of(
TimestampRange.of(912301923, Long.MAX_VALUE),
Value.create(
BaseEncoding.base64()
.decode("OikKBfqLZW5kRXhjbHVzaXZlJQN/f39/f39/f76Nc3RhcnRJbmNsdXNpdmUkDUwJe4b7"),
-1),
BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlJQN/f39/f39/f76Nc3RhcnRJbmNsdXNpdmUkDUwJe4b7"),
TimestampRange.openBucket(13123),
Value.create(BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlwY1zdGFydEluY2x1c2l2ZSQDGob7"), -1));
BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlwY1zdGFydEluY2x1c2l2ZSQDGob7"));

private static final Cell GOLDEN_SWEEP_BUCKET_ASSIGNER_STATE_MACHINE_CELL = Cell.create(
BaseEncoding.base64().decode("GTH5RX8BW6N/f/8="),
Expand Down Expand Up @@ -127,9 +123,7 @@ public void nextSweepBucketsRowGetsNextRowByMajorBucketIdentifier(Bucket bucket)
@ParameterizedTest
@MethodSource("sweepableBuckets")
public void canDeserializeCellsAndValuesBackToSweepableBucket(SweepableBucket sweepableBucket) {
Value value = Value.create(
TIMESTAMP_RANGE_PERSISTER.trySerialize(sweepableBucket.timestampRange()),
-1); // Timestamp does not matter
byte[] value = TIMESTAMP_RANGE_PERSISTER.trySerialize(sweepableBucket.timestampRange());
Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(sweepableBucket.bucket());
SweepableBucket deserialisedSweepableBucket =
SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue(
Expand All @@ -141,7 +135,7 @@ public void canDeserializeCellsAndValuesBackToSweepableBucket(SweepableBucket sw
@ParameterizedTest
@MethodSource("goldenSweepBucketCellsAndValues")
public void canDeserializeHistoricCellsAndValuesBackToSweepableBucket(
SweepableBucket sweepableBucket, Cell cell, Value value) {
SweepableBucket sweepableBucket, Cell cell, byte[] value) {
SweepableBucket deserialisedSweepableBucket =
SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue(
cell, value, TIMESTAMP_RANGE_PERSISTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,20 @@ public void getSweepableBucketsReturnsBucketsPerShardAndStrategy() {
assertThat(store.getSweepableBuckets(new HashSet<>(startBuckets))).containsExactlyInAnyOrderElementsOf(buckets);
}

@Test
public void getSweepableBucketReturnsSweepableBucketIfExists() {
Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 21);
TimestampRange range = TimestampRange.of(1, 4);
store.putTimestampRangeForBucket(bucket, Optional.empty(), range);
assertThat(store.getSweepableBucket(bucket)).contains(SweepableBucket.of(bucket, range));
}

@Test
public void getSweepableBucketReturnsEmptyIfSweepableBucketDoesNotExist() {
Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 21);
assertThat(store.getSweepableBucket(bucket)).isEmpty();
}

@Test
public void putTimestampRangeForBucketFailsIfOldTimestampRangeDoesNotMatchCurrent() {
Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 512);
Expand All @@ -272,8 +286,7 @@ public void putTimestampRangeForBucketSucceedsIfOldTimestampRangeMatchesCurrent(
TimestampRange newTimestampRange = TimestampRange.of(1, 2);

store.putTimestampRangeForBucket(bucket, Optional.empty(), newTimestampRange);
Set<SweepableBucket> sweepableBuckets = store.getSweepableBuckets(Set.of(bucket));
assertThat(sweepableBuckets).containsExactly(SweepableBucket.of(bucket, newTimestampRange));
assertThat(store.getSweepableBucket(bucket)).contains(SweepableBucket.of(bucket, newTimestampRange));
}

@Test
Expand All @@ -287,8 +300,10 @@ public void deleteBucketEntryDeletesBucket() {
Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 512);
TimestampRange timestampRange = TimestampRange.of(1, 2);
store.putTimestampRangeForBucket(bucket, Optional.empty(), timestampRange);
assertThat(store.getSweepableBucket(bucket)).hasValue(SweepableBucket.of(bucket, timestampRange));

store.deleteBucketEntry(bucket);
assertThat(store.getSweepableBuckets(Set.of(bucket))).isEmpty();
assertThat(store.getSweepableBucket(bucket)).isEmpty();
}

@Test
Expand Down

0 comments on commit b9e71ca

Please sign in to comment.