From e5fd7e8ff2829726d394de19eaa0dfbdde8c4c94 Mon Sep 17 00:00:00 2001 From: Jun Nemoto <35618893+jnmt@users.noreply.github.com> Date: Wed, 3 Jul 2024 01:10:17 +0900 Subject: [PATCH] Backport to branch(3.11) : Fix snapshot management issues (#2023) --- .../consensuscommit/CrudHandler.java | 102 ++--- .../transaction/consensuscommit/Snapshot.java | 97 ++--- .../consensuscommit/CrudHandlerTest.java | 285 +++++++++----- .../consensuscommit/SnapshotTest.java | 352 ++++++++++-------- ...CommitNullMetadataIntegrationTestBase.java | 5 +- ...nsusCommitSpecificIntegrationTestBase.java | 97 ++++- ...nsusCommitSpecificIntegrationTestBase.java | 5 +- 7 files changed, 613 insertions(+), 330 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 5fb651868..973667bd2 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -12,6 +12,7 @@ import com.scalar.db.api.Result; import com.scalar.db.api.Scan; import com.scalar.db.api.Scanner; +import com.scalar.db.api.Selection; import com.scalar.db.api.TableMetadata; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CrudException; @@ -19,8 +20,11 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; import javax.annotation.concurrent.NotThreadSafe; @@ -68,35 +72,43 @@ public CrudHandler( this.parallelExecutor = parallelExecutor; } - public Optional get(Get get) throws CrudException { - List originalProjections = new ArrayList<>(get.getProjections()); + public Optional get(Get originalGet) throws CrudException { + List originalProjections = new ArrayList<>(originalGet.getProjections()); + Get get = (Get) prepareStorageSelection(originalGet); Snapshot.Key key = new Snapshot.Key(get); readUnread(key, get); - return createGetResult(key, originalProjections); + return createGetResult(key, get, originalProjections); } @VisibleForTesting void readUnread(Snapshot.Key key, Get get) throws CrudException { - if (!snapshot.containsKeyInReadSet(key)) { + if (!snapshot.containsKeyInGetSet(get)) { read(key, get); } } - private void read(Snapshot.Key key, Get get) throws CrudException { + // Although this class is not thread-safe, this method is actually thread-safe, so we call it + // concurrently in the implicit pre-read + @VisibleForTesting + void read(Snapshot.Key key, Get get) throws CrudException { Optional result = getFromStorage(get); if (!result.isPresent() || result.get().isCommitted()) { + // Keep the read set latest to create before image by using the latest record (result) + // because another conflicting transaction might have updated the record after this + // transaction read it first. snapshot.put(key, result); + snapshot.put(get, result); // for re-read and validation return; } throw new UncommittedRecordException( get, result.get(), "This record needs recovery", snapshot.getId()); } - private Optional createGetResult(Snapshot.Key key, List projections) + private Optional createGetResult(Snapshot.Key key, Get get, List projections) throws CrudException { TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable()); return snapshot - .get(key) + .mergeResult(key, snapshot.get(get)) .map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled)); } @@ -115,20 +127,22 @@ public List scan(Scan scan) throws CrudException { return results; } - private List scanInternal(Scan scan) throws CrudException { - List originalProjections = new ArrayList<>(scan.getProjections()); + private List scanInternal(Scan originalScan) throws CrudException { + List originalProjections = new ArrayList<>(originalScan.getProjections()); + Scan scan = (Scan) prepareStorageSelection(originalScan); - List results = new ArrayList<>(); + Map results = new LinkedHashMap<>(); - Optional> keysInSnapshot = snapshot.get(scan); - if (keysInSnapshot.isPresent()) { - for (Snapshot.Key key : keysInSnapshot.get()) { - snapshot.get(key).ifPresent(results::add); + Optional> resultsInSnapshot = snapshot.get(scan); + if (resultsInSnapshot.isPresent()) { + for (Entry entry : resultsInSnapshot.get().entrySet()) { + snapshot + .mergeResult(entry.getKey(), Optional.of(entry.getValue())) + .ifPresent(result -> results.put(entry.getKey(), result)); } return createScanResults(scan, originalProjections, results); } - List keys = new ArrayList<>(); Scanner scanner = null; try { scanner = getFromStorage(scan); @@ -141,12 +155,12 @@ private List scanInternal(Scan scan) throws CrudException { Snapshot.Key key = new Snapshot.Key(scan, r); - if (!snapshot.containsKeyInReadSet(key)) { - snapshot.put(key, Optional.of(result)); - } + // We always update the read set to create before image by using the latest record (result) + // because another conflicting transaction might have updated the record after this + // transaction read it first. + snapshot.put(key, Optional.of(result)); - keys.add(key); - snapshot.get(key).ifPresent(results::add); + snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value)); } } finally { if (scanner != null) { @@ -157,15 +171,16 @@ private List scanInternal(Scan scan) throws CrudException { } } } - snapshot.put(scan, keys); + snapshot.put(scan, results); return createScanResults(scan, originalProjections, results); } - private List createScanResults(Scan scan, List projections, List results) + private List createScanResults( + Scan scan, List projections, Map results) throws CrudException { TableMetadata metadata = getTableMetadata(scan.forNamespace().get(), scan.forTable().get()); - return results.stream() + return results.values().stream() .map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled)) .collect(Collectors.toList()); } @@ -182,8 +197,8 @@ public void put(Put put) throws CrudException { } if (put.getCondition().isPresent()) { - if (put.isImplicitPreReadEnabled()) { - readUnread(key, createGet(key)); + if (put.isImplicitPreReadEnabled() && !snapshot.containsKeyInReadSet(key)) { + read(key, createGet(key)); } mutationConditionsValidator.checkIfConditionIsSatisfied( put, snapshot.getFromReadSet(key).orElse(null)); @@ -196,7 +211,9 @@ public void delete(Delete delete) throws CrudException { Snapshot.Key key = new Snapshot.Key(delete); if (delete.getCondition().isPresent()) { - readUnread(key, createGet(key)); + if (!snapshot.containsKeyInReadSet(key)) { + read(key, createGet(key)); + } mutationConditionsValidator.checkIfConditionIsSatisfied( delete, snapshot.getFromReadSet(key).orElse(null)); } @@ -226,14 +243,14 @@ public void readIfImplicitPreReadEnabled() throws CrudException { } } - private Get createGet(Snapshot.Key key) { + private Get createGet(Snapshot.Key key) throws CrudException { GetBuilder.BuildableGet buildableGet = Get.newBuilder() .namespace(key.getNamespace()) .table(key.getTable()) .partitionKey(key.getPartitionKey()); key.getClusteringKey().ifPresent(buildableGet::clusteringKey); - return buildableGet.build(); + return (Get) prepareStorageSelection(buildableGet.build()); } // Although this class is not thread-safe, this method is actually thread-safe because the storage @@ -241,15 +258,6 @@ private Get createGet(Snapshot.Key key) { @VisibleForTesting Optional getFromStorage(Get get) throws CrudException { try { - get.clearProjections(); - // Retrieve only the after images columns when including the metadata is disabled, otherwise - // retrieve all the columns - if (!isIncludeMetadataEnabled) { - LinkedHashSet afterImageColumnNames = - tableMetadataManager.getTransactionTableMetadata(get).getAfterImageColumnNames(); - get.withProjections(afterImageColumnNames); - } - get.withConsistency(Consistency.LINEARIZABLE); return storage.get(get).map(TransactionResult::new); } catch (ExecutionException e) { throw new CrudException("Get failed", e, snapshot.getId()); @@ -258,18 +266,26 @@ Optional getFromStorage(Get get) throws CrudException { private Scanner getFromStorage(Scan scan) throws CrudException { try { - scan.clearProjections(); + return storage.scan(scan); + } catch (ExecutionException e) { + throw new CrudException("Scan failed", e, snapshot.getId()); + } + } + + private Selection prepareStorageSelection(Selection selection) throws CrudException { + try { + selection.clearProjections(); // Retrieve only the after images columns when including the metadata is disabled, otherwise // retrieve all the columns if (!isIncludeMetadataEnabled) { LinkedHashSet afterImageColumnNames = - tableMetadataManager.getTransactionTableMetadata(scan).getAfterImageColumnNames(); - scan.withProjections(afterImageColumnNames); + tableMetadataManager.getTransactionTableMetadata(selection).getAfterImageColumnNames(); + selection.withProjections(afterImageColumnNames); } - scan.withConsistency(Consistency.LINEARIZABLE); - return storage.scan(scan); + selection.withConsistency(Consistency.LINEARIZABLE); + return selection; } catch (ExecutionException e) { - throw new CrudException("Scan failed", e, snapshot.getId()); + throw new CrudException("Getting a table metadata failed", e, snapshot.getId()); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index 60c2def56..9d53f5b38 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -56,7 +56,8 @@ public class Snapshot { private final TransactionTableMetadataManager tableMetadataManager; private final ParallelExecutor parallelExecutor; private final ConcurrentMap> readSet; - private final Map> scanSet; + private final ConcurrentMap> getSet; + private final Map> scanSet; private final Map writeSet; private final Map deleteSet; @@ -72,6 +73,7 @@ public Snapshot( this.tableMetadataManager = tableMetadataManager; this.parallelExecutor = parallelExecutor; readSet = new ConcurrentHashMap<>(); + getSet = new ConcurrentHashMap<>(); scanSet = new HashMap<>(); writeSet = new HashMap<>(); deleteSet = new HashMap<>(); @@ -85,7 +87,8 @@ public Snapshot( TransactionTableMetadataManager tableMetadataManager, ParallelExecutor parallelExecutor, ConcurrentMap> readSet, - Map> scanSet, + ConcurrentMap> getSet, + Map> scanSet, Map writeSet, Map deleteSet) { this.id = id; @@ -94,6 +97,7 @@ public Snapshot( this.tableMetadataManager = tableMetadataManager; this.parallelExecutor = parallelExecutor; this.readSet = readSet; + this.getSet = getSet; this.scanSet = scanSet; this.writeSet = writeSet; this.deleteSet = deleteSet; @@ -116,8 +120,14 @@ public void put(Key key, Optional result) { readSet.put(key, result); } - public void put(Scan scan, List keys) { - scanSet.put(scan, keys); + // Although this class is not thread-safe, this method is actually thread-safe because the getSet + // is a concurrent map + public void put(Get get, Optional result) { + getSet.put(get, result); + } + + public void put(Scan scan, Map results) { + scanSet.put(scan, results); } public void put(Key key, Put put) { @@ -154,21 +164,18 @@ public List getDeletesInDeleteSet() { return new ArrayList<>(deleteSet.values()); } - public Optional get(Key key) throws CrudException { + public Optional mergeResult(Key key, Optional result) + throws CrudException { if (deleteSet.containsKey(key)) { return Optional.empty(); - } else if (readSet.containsKey(key)) { - if (writeSet.containsKey(key)) { - // merge the result in the read set and the put in the write set - return Optional.of( - new TransactionResult( - new MergedResult(readSet.get(key), writeSet.get(key), getTableMetadata(key)))); - } else { - return readSet.get(key); - } + } else if (writeSet.containsKey(key)) { + // merge the result in the read set and the put in the write set + return Optional.of( + new TransactionResult( + new MergedResult(result, writeSet.get(key), getTableMetadata(key)))); + } else { + return result; } - throw new IllegalArgumentException( - "Getting data neither in the read set nor the delete set is not allowed"); } private TableMetadata getTableMetadata(Key key) throws CrudException { @@ -202,7 +209,17 @@ private TableMetadata getTableMetadata(Scan scan) throws ExecutionException { } } - public Optional> get(Scan scan) { + public boolean containsKeyInGetSet(Get get) { + return getSet.containsKey(get); + } + + public Optional get(Get get) { + // We expect this method is called after putting the result of the get operation in the get set. + assert getSet.containsKey(get); + return getSet.get(get); + } + + public Optional> get(Scan scan) { if (scanSet.containsKey(scan)) { return Optional.ofNullable(scanSet.get(scan)); } @@ -239,6 +256,10 @@ private boolean isWriteSetOverlappedWith(Scan scan) { } for (Map.Entry entry : writeSet.entrySet()) { + if (scanSet.get(scan).containsKey(entry.getKey())) { + return true; + } + Put put = entry.getValue(); if (!put.forNamespace().equals(scan.forNamespace()) @@ -295,7 +316,7 @@ private boolean isWriteSetOverlappedWith(Scan scan) { private boolean isWriteSetOverlappedWith(ScanWithIndex scan) { for (Map.Entry entry : writeSet.entrySet()) { - if (scanSet.get(scan).contains(entry.getKey())) { + if (scanSet.get(scan).containsKey(entry.getKey())) { return true; } @@ -332,7 +353,7 @@ private boolean isWriteSetOverlappedWith(ScanAll scan) { // yet. Thus, we need to evaluate if the scan condition potentially matches put operations. // Check for cases 1 and 2 - if (scanSet.get(scan).contains(entry.getKey())) { + if (scanSet.get(scan).containsKey(entry.getKey())) { return true; } @@ -453,14 +474,14 @@ void toSerializableWithExtraRead(DistributedStorage storage) List tasks = new ArrayList<>(); // Read set by scan is re-validated to check if there is no anti-dependency - for (Map.Entry> entry : scanSet.entrySet()) { + for (Map.Entry> entry : scanSet.entrySet()) { tasks.add( () -> { Map currentReadMap = new HashMap<>(); Set validatedReadSet = new HashSet<>(); Scanner scanner = null; + Scan scan = entry.getKey(); try { - Scan scan = entry.getKey(); // only get tx_id and tx_version columns because we use only them to compare scan.clearProjections(); scan.withProjection(Attribute.ID).withProjection(Attribute.VERSION); @@ -484,13 +505,15 @@ void toSerializableWithExtraRead(DistributedStorage storage) } } - for (Key key : entry.getValue()) { + for (Map.Entry e : entry.getValue().entrySet()) { + Key key = e.getKey(); + TransactionResult result = e.getValue(); if (writeSet.containsKey(key) || deleteSet.containsKey(key)) { continue; } // Check if read records are not changed TransactionResult latestResult = currentReadMap.get(key); - if (isChanged(Optional.ofNullable(latestResult), readSet.get(key))) { + if (isChanged(Optional.ofNullable(latestResult), Optional.of(result))) { throwExceptionDueToAntiDependency(); } validatedReadSet.add(key); @@ -503,35 +526,23 @@ void toSerializableWithExtraRead(DistributedStorage storage) }); } - // Calculate read set validated by scan - Set validatedReadSetByScan = new HashSet<>(); - for (List values : scanSet.values()) { - validatedReadSetByScan.addAll(values); - } - // Read set by get is re-validated to check if there is no anti-dependency - for (Map.Entry> entry : readSet.entrySet()) { - Key key = entry.getKey(); - if (writeSet.containsKey(key) - || deleteSet.containsKey(key) - || validatedReadSetByScan.contains(key)) { + for (Map.Entry> entry : getSet.entrySet()) { + Get get = entry.getKey(); + Key key = new Key(get); + if (writeSet.containsKey(key) || deleteSet.containsKey(key)) { continue; } tasks.add( () -> { + Optional originalResult = getSet.get(get); // only get tx_id and tx_version columns because we use only them to compare - Get get = - new Get(key.getPartitionKey(), key.getClusteringKey().orElse(null)) - .withProjection(Attribute.ID) - .withProjection(Attribute.VERSION) - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(key.getNamespace()) - .forTable(key.getTable()); - + get.clearProjections(); + get.withProjection(Attribute.ID).withProjection(Attribute.VERSION); Optional latestResult = storage.get(get).map(TransactionResult::new); // Check if a read record is not changed - if (isChanged(latestResult, entry.getValue())) { + if (isChanged(latestResult, originalResult)) { throwExceptionDueToAntiDependency(); } }); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 1f266a025..8b6b573c2 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -13,6 +13,7 @@ import com.google.common.collect.ImmutableMap; import com.scalar.db.api.ConditionBuilder; +import com.scalar.db.api.Consistency; import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.Get; @@ -66,6 +67,8 @@ public class CrudHandlerTest { .addPartitionKey(ANY_NAME_1) .addClusteringKey(ANY_NAME_2) .build()); + private static final TransactionTableMetadata TRANSACTION_TABLE_METADATA = + new TransactionTableMetadata(TABLE_METADATA); private CrudHandler handler; @Mock private DistributedStorage storage; @@ -103,6 +106,14 @@ private Get prepareGet() { .forTable(ANY_TABLE_NAME); } + private Get toGetForStorageFrom(Get get) { + return Get.newBuilder(get) + .clearProjections() + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build(); + } + private Scan prepareScan() { Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); return new Scan(partitionKey).forNamespace(ANY_NAMESPACE_NAME).forTable(ANY_TABLE_NAME); @@ -117,6 +128,14 @@ private Scan prepareCrossPartitionScan() { .build(); } + private Scan toScanForStorageFrom(Scan scan) { + return Scan.newBuilder(scan) + .clearProjections() + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build(); + } + private TransactionResult prepareResult(TransactionState state) { ImmutableMap> columns = ImmutableMap.>builder() @@ -136,12 +155,14 @@ private TransactionResult prepareResult(TransactionState state) { } @Test - public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudException { + public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudException { // Arrange Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); - when(snapshot.containsKeyInReadSet(new Snapshot.Key(get))).thenReturn(true); - when(snapshot.get(new Snapshot.Key(get))).thenReturn(expected); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(true); + when(snapshot.get(getForStorage)).thenReturn(expected); + when(snapshot.mergeResult(new Snapshot.Key(getForStorage), expected)).thenReturn(expected); // Act Optional actual = handler.get(get); @@ -156,18 +177,21 @@ public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept @Test public void - get_KeyNotExistsInSnapshotAndRecordInStorageCommitted_ShouldReturnFromStorageAndUpdateSnapshot() + get_GetNotExistsInSnapshotAndRecordInStorageCommitted_ShouldReturnFromStorageAndUpdateSnapshot() throws CrudException, ExecutionException { // Arrange Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); - Snapshot.Key key = new Snapshot.Key(get); - when(snapshot.containsKeyInReadSet(key)).thenReturn(false); + Optional transactionResult = expected.map(e -> (TransactionResult) e); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); - when(storage.get(get)).thenReturn(expected); - when(snapshot.get(key)).thenReturn(expected.map(e -> (TransactionResult) e)); + when(storage.get(getForStorage)).thenReturn(expected); + when(snapshot.get(getForStorage)).thenReturn(transactionResult); + when(snapshot.mergeResult(key, transactionResult)).thenReturn(transactionResult); // Act Optional result = handler.get(get); @@ -178,20 +202,22 @@ public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept Optional.of( new FilteredResult( expected.get(), Collections.emptyList(), TABLE_METADATA, false))); - verify(storage).get(get); + verify(storage).get(getForStorage); verify(snapshot).put(key, Optional.of((TransactionResult) expected.get())); + verify(snapshot).put(get, Optional.of((TransactionResult) expected.get())); } @Test public void - get_KeyNotExistsInSnapshotAndRecordInStorageNotCommitted_ShouldThrowUncommittedRecordException() - throws CrudException, ExecutionException { + get_GetNotExistsInSnapshotAndRecordInStorageNotCommitted_ShouldThrowUncommittedRecordException() + throws ExecutionException { // Arrange Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); result = prepareResult(TransactionState.PREPARED); Optional expected = Optional.of(result); - when(snapshot.get(new Snapshot.Key(get))).thenReturn(Optional.empty()); - when(storage.get(get)).thenReturn(expected); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(expected); // Act Assert assertThatThrownBy(() -> handler.get(get)) @@ -209,12 +235,13 @@ public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept } @Test - public void get_KeyNeitherExistsInSnapshotNorInStorage_ShouldReturnEmpty() + public void get_GetNotExistsInSnapshotAndRecordNotExistsInStorage_ShouldReturnEmpty() throws CrudException, ExecutionException { // Arrange Get get = prepareGet(); - when(snapshot.get(new Snapshot.Key(get))).thenReturn(Optional.empty()); - when(storage.get(get)).thenReturn(Optional.empty()); + Get getForStorage = toGetForStorageFrom(get); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(Optional.empty()); // Act Optional result = handler.get(get); @@ -224,37 +251,104 @@ public void get_KeyNeitherExistsInSnapshotNorInStorage_ShouldReturnEmpty() } @Test - public void get_KeyNotContainsInReadSetAndExceptionThrownInStorage_ShouldThrowCrudException() + public void get_GetNotExistsInSnapshotAndExceptionThrownInStorage_ShouldThrowCrudException() throws ExecutionException { // Arrange Get get = prepareGet(); - when(snapshot.containsKeyInReadSet(new Snapshot.Key(get))).thenReturn(false); + Get getForStorage = toGetForStorageFrom(get); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); ExecutionException toThrow = mock(ExecutionException.class); - when(storage.get(get)).thenThrow(toThrow); + when(storage.get(getForStorage)).thenThrow(toThrow); // Act Assert assertThatThrownBy(() -> handler.get(get)).isInstanceOf(CrudException.class).hasCause(toThrow); } + @Test + public void get_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() + throws ExecutionException, CrudException { + // Arrange + Get originalGet = prepareGet(); + Get getForStorage = toGetForStorageFrom(originalGet); + Get get1 = prepareGet(); + Get get2 = prepareGet(); + Result result = prepareResult(TransactionState.COMMITTED); + Optional expected = Optional.of(new TransactionResult(result)); + doNothing() + .when(snapshot) + .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false).thenReturn(true); + when(snapshot.get(getForStorage)).thenReturn(expected).thenReturn(expected); + when(snapshot.mergeResult(key, expected)).thenReturn(expected).thenReturn(expected); + when(storage.get(getForStorage)).thenReturn(Optional.of(result)); + + // Act + Optional results1 = handler.get(get1); + Optional results2 = handler.get(get2); + + // Assert + verify(snapshot).put(key, expected); + assertThat(results1) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + assertThat(results1).isEqualTo(results2); + verify(storage, never()).get(originalGet); + verify(storage).get(getForStorage); + } + + @Test + public void get_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot() + throws ExecutionException, CrudException { + // Arrange + Get originalGet = prepareGet(); + Get getForStorage = toGetForStorageFrom(originalGet); + Get get1 = prepareGet(); + Get get2 = prepareGet(); + Result result = prepareResult(TransactionState.COMMITTED); + Optional expected = Optional.of(new TransactionResult(result)); + snapshot = + new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, null, tableMetadataManager, parallelExecutor); + handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + when(storage.get(getForStorage)).thenReturn(Optional.of(result)); + + // Act + Optional results1 = handler.get(get1); + Optional results2 = handler.get(get2); + + // Assert + assertThat(results1) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + assertThat(results1).isEqualTo(results2); + verify(storage, never()).get(originalGet); + verify(storage).get(getForStorage); + } + @Test public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() throws ExecutionException, CrudException { // Arrange Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); Snapshot.Key key = new Snapshot.Key(scan, result); - when(snapshot.get(key)).thenReturn(Optional.of((TransactionResult) result)); + TransactionResult expected = new TransactionResult(result); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); + when(storage.scan(scanForStorage)).thenReturn(scanner); + when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(expected)); // Act List results = handler.scan(scan); // Assert - TransactionResult expected = new TransactionResult(result); verify(snapshot).put(key, Optional.of(expected)); verify(snapshot).verify(scan); assertThat(results.size()).isEqualTo(1); @@ -268,9 +362,10 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() throws ExecutionException { // Arrange Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.PREPARED); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); + when(storage.scan(scanForStorage)).thenReturn(scanner); // Act Assert assertThatThrownBy(() -> handler.scan(scan)) @@ -291,99 +386,119 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() public void scan_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() throws ExecutionException, CrudException { // Arrange - Scan scan = prepareScan(); + Scan originalScan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(originalScan); + Scan scan1 = prepareScan(); + Scan scan2 = prepareScan(); result = prepareResult(TransactionState.COMMITTED); + TransactionResult expected = new TransactionResult(result); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); - Snapshot.Key key = new Snapshot.Key(scan, result); - when(snapshot.get(scan)) + when(storage.scan(scanForStorage)).thenReturn(scanner); + Snapshot.Key key = new Snapshot.Key(scanForStorage, result); + when(snapshot.get(scanForStorage)) .thenReturn(Optional.empty()) - .thenReturn(Optional.of(Collections.singletonList(key))); - when(snapshot.containsKeyInReadSet(key)).thenReturn(false).thenReturn(true); - when(snapshot.get(key)).thenReturn(Optional.of((TransactionResult) result)); + .thenReturn(Optional.of(Collections.singletonMap(key, expected))); + when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(expected)); // Act - List results1 = handler.scan(scan); - List results2 = handler.scan(scan); + List results1 = handler.scan(scan1); + List results2 = handler.scan(scan2); // Assert - TransactionResult expected = new TransactionResult(result); verify(snapshot).put(key, Optional.of(expected)); assertThat(results1.size()).isEqualTo(1); assertThat(results1.get(0)) .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); assertThat(results1).isEqualTo(results2); + verify(storage, never()).scan(originalScan); + verify(storage).scan(scanForStorage); } @Test public void scan_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot() throws ExecutionException, CrudException { // Arrange - Scan scan = prepareScan(); + Scan originalScan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(originalScan); + Scan scan1 = prepareScan(); + Scan scan2 = prepareScan(); result = prepareResult(TransactionState.COMMITTED); + TransactionResult expected = new TransactionResult(result); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, null, tableMetadataManager, parallelExecutor); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); + when(storage.scan(scanForStorage)).thenReturn(scanner); // Act - List results1 = handler.scan(scan); - List results2 = handler.scan(scan); + List results1 = handler.scan(scan1); + List results2 = handler.scan(scan2); // Assert - TransactionResult expected = new TransactionResult(result); assertThat(results1.size()).isEqualTo(1); assertThat(results1.get(0)) .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); assertThat(results1).isEqualTo(results2); + verify(storage, never()).scan(originalScan); + verify(storage).scan(scanForStorage); } @Test - public void scan_GetCalledAfterScan_ShouldReturnFromSnapshot() + public void scan_GetCalledAfterScan_ShouldReturnFromStorage() throws ExecutionException, CrudException { // Arrange Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); - Snapshot.Key key = new Snapshot.Key(scan, result); - when(snapshot.get(scan)).thenReturn(Optional.empty()); - when(snapshot.containsKeyInReadSet(key)).thenReturn(false).thenReturn(true); - when(snapshot.get(key)).thenReturn(Optional.of((TransactionResult) result)); + when(storage.scan(scanForStorage)).thenReturn(scanner); + + Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); + Optional transactionResult = Optional.of(new TransactionResult(result)); + when(storage.get(getForStorage)).thenReturn(Optional.of(result)); + when(snapshot.get(get)).thenReturn(transactionResult); + when(snapshot.mergeResult(any(), any())).thenReturn(transactionResult); + when(snapshot.mergeResult(new Snapshot.Key(get), transactionResult)) + .thenReturn(transactionResult); // Act List results = handler.scan(scan); - Optional result = handler.get(prepareGet()); + Optional result = handler.get(get); // Assert - verify(storage, never()).get(any(Get.class)); + verify(storage).scan(scanForStorage); + verify(storage).get(getForStorage); assertThat(results.get(0)).isEqualTo(result.get()); } @Test - public void scan_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromSnapshot() + public void scan_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorage() throws ExecutionException, CrudException { // Arrange - Scan scan = prepareScan(); + Scan scan = toScanForStorageFrom(prepareScan()); result = prepareResult(TransactionState.COMMITTED); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, null, tableMetadataManager, parallelExecutor); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(scan)).thenReturn(scanner); + Get get = toGetForStorageFrom(prepareGet()); + when(storage.get(get)).thenReturn(Optional.of(result)); // Act List results = handler.scan(scan); - Optional result = handler.get(prepareGet()); + Optional result = handler.get(get); // Assert + verify(storage).scan(scan); + verify(storage).get(get); assertThat(results.get(0)).isEqualTo(result.get()); } @@ -392,6 +507,7 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe throws ExecutionException, CrudException { // Arrange Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); ImmutableMap> columns = @@ -422,12 +538,13 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe tableMetadataManager, parallelExecutor, readSet, + new ConcurrentHashMap<>(), new HashMap<>(), new HashMap<>(), deleteSet); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); when(scanner.iterator()).thenReturn(Arrays.asList(result, result2).iterator()); - when(storage.scan(scan)).thenReturn(scanner); + when(storage.scan(scanForStorage)).thenReturn(scanner); Delete delete = new Delete(new Key(ANY_NAME_1, ANY_TEXT_1), new Key(ANY_NAME_2, ANY_TEXT_3)) @@ -459,29 +576,30 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe @Test public void - scan_CrossPartitionScanndResultFromStorageGiven_ShouldUpdateSnapshotAndValidateThenReturn() + scan_CrossPartitionScanAndResultFromStorageGiven_ShouldUpdateSnapshotAndValidateThenReturn() throws ExecutionException, CrudException { // Arrange Scan scan = prepareCrossPartitionScan(); result = prepareResult(TransactionState.COMMITTED); Snapshot.Key key = new Snapshot.Key(scan, result); - when(snapshot.get(key)).thenReturn(Optional.of((TransactionResult) result)); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(any(ScanAll.class))).thenReturn(scanner); + TransactionResult transactionResult = new TransactionResult(result); + when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(transactionResult)); // Act List results = handler.scan(scan); // Assert - TransactionResult expected = new TransactionResult(result); - verify(snapshot).put(key, Optional.of(expected)); + verify(snapshot).put(key, Optional.of(transactionResult)); verify(snapshot).verify(scan); assertThat(results.size()).isEqualTo(1); assertThat(results.get(0)) - .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); + .isEqualTo( + new FilteredResult(transactionResult, Collections.emptyList(), TABLE_METADATA, false)); } @Test @@ -560,7 +678,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C spied.put(put); // Assert - verify(spied).readUnread(key, getForKey); + verify(spied, never()).readUnread(key, getForKey); verify(snapshot).getFromReadSet(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result); verify(snapshot).put(key, put); @@ -586,11 +704,12 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C when(snapshot.getFromReadSet(any())).thenReturn(Optional.of(result)); Get getForKey = - Get.newBuilder() - .namespace(key.getNamespace()) - .table(key.getTable()) - .partitionKey(key.getPartitionKey()) - .build(); + toGetForStorageFrom( + Get.newBuilder() + .namespace(key.getNamespace()) + .table(key.getTable()) + .partitionKey(key.getPartitionKey()) + .build()); CrudHandler spied = spy(handler); doReturn(Optional.empty()).when(spied).getFromStorage(getForKey); @@ -599,7 +718,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C spied.put(put); // Assert - verify(spied).readUnread(key, getForKey); + verify(spied).read(key, getForKey); verify(snapshot).getFromReadSet(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result); verify(snapshot).put(key, put); @@ -713,7 +832,7 @@ public void delete_DeleteWithConditionGiven_WithResultInReadSet_ShouldCallApprop spied.delete(delete); // Assert - verify(spied).readUnread(key, getForKey); + verify(spied, never()).readUnread(key, getForKey); verify(snapshot).getFromReadSet(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, result); verify(snapshot).put(key, delete); @@ -735,11 +854,12 @@ public void delete_DeleteWithConditionGiven_WithoutResultInReadSet_ShouldCallApp when(snapshot.getFromReadSet(any())).thenReturn(Optional.empty()); Get getForKey = - Get.newBuilder() - .namespace(key.getNamespace()) - .table(key.getTable()) - .partitionKey(key.getPartitionKey()) - .build(); + toGetForStorageFrom( + Get.newBuilder() + .namespace(key.getNamespace()) + .table(key.getTable()) + .partitionKey(key.getPartitionKey()) + .build()); CrudHandler spied = spy(handler); doReturn(Optional.empty()).when(spied).getFromStorage(getForKey); @@ -748,7 +868,7 @@ public void delete_DeleteWithConditionGiven_WithoutResultInReadSet_ShouldCallApp spied.delete(delete); // Assert - verify(spied).readUnread(key, getForKey); + verify(spied).read(key, getForKey); verify(snapshot).getFromReadSet(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, null); verify(snapshot).put(key, delete); @@ -756,50 +876,46 @@ public void delete_DeleteWithConditionGiven_WithoutResultInReadSet_ShouldCallApp @SuppressWarnings("unchecked") @Test - public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods() + public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() throws CrudException, ExecutionException { // Arrange Snapshot.Key key = mock(Snapshot.Key.class); when(key.getNamespace()).thenReturn(ANY_NAMESPACE_NAME); when(key.getTable()).thenReturn(ANY_TABLE_NAME); when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); - - when(snapshot.containsKeyInReadSet(key)).thenReturn(true); - Get getForKey = Get.newBuilder() .namespace(key.getNamespace()) .table(key.getTable()) .partitionKey(key.getPartitionKey()) .build(); + when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(true); // Act handler.readUnread(key, getForKey); // Assert verify(storage, never()).get(any()); - verify(snapshot, never()).put(any(), any(Optional.class)); + verify(snapshot, never()).put(any(Get.class), any(Optional.class)); } @Test public void - readUnread_NotContainsKeyInReadSet_EmptyResultReturnedByStorage_ShouldCallAppropriateMethods() + readUnread_GetNotContainedInGetSet_EmptyResultReturnedByStorage_ShouldCallAppropriateMethods() throws CrudException, ExecutionException { // Arrange Snapshot.Key key = mock(Snapshot.Key.class); when(key.getNamespace()).thenReturn(ANY_NAMESPACE_NAME); when(key.getTable()).thenReturn(ANY_TABLE_NAME); when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); - - when(snapshot.containsKeyInReadSet(key)).thenReturn(false); - when(storage.get(any())).thenReturn(Optional.empty()); - Get getForKey = Get.newBuilder() .namespace(key.getNamespace()) .table(key.getTable()) .partitionKey(key.getPartitionKey()) .build(); + when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(false); + when(storage.get(any())).thenReturn(Optional.empty()); // Act handler.readUnread(key, getForKey); @@ -807,11 +923,12 @@ public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods() // Assert verify(storage).get(any()); verify(snapshot).put(key, Optional.empty()); + verify(snapshot).put(getForKey, Optional.empty()); } @Test public void - readUnread_NotContainsKeyInReadSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods() + readUnread_GetNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods() throws CrudException, ExecutionException { // Arrange Snapshot.Key key = mock(Snapshot.Key.class); @@ -819,8 +936,6 @@ public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods() when(key.getTable()).thenReturn(ANY_TABLE_NAME); when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); - when(snapshot.containsKeyInReadSet(key)).thenReturn(false); - Result result = mock(Result.class); when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); when(storage.get(any())).thenReturn(Optional.of(result)); @@ -831,6 +946,7 @@ public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods() .table(key.getTable()) .partitionKey(key.getPartitionKey()) .build(); + when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(false); // Act handler.readUnread(key, getForKey); @@ -842,7 +958,7 @@ public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods() @Test public void - readUnread_NotContainsKeyInReadSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException() + readUnread_GetNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException() throws ExecutionException { // Arrange Snapshot.Key key = mock(Snapshot.Key.class); @@ -850,8 +966,6 @@ public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods() when(key.getTable()).thenReturn(ANY_TABLE_NAME); when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); - when(snapshot.containsKeyInReadSet(key)).thenReturn(false); - Result result = mock(Result.class); when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get()); when(storage.get(any())).thenReturn(Optional.of(result)); @@ -862,6 +976,7 @@ public void readUnread_ContainsKeyInReadSet_ShouldCallAppropriateMethods() .table(key.getTable()) .partitionKey(key.getPartitionKey()) .build(); + when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(false); // Act Assert assertThatThrownBy(() -> handler.readUnread(key, getForKey)) diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java index b69244596..d1e798842 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java @@ -46,7 +46,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -94,7 +93,8 @@ public class SnapshotTest { private Snapshot snapshot; private ConcurrentMap> readSet; - private Map> scanSet; + private ConcurrentMap> getSet; + private Map> scanSet; private Map writeSet; private Map deleteSet; @@ -121,6 +121,7 @@ private Snapshot prepareSnapshot(Isolation isolation) { private Snapshot prepareSnapshot(Isolation isolation, SerializableStrategy strategy) { readSet = new ConcurrentHashMap<>(); + getSet = new ConcurrentHashMap<>(); scanSet = new HashMap<>(); writeSet = new HashMap<>(); deleteSet = new HashMap<>(); @@ -133,6 +134,7 @@ private Snapshot prepareSnapshot(Isolation isolation, SerializableStrategy strat tableMetadataManager, new ParallelExecutor(config), readSet, + getSet, scanSet, writeSet, deleteSet)); @@ -251,6 +253,17 @@ private Put preparePutWithIntColumns() { .build(); } + private Put preparePutForMergeTest() { + return Put.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .textValue(ANY_NAME_3, ANY_TEXT_5) + .textValue(ANY_NAME_4, null) + .build(); + } + private Delete prepareDelete() { Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); @@ -372,8 +385,9 @@ public void put_ScanGiven_ShouldHoldWhatsGivenInScanSet() { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Scan scan = prepareScan(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - List expected = Collections.singletonList(key); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + Map expected = Collections.singletonMap(key, result); // Act snapshot.put(scan, expected); @@ -383,7 +397,7 @@ public void put_ScanGiven_ShouldHoldWhatsGivenInScanSet() { } @Test - public void get_KeyGivenContainedInWriteSetAndReadSet_ShouldReturnMergedResult() + public void mergeResult_KeyGivenContainedInWriteSet_ShouldReturnMergedResult() throws CrudException { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); @@ -402,11 +416,47 @@ public void get_KeyGivenContainedInWriteSetAndReadSet_ShouldReturnMergedResult() snapshot.put(key, put); // Act - Optional actual = snapshot.get(key); + Optional actual = snapshot.mergeResult(key, Optional.of(result)); // Assert assertThat(actual).isPresent(); - assertThat(actual.get().getValues()) + assertMergedResultIsEqualTo(actual.get()); + } + + @Test + public void mergeResult_KeyGivenContainedInDeleteSet_ShouldReturnEmpty() throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Delete delete = prepareDelete(); + Snapshot.Key key = new Snapshot.Key(delete); + snapshot.put(key, delete); + TransactionResult result = prepareResult(ANY_ID); + + // Act + Optional actual = snapshot.mergeResult(key, Optional.of(result)); + + // Assert + assertThat(actual).isNotPresent(); + } + + @Test + public void + mergeResult_KeyGivenNeitherContainedInDeleteSetNorWriteSet_ShouldReturnOriginalResult() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Snapshot.Key key = new Snapshot.Key(prepareGet()); + TransactionResult result = prepareResult(ANY_ID); + + // Act + Optional actual = snapshot.mergeResult(key, Optional.of(result)); + + // Assert + assertThat(actual).isEqualTo(Optional.of(result)); + } + + private void assertMergedResultIsEqualTo(TransactionResult result) { + assertThat(result.getValues()) .isEqualTo( ImmutableMap.>builder() .put(ANY_NAME_1, new TextValue(ANY_NAME_1, ANY_TEXT_1)) @@ -416,25 +466,22 @@ public void get_KeyGivenContainedInWriteSetAndReadSet_ShouldReturnMergedResult() .put(Attribute.ID, Attribute.toIdValue(ANY_ID)) .put(Attribute.VERSION, Attribute.toVersionValue(ANY_VERSION)) .build()); - assertThat(actual.get().getValue(ANY_NAME_1).isPresent()).isTrue(); - assertThat(actual.get().getValue(ANY_NAME_1).get()) - .isEqualTo(new TextValue(ANY_NAME_1, ANY_TEXT_1)); - assertThat(actual.get().getValue(ANY_NAME_2).isPresent()).isTrue(); - assertThat(actual.get().getValue(ANY_NAME_2).get()) - .isEqualTo(new TextValue(ANY_NAME_2, ANY_TEXT_2)); - assertThat(actual.get().getValue(ANY_NAME_3).isPresent()).isTrue(); - assertThat(actual.get().getValue(ANY_NAME_3).get()) - .isEqualTo(new TextValue(ANY_NAME_3, ANY_TEXT_5)); - assertThat(actual.get().getValue(ANY_NAME_4).isPresent()).isTrue(); - assertThat(actual.get().getValue(ANY_NAME_4).get()) + assertThat(result.getValue(ANY_NAME_1).isPresent()).isTrue(); + assertThat(result.getValue(ANY_NAME_1).get()).isEqualTo(new TextValue(ANY_NAME_1, ANY_TEXT_1)); + assertThat(result.getValue(ANY_NAME_2).isPresent()).isTrue(); + assertThat(result.getValue(ANY_NAME_2).get()).isEqualTo(new TextValue(ANY_NAME_2, ANY_TEXT_2)); + assertThat(result.getValue(ANY_NAME_3).isPresent()).isTrue(); + assertThat(result.getValue(ANY_NAME_3).get()).isEqualTo(new TextValue(ANY_NAME_3, ANY_TEXT_5)); + assertThat(result.getValue(ANY_NAME_4).isPresent()).isTrue(); + assertThat(result.getValue(ANY_NAME_4).get()) .isEqualTo(new TextValue(ANY_NAME_4, (String) null)); - assertThat(actual.get().getValue(Attribute.ID).isPresent()).isTrue(); - assertThat(actual.get().getValue(Attribute.ID).get()).isEqualTo(Attribute.toIdValue(ANY_ID)); - assertThat(actual.get().getValue(Attribute.VERSION).isPresent()).isTrue(); - assertThat(actual.get().getValue(Attribute.VERSION).get()) + assertThat(result.getValue(Attribute.ID).isPresent()).isTrue(); + assertThat(result.getValue(Attribute.ID).get()).isEqualTo(Attribute.toIdValue(ANY_ID)); + assertThat(result.getValue(Attribute.VERSION).isPresent()).isTrue(); + assertThat(result.getValue(Attribute.VERSION).get()) .isEqualTo(Attribute.toVersionValue(ANY_VERSION)); - assertThat(actual.get().getContainedColumnNames()) + assertThat(result.getContainedColumnNames()) .isEqualTo( new HashSet<>( Arrays.asList( @@ -445,105 +492,35 @@ public void get_KeyGivenContainedInWriteSetAndReadSet_ShouldReturnMergedResult() Attribute.ID, Attribute.VERSION))); - assertThat(actual.get().contains(ANY_NAME_1)).isTrue(); - assertThat(actual.get().isNull(ANY_NAME_1)).isFalse(); - assertThat(actual.get().getText(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); - assertThat(actual.get().getAsObject(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); - - assertThat(actual.get().contains(ANY_NAME_2)).isTrue(); - assertThat(actual.get().isNull(ANY_NAME_2)).isFalse(); - assertThat(actual.get().getText(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); - assertThat(actual.get().getAsObject(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); - - assertThat(actual.get().contains(ANY_NAME_3)).isTrue(); - assertThat(actual.get().isNull(ANY_NAME_3)).isFalse(); - assertThat(actual.get().getText(ANY_NAME_3)).isEqualTo(ANY_TEXT_5); - assertThat(actual.get().getAsObject(ANY_NAME_3)).isEqualTo(ANY_TEXT_5); - - assertThat(actual.get().contains(ANY_NAME_4)).isTrue(); - assertThat(actual.get().isNull(ANY_NAME_4)).isTrue(); - assertThat(actual.get().getText(ANY_NAME_4)).isNull(); - assertThat(actual.get().getAsObject(ANY_NAME_4)).isNull(); - - assertThat(actual.get().contains(Attribute.ID)).isTrue(); - assertThat(actual.get().isNull(Attribute.ID)).isFalse(); - assertThat(actual.get().getText(Attribute.ID)).isEqualTo(ANY_ID); - assertThat(actual.get().getAsObject(Attribute.ID)).isEqualTo(ANY_ID); - - assertThat(actual.get().contains(Attribute.VERSION)).isTrue(); - assertThat(actual.get().isNull(Attribute.VERSION)).isFalse(); - assertThat(actual.get().getInt(Attribute.VERSION)).isEqualTo(ANY_VERSION); - assertThat(actual.get().getAsObject(Attribute.VERSION)).isEqualTo(ANY_VERSION); - } - - @Test - public void get_KeyGivenContainedInReadSet_ShouldReturnFromReadSet() throws CrudException { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Snapshot.Key key = new Snapshot.Key(prepareGet()); - TransactionResult result = prepareResult(ANY_ID); - snapshot.put(key, Optional.of(result)); - - // Act - Optional actual = snapshot.get(key); - - // Assert - assertThat(actual).isEqualTo(Optional.of(result)); - } - - @Test - public void get_KeyGivenNotContainedInSnapshot_ShouldThrowIllegalArgumentException() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Snapshot.Key key = new Snapshot.Key(prepareGet()); - - // Act Assert - assertThatThrownBy(() -> snapshot.get(key)).isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void get_KeyGivenContainedInWriteSet_ShouldThrowIllegalArgumentException() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Put put = preparePut(); - Snapshot.Key key = new Snapshot.Key(put); - snapshot.put(key, put); - - // Act Assert - assertThatThrownBy(() -> snapshot.get(key)).isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void get_KeyGivenContainedInDeleteSet_ShouldReturnEmpty() throws CrudException { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Delete delete = prepareDelete(); - Snapshot.Key key = new Snapshot.Key(delete); - snapshot.put(key, delete); + assertThat(result.contains(ANY_NAME_1)).isTrue(); + assertThat(result.isNull(ANY_NAME_1)).isFalse(); + assertThat(result.getText(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); + assertThat(result.getAsObject(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); - // Act - Optional actual = snapshot.get(key); - - // Assert - assertThat(actual).isNotPresent(); - } + assertThat(result.contains(ANY_NAME_2)).isTrue(); + assertThat(result.isNull(ANY_NAME_2)).isFalse(); + assertThat(result.getText(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); + assertThat(result.getAsObject(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); - @Test - public void get_KeyGivenContainedInReadSetAndDeleteSet_ShouldReturnEmpty() throws CrudException { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Snapshot.Key key = new Snapshot.Key(prepareGet()); - TransactionResult result = prepareResult(ANY_ID); - snapshot.put(key, Optional.of(result)); + assertThat(result.contains(ANY_NAME_3)).isTrue(); + assertThat(result.isNull(ANY_NAME_3)).isFalse(); + assertThat(result.getText(ANY_NAME_3)).isEqualTo(ANY_TEXT_5); + assertThat(result.getAsObject(ANY_NAME_3)).isEqualTo(ANY_TEXT_5); - Delete delete = prepareDelete(); - snapshot.put(key, delete); + assertThat(result.contains(ANY_NAME_4)).isTrue(); + assertThat(result.isNull(ANY_NAME_4)).isTrue(); + assertThat(result.getText(ANY_NAME_4)).isNull(); + assertThat(result.getAsObject(ANY_NAME_4)).isNull(); - // Act - Optional actual = snapshot.get(key); + assertThat(result.contains(Attribute.ID)).isTrue(); + assertThat(result.isNull(Attribute.ID)).isFalse(); + assertThat(result.getText(Attribute.ID)).isEqualTo(ANY_ID); + assertThat(result.getAsObject(Attribute.ID)).isEqualTo(ANY_ID); - // Assert - assertThat(actual).isNotPresent(); + assertThat(result.contains(Attribute.VERSION)).isTrue(); + assertThat(result.isNull(Attribute.VERSION)).isFalse(); + assertThat(result.getInt(Attribute.VERSION)).isEqualTo(ANY_VERSION); + assertThat(result.getAsObject(Attribute.VERSION)).isEqualTo(ANY_VERSION); } @Test @@ -553,10 +530,10 @@ public void get_ScanNotContainedInSnapshotGiven_ShouldReturnEmptyList() { Scan scan = prepareScan(); // Act - Optional> keys = snapshot.get(scan); + Optional> results = snapshot.get(scan); // Assert - assertThat(keys.isPresent()).isFalse(); + assertThat(results.isPresent()).isFalse(); } @Test @@ -707,6 +684,7 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea TransactionResult result = prepareResult(ANY_ID); TransactionResult txResult = new TransactionResult(result); snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.put(get, Optional.of(txResult)); snapshot.put(new Snapshot.Key(put), put); // Act Assert @@ -733,6 +711,7 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea Get get = prepareAnotherGet(); Put put = preparePut(); snapshot.put(new Snapshot.Key(get), Optional.empty()); + snapshot.put(get, Optional.empty()); snapshot.put(new Snapshot.Key(put), put); // Act Assert @@ -754,9 +733,11 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_WRITE); Scan scan = prepareScan(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); + TransactionResult txResult = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, txResult); Put put = preparePut(); - snapshot.put(scan, Collections.singletonList(key)); + snapshot.put(key, Optional.of(txResult)); + snapshot.put(scan, Collections.singletonMap(key, txResult)); snapshot.put(new Snapshot.Key(put), put); // Act Assert @@ -776,6 +757,7 @@ public void toSerializableWithExtraRead_ReadSetNotChanged_ShouldProcessWithoutEx TransactionResult result = prepareResult(ANY_ID); TransactionResult txResult = new TransactionResult(result); snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.put(get, Optional.of(txResult)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = @@ -798,6 +780,7 @@ public void toSerializableWithExtraRead_ReadSetUpdated_ShouldThrowValidationConf Put put = preparePut(); TransactionResult txResult = prepareResult(ANY_ID); snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.put(get, Optional.of(txResult)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult changedTxResult = prepareResult(ANY_ID + "x"); @@ -821,6 +804,7 @@ public void toSerializableWithExtraRead_ReadSetExtended_ShouldThrowValidationCon Get get = prepareAnotherGet(); Put put = preparePut(); snapshot.put(new Snapshot.Key(get), Optional.empty()); + snapshot.put(get, Optional.empty()); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult txResult = prepareResult(ANY_ID); @@ -846,7 +830,7 @@ public void toSerializableWithExtraRead_ScanSetNotChanged_ShouldProcessWithoutEx TransactionResult txResult = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, txResult); snapshot.put(key, Optional.of(txResult)); - snapshot.put(scan, Collections.singletonList(key)); + snapshot.put(scan, Collections.singletonMap(key, txResult)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Scanner scanner = mock(Scanner.class); @@ -874,7 +858,7 @@ public void toSerializableWithExtraRead_ScanSetUpdated_ShouldThrowValidationConf TransactionResult txResult = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, txResult); snapshot.put(key, Optional.of(txResult)); - snapshot.put(scan, Collections.singletonList(key)); + snapshot.put(scan, Collections.singletonMap(key, txResult)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult changedTxResult = prepareResult(ANY_ID + "x"); @@ -903,7 +887,7 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Scan scan = prepareScan(); Put put = preparePut(); TransactionResult result = prepareResult(ANY_ID + "x"); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult txResult = new TransactionResult(result); @@ -974,8 +958,8 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Snapshot.Key key1 = new Snapshot.Key(scan1, result1); Snapshot.Key key2 = new Snapshot.Key(scan2, result2); - snapshot.put(scan1, Collections.singletonList(key1)); - snapshot.put(scan2, Collections.singletonList(key2)); + snapshot.put(scan1, Collections.singletonMap(key1, new TransactionResult(result1))); + snapshot.put(scan2, Collections.singletonMap(key2, new TransactionResult(result2))); snapshot.put(key1, Optional.of(new TransactionResult(result1))); snapshot.put(key2, Optional.of(new TransactionResult(result2))); @@ -1020,6 +1004,7 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon TransactionResult result = prepareResultWithNullMetadata(); TransactionResult txResult = new TransactionResult(result); snapshot.put(new Snapshot.Key(get), Optional.of(result)); + snapshot.put(get, Optional.of(result)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = @@ -1044,6 +1029,7 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon TransactionResult result = prepareResultWithNullMetadata(); TransactionResult changedResult = prepareResult(ANY_ID); snapshot.put(new Snapshot.Key(get), Optional.of(result)); + snapshot.put(get, Optional.of(result)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = @@ -1068,7 +1054,7 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); snapshot.put(key, Optional.of(result)); - snapshot.put(scan, Collections.singletonList(key)); + snapshot.put(scan, Collections.singletonMap(key, result)); DistributedStorage storage = mock(DistributedStorage.class); Scan scanWithProjections = Scan.newBuilder(scan) @@ -1163,6 +1149,27 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc assertThat(thrown).doesNotThrowAnyException(); } + @Test + public void + verify_ScanGivenAndPutKeyAlreadyPresentInScanSet_ShouldThrowIllegalArgumentException() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePut(); + Snapshot.Key putKey = new Snapshot.Key(put); + snapshot.put(putKey, put); + Scan scan = prepareScan(); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(key, Optional.of(result)); + snapshot.put(scan, Collections.singletonMap(key, result)); + + // Act Assert + Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); + + // Assert + assertThat(thrown).isInstanceOf(IllegalArgumentException.class); + } + @Test public void verify_ScanGivenAndPutWithSamePartitionKeyWithoutClusteringKeyInWriteSet_ShouldThrowIllegalArgumentException() { @@ -1172,6 +1179,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = prepareScan(); + snapshot.put(scan, Collections.emptyMap()); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1195,6 +1203,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); + snapshot.put(scan, Collections.emptyMap()); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1237,6 +1246,11 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc // ["text1", "text2") .withStart(new Key(ANY_NAME_2, ANY_TEXT_1), true) .withEnd(new Key(ANY_NAME_2, ANY_TEXT_2), false); + snapshot.put(scan1, Collections.emptyMap()); + snapshot.put(scan2, Collections.emptyMap()); + snapshot.put(scan3, Collections.emptyMap()); + snapshot.put(scan4, Collections.emptyMap()); + snapshot.put(scan5, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1283,6 +1297,9 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); + snapshot.put(scan1, Collections.emptyMap()); + snapshot.put(scan2, Collections.emptyMap()); + snapshot.put(scan3, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1325,6 +1342,9 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); + snapshot.put(scan1, Collections.emptyMap()); + snapshot.put(scan2, Collections.emptyMap()); + snapshot.put(scan3, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1350,8 +1370,9 @@ public void verify_ScanWithIndexGivenAndPutInWriteSetInSameTable_ShouldThrowExce .table(ANY_TABLE_NAME) .indexKey(Key.ofText(ANY_NAME_4, ANY_TEXT_4)) .build(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1379,8 +1400,9 @@ public void verify_ScanWithIndexGivenAndPutInWriteSetInDifferentTable_ShouldNotT .table(ANY_TABLE_NAME) .indexKey(Key.ofText(ANY_NAME_4, ANY_TEXT_4)) .build(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1419,8 +1441,9 @@ public void verify_ScanWithIndexAndPutWithSameIndexKeyGiven_ShouldThrowException .table(ANY_TABLE_NAME) .indexKey(Key.ofText(ANY_NAME_4, ANY_TEXT_4)) .build(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1442,8 +1465,9 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); - Snapshot.Key key = new Snapshot.Key(scanAll, prepareResult(ANY_ID)); - snapshot.put(scanAll, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scanAll, result); + snapshot.put(scanAll, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scanAll)); @@ -1466,8 +1490,9 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME_2) .forTable(ANY_TABLE_NAME_2); - Snapshot.Key key = new Snapshot.Key(scanAll, prepareResult(ANY_ID)); - snapshot.put(scanAll, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scanAll, result); + snapshot.put(scanAll, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scanAll)); @@ -1476,6 +1501,22 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( assertThat(thrown).doesNotThrowAnyException(); } + @Test + public void get_GetGivenAndAlreadyPresentInGetSet_ShouldReturnResult() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Get get = prepareGet(); + TransactionResult expected = prepareResult(ANY_ID); + snapshot.put(get, Optional.of(expected)); + + // Act + Optional actual = snapshot.get(get); + + // Assert + assertThat(actual).isPresent(); + assertThat(actual.get()).isEqualTo(expected); + } + @Test public void get_ScanAllGivenAndAlreadyPresentInScanSet_ShouldReturnKeys() { // Arrange @@ -1490,15 +1531,17 @@ public void get_ScanAllGivenAndAlreadyPresentInScanSet_ShouldReturnKeys() { .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME_2) .forTable(ANY_TABLE_NAME_2); - Snapshot.Key aKey = mock(Snapshot.Key.class); - snapshot.put(scanAll, Collections.singletonList(aKey)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scanAll, result); + snapshot.put(scanAll, Collections.singletonMap(key, result)); // Act Assert - Optional> keys = snapshot.get(scanAll); + Optional> results = snapshot.get(scanAll); // Assert - assertThat(keys).isNotEmpty(); - assertThat(keys.get()).containsExactly(aKey); + assertThat(results).isNotEmpty(); + assertThat(results.get()).containsKey(key); + assertThat(results.get().get(key)).isEqualTo(result); } @Test @@ -1509,8 +1552,9 @@ public void verify_CrossPartitionScanGivenAndPutInSameTable_ShouldThrowException Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = prepareCrossPartitionScan(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1527,8 +1571,9 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentNamespace_ShouldNotTh Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = prepareCrossPartitionScan(ANY_NAMESPACE_NAME_2, ANY_TABLE_NAME); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1545,8 +1590,9 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = prepareCrossPartitionScan(ANY_NAMESPACE_NAME, ANY_TABLE_NAME_2); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1580,7 +1626,7 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE ConditionBuilder.column(ANY_NAME_8).isNullInt())) .build()) .build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1603,7 +1649,7 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE .where(ConditionBuilder.column(ANY_NAME_3).isEqualToText(ANY_TEXT_1)) .or(ConditionBuilder.column(ANY_NAME_4).isEqualToText(ANY_TEXT_4)) .build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1626,7 +1672,7 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE .where(ConditionBuilder.column(ANY_NAME_3).isLikeText("text%")) .and(ConditionBuilder.column(ANY_NAME_4).isNotLikeText("text")) .build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1649,7 +1695,7 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE .where(ConditionBuilder.column(ANY_NAME_4).isEqualToText(ANY_TEXT_1)) .or(ConditionBuilder.column(ANY_NAME_5).isEqualToText(ANY_TEXT_1)) .build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1667,7 +1713,7 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = Scan.newBuilder(prepareCrossPartitionScan()).clearConditions().build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java index b29d1b2c8..625da762d 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java @@ -1191,7 +1191,7 @@ public void get_GetGivenForDeletedWhenCoordinatorStateNotExistAndExpired_ShouldA } @Test - public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() + public void getThenScanAndGet_CommitHappenedInBetween_OnlyGetShouldReadRepeatably() throws TransactionException, ExecutionException { // Arrange populateRecordsWithNullMetadata(namespace1, TABLE_1); @@ -1211,7 +1211,8 @@ public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() // Assert assertThat(result1).isPresent(); - assertThat(result1.get()).isEqualTo(result2); + assertThat(result1.get()).isNotEqualTo(result2); + assertThat(result2.getInt(BALANCE)).isEqualTo(2); assertThat(result1).isEqualTo(result3); } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index ca69877af..374c1557a 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -1022,7 +1022,7 @@ public void get_GetGivenForDeletedWhenCoordinatorStateNotExistAndExpired_ShouldA } @Test - public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() + public void getThenScanAndGet_CommitHappenedInBetween_OnlyGetShouldReadRepeatably() throws TransactionException { // Arrange DistributedTransaction transaction = manager.begin(); @@ -1044,7 +1044,8 @@ public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() // Assert assertThat(result1).isPresent(); - assertThat(result1.get()).isEqualTo(result2); + assertThat(result1.get()).isNotEqualTo(result2); + assertThat(result2.getInt(BALANCE)).isEqualTo(2); assertThat(result1).isEqualTo(result3); } @@ -2491,6 +2492,8 @@ public void scan_DeleteGivenBefore_ShouldScan() throws TransactionException { // Assert assertThat(results.size()).isEqualTo(1); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1); } @Test @@ -2724,6 +2727,96 @@ public void scanAll_ScanAllGivenForPreparedWhenCoordinatorStateCommitted_ShouldR scanAll); } + @Test + public void scan_CalledTwice_ShouldReturnFromSnapshotInSecondTime() + throws TransactionException, ExecutionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.begin(); + Scan scan = prepareScan(0, 0, 0, namespace1, TABLE_1); + + // Act + List result1 = transaction.scan(scan); + List result2 = transaction.scan(scan); + transaction.commit(); + + // Assert + verify(storage).scan(any(Scan.class)); + assertThat(result1).isEqualTo(result2); + } + + @Test + public void scan_CalledTwiceWithSameConditionsAndDeleteHappenedInBetween_ShouldReadRepeatably() + throws TransactionException { + // Arrange + DistributedTransaction transaction = manager.begin(); + transaction.put(preparePut(0, 0, namespace1, TABLE_1)); + transaction.commit(); + + DistributedTransaction transaction1 = manager.begin(); + Scan scan = + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .start(Key.ofInt(ACCOUNT_TYPE, 0)) + .build(); + List result1 = transaction1.scan(scan); + + DistributedTransaction transaction2 = manager.begin(); + transaction2.get(prepareGet(0, 0, namespace1, TABLE_1)); + transaction2.delete(prepareDelete(0, 0, namespace1, TABLE_1)); + transaction2.commit(); + + // Act + List result2 = transaction1.scan(scan); + transaction1.commit(); + + // Assert + assertThat(result1.size()).isEqualTo(1); + assertThat(result2.size()).isEqualTo(1); + assertThat(result1.get(0)).isEqualTo(result2.get(0)); + } + + @Test + public void + scan_CalledTwiceWithDifferentConditionsAndInsertHappenedInBetween_ShouldNotReadRepeatably() + throws TransactionException { + // Arrange + DistributedTransaction transaction = manager.begin(); + transaction.put(preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1)); + transaction.commit(); + + DistributedTransaction transaction1 = manager.begin(); + Scan scan1 = + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .end(Key.ofInt(ACCOUNT_TYPE, 2)) + .build(); + Scan scan2 = + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .end(Key.ofInt(ACCOUNT_TYPE, 3)) + .build(); + List result1 = transaction1.scan(scan1); + + DistributedTransaction transaction2 = manager.begin(); + transaction2.put(preparePut(0, 1, namespace1, TABLE_1)); + transaction2.commit(); + + // Act + List result2 = transaction1.scan(scan2); + transaction1.commit(); + + // Assert + assertThat(result1.size()).isEqualTo(1); + assertThat(result2.size()).isEqualTo(2); + } + private DistributedTransaction prepareTransfer( int fromId, String fromNamespace, diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitSpecificIntegrationTestBase.java index 806668b35..76382991f 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitSpecificIntegrationTestBase.java @@ -1011,7 +1011,7 @@ public void get_GetGivenForDeletedWhenCoordinatorStateNotExistAndExpired_ShouldA } @Test - public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() + public void getThenScanAndGet_CommitHappenedInBetween_OnlyGetShouldReadRepeatably() throws TransactionException { // Arrange TwoPhaseCommitTransaction transaction = manager1.begin(); @@ -1036,7 +1036,8 @@ public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() // Assert assertThat(result1).isPresent(); - assertThat(result1.get()).isEqualTo(result2); + assertThat(result1.get()).isNotEqualTo(result2); + assertThat(result2.getInt(BALANCE)).isEqualTo(2); assertThat(result1).isEqualTo(result3); }