Skip to content

Commit

Permalink
Merge pull request apache#30012 Support spanner NEW_ROW_AND_OLD_VALUES.
Browse files Browse the repository at this point in the history
* A new value capture type enum value is added to the spannerIO connector to ensure it can recognize NEW_ROW_AND_OLD_VALUES as a new valid value capture type.

* Unit tests for NEW_ROW_AND_OLD_VALUES with ModType.UPDATE, ModType.Insert, and ModType.Delete are added in ChangeStreamRecordMapperTest.java to ensure the connector successfully recognizes the new value capture type.
  • Loading branch information
robertwb authored Jan 18, 2024
2 parents 8772389 + ba05f0f commit e019de8
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ public enum ValueCaptureType {
NEW_ROW,
NEW_VALUES,
OLD_AND_NEW_VALUES,
NEW_ROW_AND_OLD_VALUES,
UNKNOWN
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,44 @@ public void testMappingUpdateStructRowNewValuesToDataChangeRecord() {
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}

/*
* Change streams with NEW_ROW_AND_OLD_VALUES value capture type track both old values for
* modified columns and the whole new row.
*/
@Test
public void testMappingUpdateStructRowNewRowAndOldValuesToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =
new DataChangeRecord(
"partitionToken",
Timestamp.ofTimeSecondsAndNanos(10L, 20),
"serverTransactionId",
true,
"1",
"tableName",
Arrays.asList(
new ColumnType("column1", new TypeCode("{\"code\":\"INT64\"}"), true, 1L),
new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)),
Collections.singletonList(
new Mod(
"{\"column1\":\"value1\"}",
"{\"column2\":\"oldValue2\"}",
"{\"column2\":\"newValue2\"}")),
ModType.UPDATE,
ValueCaptureType.NEW_ROW,
10L,
2L,
"transactionTag",
true,
null);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
when(resultSet.getCurrentRowAsStruct()).thenReturn(jsonFieldsStruct);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}

@Test
public void testMappingInsertStructRowToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =
Expand Down Expand Up @@ -316,6 +354,37 @@ public void testMappingInsertStructRowNewValuesToDataChangeRecord() {
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}

@Test
public void testMappingInsertStructRowNewRowAndOldValuesToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =
new DataChangeRecord(
"partitionToken",
Timestamp.ofTimeSecondsAndNanos(10L, 20),
"transactionId",
false,
"1",
"tableName",
Arrays.asList(
new ColumnType("column1", new TypeCode("{\"code\":\"INT64\"}"), true, 1L),
new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)),
Collections.singletonList(
new Mod("{\"column1\":\"value1\"}", null, "{\"column2\":\"newValue2\"}")),
ModType.INSERT,
ValueCaptureType.NEW_ROW_AND_OLD_VALUES,
10L,
2L,
"transactionTag",
true,
null);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
when(resultSet.getCurrentRowAsStruct()).thenReturn(jsonFieldsStruct);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}

@Test
public void testMappingDeleteStructRowToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =
Expand Down Expand Up @@ -407,6 +476,37 @@ public void testMappingDeleteStructRowNewValuesToDataChangeRecord() {
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}

@Test
public void testMappingDeleteStructRowNewRowAndOldValuesToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =
new DataChangeRecord(
"partitionToken",
Timestamp.ofTimeSecondsAndNanos(10L, 20),
"transactionId",
false,
"1",
"tableName",
Arrays.asList(
new ColumnType("column1", new TypeCode("{\"code\":\"INT64\"}"), true, 1L),
new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)),
Collections.singletonList(
new Mod("{\"column1\":\"value1\"}", "{\"column2\":\"oldValue2\"}", null)),
ModType.DELETE,
ValueCaptureType.NEW_ROW_AND_OLD_VALUES,
10L,
2L,
"transactionTag",
true,
null);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
when(resultSet.getCurrentRowAsStruct()).thenReturn(jsonFieldsStruct);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}

@Test
public void testMappingStructRowWithUnknownModTypeAndValueCaptureTypeToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =
Expand Down

0 comments on commit e019de8

Please sign in to comment.