diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java index 9e336e18eb7b..af1123c118de 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java @@ -30,5 +30,6 @@ public enum ValueCaptureType { NEW_ROW, NEW_VALUES, OLD_AND_NEW_VALUES, + NEW_ROW_AND_OLD_VALUES, UNKNOWN } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java index 05ed0bbae6cc..a06fb074e637 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java @@ -223,6 +223,44 @@ public void testMappingUpdateStructRowNewValuesToDataChangeRecord() { mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); } + /* + * Change streams with NEW_ROW_AND_OLD_VALUES value capture type track both old values for + * modified columns and the whole new row. + */ + @Test + public void testMappingUpdateStructRowNewRowAndOldValuesToDataChangeRecord() { + final DataChangeRecord dataChangeRecord = + new DataChangeRecord( + "partitionToken", + Timestamp.ofTimeSecondsAndNanos(10L, 20), + "serverTransactionId", + true, + "1", + "tableName", + Arrays.asList( + new ColumnType("column1", new TypeCode("{\"code\":\"INT64\"}"), true, 1L), + new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)), + Collections.singletonList( + new Mod( + "{\"column1\":\"value1\"}", + "{\"column2\":\"oldValue2\"}", + "{\"column2\":\"newValue2\"}")), + ModType.UPDATE, + ValueCaptureType.NEW_ROW, + 10L, + 2L, + "transactionTag", + true, + null); + final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord); + ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + when(resultSet.getCurrentRowAsStruct()).thenReturn(jsonFieldsStruct); + + assertEquals( + Collections.singletonList(dataChangeRecord), + mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); + } + @Test public void testMappingInsertStructRowToDataChangeRecord() { final DataChangeRecord dataChangeRecord = @@ -316,6 +354,37 @@ public void testMappingInsertStructRowNewValuesToDataChangeRecord() { mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); } + @Test + public void testMappingInsertStructRowNewRowAndOldValuesToDataChangeRecord() { + final DataChangeRecord dataChangeRecord = + new DataChangeRecord( + "partitionToken", + Timestamp.ofTimeSecondsAndNanos(10L, 20), + "transactionId", + false, + "1", + "tableName", + Arrays.asList( + new ColumnType("column1", new TypeCode("{\"code\":\"INT64\"}"), true, 1L), + new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)), + Collections.singletonList( + new Mod("{\"column1\":\"value1\"}", null, "{\"column2\":\"newValue2\"}")), + ModType.INSERT, + ValueCaptureType.NEW_ROW_AND_OLD_VALUES, + 10L, + 2L, + "transactionTag", + true, + null); + final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord); + ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + when(resultSet.getCurrentRowAsStruct()).thenReturn(jsonFieldsStruct); + + assertEquals( + Collections.singletonList(dataChangeRecord), + mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); + } + @Test public void testMappingDeleteStructRowToDataChangeRecord() { final DataChangeRecord dataChangeRecord = @@ -407,6 +476,37 @@ public void testMappingDeleteStructRowNewValuesToDataChangeRecord() { mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); } + @Test + public void testMappingDeleteStructRowNewRowAndOldValuesToDataChangeRecord() { + final DataChangeRecord dataChangeRecord = + new DataChangeRecord( + "partitionToken", + Timestamp.ofTimeSecondsAndNanos(10L, 20), + "transactionId", + false, + "1", + "tableName", + Arrays.asList( + new ColumnType("column1", new TypeCode("{\"code\":\"INT64\"}"), true, 1L), + new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)), + Collections.singletonList( + new Mod("{\"column1\":\"value1\"}", "{\"column2\":\"oldValue2\"}", null)), + ModType.DELETE, + ValueCaptureType.NEW_ROW_AND_OLD_VALUES, + 10L, + 2L, + "transactionTag", + true, + null); + final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord); + ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + when(resultSet.getCurrentRowAsStruct()).thenReturn(jsonFieldsStruct); + + assertEquals( + Collections.singletonList(dataChangeRecord), + mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); + } + @Test public void testMappingStructRowWithUnknownModTypeAndValueCaptureTypeToDataChangeRecord() { final DataChangeRecord dataChangeRecord =