diff --git a/README.md b/README.md
index cd7c56a6..6020bfa0 100644
--- a/README.md
+++ b/README.md
@@ -146,9 +146,9 @@ spark-submit --properties-file cdm.properties \
 # Things to know
 - Each run (Migration or Validation) can be tracked (when enabled). You can find summary and details of the same in tables `cdm_run_info` and `cdm_run_details` in the target keyspace.
 - CDM does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.
-- CDM ignores `ttl` & `writetime` on collection and UDT fields while computing the highest value
-- If a table has only collection and/or UDT non-key columns and not table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry.
-- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. Alternatively if needed, the param `spark.cdm.transform.custom.writetime` can be used to set a static custom value for `writetime`.
+- By default, CDM ignores collection and UDT fields when calculating `ttl` & `writetime` (for performance reasons). If you want to include such fields, set the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true`.
+- If a table has only collection and/or UDT non-key columns and no table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry. To avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
+- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be the time the job was run. To avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
 - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
 - When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well.
diff --git a/RELEASE.md b/RELEASE.md
index fd6cc0b8..d4caca1b 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,4 +1,7 @@
 # Release Notes
 
+## [4.6.0] - 2024-10-18
+- Allow using Collections and/or UDTs for `ttl` & `writetime` calculations. This is specifically helpful in scenarios where the only non-key columns are Collections and/or UDTs.
+
 ## [4.5.1] - 2024-10-11
 - Made CDM generated SCB unique & much short-lived when using the TLS option to connect to Astra more securely.
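
For illustration (not part of the patch itself), a minimal `cdm.properties` sketch enabling the new flag, to be passed via `spark-submit --properties-file cdm.properties` as shown in the README; the keyspace/table value is a placeholder and all other settings are left at their defaults:

```properties
# Hypothetical cdm.properties excerpt (illustration only)
spark.cdm.schema.origin.keyspaceTable              keyspace_name.table_name

# Include collection/UDT columns when computing the max TTL & WRITETIME
# (helpful when the only non-PK columns are collections and/or UDTs)
spark.cdm.schema.ttlwritetime.calc.useCollections  true
```
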
diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
index 0e224dfe..8dd26134 100644
--- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
+++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -246,12 +246,13 @@ private OptionalLong getMaxWriteTimeStampForCollections(Row row) {
         return this.writetimeSelectColumnIndexes.stream().map(col -> {
             if (row.getType(col).equals(DataTypes.BIGINT))
                 return Arrays.asList(row.getLong(col));
-            return row.getList(col, BigInteger.class).stream().map(BigInteger::longValue).collect(Collectors.toList());
-        }).flatMap(List::stream).mapToLong(Long::longValue).max();
+            return row.getList(col, BigInteger.class).stream().filter(Objects::nonNull).map(BigInteger::longValue)
+                    .collect(Collectors.toList());
+        }).flatMap(List::stream).filter(Objects::nonNull).mapToLong(Long::longValue).max();
     }
 
     private OptionalLong getMaxWriteTimeStamp(Row row) {
-        return this.writetimeSelectColumnIndexes.stream().mapToLong(row::getLong).filter(Objects::nonNull).max();
+        return this.writetimeSelectColumnIndexes.stream().filter(Objects::nonNull).mapToLong(row::getLong).max();
     }
 
     public Integer getLargestTTL(Row row) {
@@ -271,12 +272,12 @@ private OptionalInt getMaxTTLForCollections(Row row) {
         return this.ttlSelectColumnIndexes.stream().map(col -> {
             if (row.getType(col).equals(DataTypes.INT))
                 return Arrays.asList(row.getInt(col));
-            return row.getList(col, Integer.class).stream().collect(Collectors.toList());
+            return row.getList(col, Integer.class).stream().filter(Objects::nonNull).collect(Collectors.toList());
         }).flatMap(List::stream).filter(Objects::nonNull).mapToInt(Integer::intValue).max();
     }
 
     private OptionalInt getMaxTTL(Row row) {
-        return this.ttlSelectColumnIndexes.stream().mapToInt(row::getInt).filter(Objects::nonNull).max();
+        return this.ttlSelectColumnIndexes.stream().filter(Objects::nonNull).mapToInt(row::getInt).max();
     }
 
     private void validateTTLColumns(CqlTable originTable) {
diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties
index 96f1438d..a02fbd8c 100644
--- a/src/resources/cdm-detailed.properties
+++ b/src/resources/cdm-detailed.properties
@@ -94,27 +94,33 @@ spark.cdm.connect.target.password cassandra
 #     the WRITETIME of the target record.
 #
 # Other Parameters:
-# spark.cdm.schema.origin
-#   .column
+# spark.cdm.schema
+#   .origin.column
 #     .names.to.target  : Default is empty. If column names are changed between Origin and Target, then
 #                         this map-like list provides a mechanism to associate the two. The format is
 #                         origin_column_name:target_column_name. The list is comma-separated. Only renamed
 #                         columns need to be listed.
+#
+#   .ttlwritetime.calc
+#     .useCollections   : Default is false. When true, TTL and WRITETIME max calculations will include
+#                         collections and UDTs. This is useful when the only non-PK columns are collections
+#                         and/or UDTs.
 #-----------------------------------------------------------------------------------------------------------
 spark.cdm.schema.origin.keyspaceTable              keyspace_name.table_name
 
 # Max TTL value of all non-PK columns will be used for insert on target
-#spark.cdm.schema.origin.column.ttl.automatic      true
+spark.cdm.schema.origin.column.ttl.automatic       true
 
 # Max TTL value of specified non-PK columns will be used for insert on target (overrides automatic setting)
 #spark.cdm.schema.origin.column.ttl.names          data_col1,data_col2,...
 
 # Max WRITETIME value of all non-PK columns will be used for insert on target
-#spark.cdm.schema.origin.column.writetime.automatic      true
+spark.cdm.schema.origin.column.writetime.automatic       true
 
 # Max WRITETIME value of specified non-PK columns will be used for insert on target (overrides automatic setting)
 #spark.cdm.schema.origin.column.writetime.names          data_col1,data_col2,...
 
 #spark.cdm.schema.origin.column.names.to.target          partition_col1:partition_col_1,partition_col2:partition_col_2,...
+spark.cdm.schema.ttlwritetime.calc.useCollections        false
 #===========================================================================================================
 # Details about the Target Schema
diff --git a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
index 9955b93d..8ad14f2b 100644
--- a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
+++ b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
@@ -410,6 +410,23 @@ public void getLargestTTLTest() {
         assertEquals(30, largestTTL);
     }
 
+    @Test
+    public void getLargestTTLWithListTest() {
+        when(propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS)).thenReturn(true);
+        when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
+        when(originTable.indexOf("TTL("+ttlColumnName+")")).thenReturn(100);
+        when(originRow.getType(eq(100))).thenReturn(DataTypes.listOf(DataTypes.INT));
+        when(originRow.getList(eq(100), eq(Integer.class))).thenReturn(Arrays.asList(Integer.valueOf(40), Integer.valueOf(10)));
+        when(originTable.indexOf("TTL("+writetimeTTLColumnName+")")).thenReturn(101);
+        when(originRow.getType(eq(101))).thenReturn(DataTypes.INT);
+        when(originRow.getInt(eq(101))).thenReturn(20);
+
+        feature.loadProperties(propertyHelper);
+        feature.initializeAndValidate(originTable, targetTable);
+        Integer largestTTL = feature.getLargestTTL(originRow);
+        assertEquals(40, largestTTL);
+    }
+
     @Test
     public void validateInvalidFilterMin() {
         when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(-1L);
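
For illustration (not part of the patch), a small self-contained Java sketch of the null-safe max pattern that the `WritetimeTTL` change adopts: per-column TTL values, whether scalars or list elements that may contain nulls, are flattened and the largest value is taken after filtering nulls. The class name and values below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;

// Illustrative sketch only: mirrors the patch's pattern of flattening per-column
// TTL values and taking the max while skipping null elements.
public class MaxTtlSketch {
    public static void main(String[] args) {
        // Hypothetical per-column TTL values: one scalar INT column and one
        // list<int> column whose elements may include nulls (e.g. unset cells).
        List<List<Integer>> ttlValuesPerColumn = Arrays.asList(
                Arrays.asList(20),            // scalar TTL column
                Arrays.asList(40, null, 10)); // collection TTL column

        OptionalInt largestTTL = ttlValuesPerColumn.stream()
                .flatMap(List::stream)
                .filter(Objects::nonNull)     // same null filtering the patch adds
                .mapToInt(Integer::intValue)
                .max();

        System.out.println(largestTTL.orElse(0)); // prints 40
    }
}
```

In the actual patch, the same `filter(Objects::nonNull)` step is applied to the elements returned by `row.getList(...)`, and the pre-existing null filter is moved ahead of `mapToInt`/`mapToLong` on the column-index streams.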