Additional tests, fixes and docs
pravinbhat committed Oct 18, 2024
1 parent 6eaa55a commit 6ee6434
Showing 5 changed files with 39 additions and 12 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -146,9 +146,9 @@ spark-submit --properties-file cdm.properties \
# Things to know
- Each run (Migration or Validation) can be tracked (when enabled). You can find a summary and details of each run in the `cdm_run_info` and `cdm_run_details` tables in the target keyspace.
- CDM does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.
- CDM ignores `ttl` & `writetime` on collection and UDT fields while computing the highest value
- If a table has only collection and/or UDT non-key columns and not table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry.
- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. Alternatively if needed, the param `spark.cdm.transform.custom.writetime` can be used to set a static custom value for `writetime`.
- By default, CDM ignores collection and UDT fields when computing the `ttl` & `writetime` values (for performance reasons). If you want to include such fields, set the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true`.
- If a table has only collection and/or UDT non-key columns and no table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be the time the job was run. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
- When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reason), it can lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. It can be addressed by setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param, which was added specifically to address this issue (see the example snippet after this list).
- When you rerun a job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in the `cdm_run_info` table will be only for the current run. If the previous run was killed for some reason, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will also have all run metrics from that previous run.
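
For illustration, here is a minimal `cdm.properties` fragment wiring these options together. The property names are the ones documented in this commit; the values are placeholders, and the commented lines are optional alternatives rather than recommended settings:

```properties
# Include collection/UDT columns when computing the max TTL & WRITETIME (added in 4.6.0)
spark.cdm.schema.ttlwritetime.calc.useCollections    true

# Alternative: force a static custom writetime (microseconds since epoch; example value only)
#spark.cdm.transform.custom.writetime                1729209600000000

# Bump the writetime on reruns to avoid duplicate list entries (CASSANDRA-11368)
#spark.cdm.transform.custom.writetime.incrementBy    1
```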

3 changes: 3 additions & 0 deletions RELEASE.md
@@ -1,4 +1,7 @@
# Release Notes
## [4.6.0] - 2024-10-18
- Allow using Collections and/or UDTs for `ttl` & `writetime` calculations. This is specifically helpful in scenarios where the only non-key columns are Collections and/or UDTs.

## [4.5.1] - 2024-10-11
- Made the CDM-generated SCB unique & much shorter-lived when using the TLS option to connect to Astra more securely.

11 changes: 6 additions & 5 deletions src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -246,12 +246,13 @@ private OptionalLong getMaxWriteTimeStampForCollections(Row row) {
return this.writetimeSelectColumnIndexes.stream().map(col -> {
if (row.getType(col).equals(DataTypes.BIGINT))
return Arrays.asList(row.getLong(col));
return row.getList(col, BigInteger.class).stream().map(BigInteger::longValue).collect(Collectors.toList());
}).flatMap(List::stream).mapToLong(Long::longValue).max();
return row.getList(col, BigInteger.class).stream().filter(Objects::nonNull).map(BigInteger::longValue)
.collect(Collectors.toList());
}).flatMap(List::stream).filter(Objects::nonNull).mapToLong(Long::longValue).max();
}

private OptionalLong getMaxWriteTimeStamp(Row row) {
return this.writetimeSelectColumnIndexes.stream().mapToLong(row::getLong).filter(Objects::nonNull).max();
return this.writetimeSelectColumnIndexes.stream().filter(Objects::nonNull).mapToLong(row::getLong).max();
}

public Integer getLargestTTL(Row row) {
@@ -271,12 +272,12 @@ private OptionalInt getMaxTTLForCollections(Row row) {
return this.ttlSelectColumnIndexes.stream().map(col -> {
if (row.getType(col).equals(DataTypes.INT))
return Arrays.asList(row.getInt(col));
return row.getList(col, Integer.class).stream().collect(Collectors.toList());
return row.getList(col, Integer.class).stream().filter(Objects::nonNull).collect(Collectors.toList());
}).flatMap(List::stream).filter(Objects::nonNull).mapToInt(Integer::intValue).max();
}

private OptionalInt getMaxTTL(Row row) {
return this.ttlSelectColumnIndexes.stream().mapToInt(row::getInt).filter(Objects::nonNull).max();
return this.ttlSelectColumnIndexes.stream().filter(Objects::nonNull).mapToInt(row::getInt).max();
}

private void validateTTLColumns(CqlTable originTable) {
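The `filter(Objects::nonNull)` calls added above guard the unboxing steps (`mapToLong` / `mapToInt`) so that a null element inside a collection column cannot trigger a `NullPointerException`. A self-contained sketch of the same stream idiom (not CDM's actual API, just an illustration with made-up data):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;

public class MaxWritetimeSketch {

    // Max writetime across several list columns, skipping null elements so
    // that unboxing via mapToLong never dereferences a null Long.
    static OptionalLong maxWritetime(List<List<Long>> writetimesPerColumn) {
        return writetimesPerColumn.stream()
                .flatMap(List::stream)
                .filter(Objects::nonNull)
                .mapToLong(Long::longValue)
                .max();
    }

    public static void main(String[] args) {
        List<List<Long>> sample = Arrays.asList(
                Arrays.asList(1729209600000000L, null),   // null element is ignored
                Arrays.asList(1720000000000000L));
        System.out.println(maxWritetime(sample));         // OptionalLong[1729209600000000]
    }
}
```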
14 changes: 10 additions & 4 deletions src/resources/cdm-detailed.properties
@@ -94,27 +94,33 @@ spark.cdm.connect.target.password cassandra
# the WRITETIME of the target record.
#
# Other Parameters:
# spark.cdm.schema.origin
# .column
# spark.cdm.schema
# .origin.column
# .names.to.target : Default is empty. If column names are changed between Origin and Target, then
# this map-like list provides a mechanism to associate the two. The format is
# origin_column_name:target_column_name. The list is comma-separated. Only renamed
# columns need to be listed.
#
# .ttlwritetime.calc
# .useCollections : Default is false. When true, TTL and WRITETIME max calculations will include
# collections and UDTs. This is useful when the only non-PK columns are collections
# and/or UDTs.
#-----------------------------------------------------------------------------------------------------------
spark.cdm.schema.origin.keyspaceTable keyspace_name.table_name

# Max TTL value of all non-PK columns will be used for insert on target
#spark.cdm.schema.origin.column.ttl.automatic true
spark.cdm.schema.origin.column.ttl.automatic true

# Max TTL value of specified non-PK columns will be used for insert on target (overrides automatic setting)
#spark.cdm.schema.origin.column.ttl.names data_col1,data_col2,...

# Max WRITETIME value of all non-PK columns will be used for insert on target
#spark.cdm.schema.origin.column.writetime.automatic true
spark.cdm.schema.origin.column.writetime.automatic true

# Max WRITETIME value of specified non-PK columns will be used for insert on target (overrides automatic setting)
#spark.cdm.schema.origin.column.writetime.names data_col1,data_col2,...
#spark.cdm.schema.origin.column.names.to.target partition_col1:partition_col_1,partition_col2:partition_col_2,...
spark.cdm.schema.ttlwritetime.calc.useCollections false

#===========================================================================================================
# Details about the Target Schema
17 changes: 17 additions & 0 deletions src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
@@ -410,6 +410,23 @@ public void getLargestTTLTest() {
assertEquals(30, largestTTL);
}

@Test
public void getLargestTTLWithListTest() {
when(propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS)).thenReturn(true);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(originTable.indexOf("TTL("+ttlColumnName+")")).thenReturn(100);
when(originRow.getType(eq(100))).thenReturn(DataTypes.listOf(DataTypes.INT));
when(originRow.getList(eq(100), eq(Integer.class))).thenReturn(Arrays.asList(Integer.valueOf(40), Integer.valueOf(10)));
when(originTable.indexOf("TTL("+writetimeTTLColumnName+")")).thenReturn(101);
when(originRow.getType(eq(101))).thenReturn(DataTypes.INT);
when(originRow.getInt(eq(101))).thenReturn(20);

feature.loadProperties(propertyHelper);
feature.initializeAndValidate(originTable, targetTable);
Integer largestTTL = feature.getLargestTTL(originRow);
assertEquals(40, largestTTL);
}

@Test
public void validateInvalidFilterMin() {
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(-1L);
