Skip to content

Commit

Permalink
Implemented support for preserving writetimes & TTL on tables with on…
Browse files Browse the repository at this point in the history
…ly collection
  • Loading branch information
pravinbhat committed Oct 17, 2024
1 parent efb1558 commit 6eaa55a
Show file tree
Hide file tree
Showing 14 changed files with 458 additions and 35 deletions.
43 changes: 18 additions & 25 deletions src/main/java/com/datastax/cdm/cql/EnhancedSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@
*/
package com.datastax.cdm.cql;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.cdm.cql.codec.CodecFactory;
import com.datastax.cdm.cql.codec.Codecset;
import com.datastax.cdm.cql.statement.*;
import com.datastax.cdm.cql.statement.OriginSelectByPKStatement;
import com.datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement;
import com.datastax.cdm.cql.statement.TargetInsertStatement;
import com.datastax.cdm.cql.statement.TargetSelectByPKStatement;
import com.datastax.cdm.cql.statement.TargetUpdateStatement;
import com.datastax.cdm.cql.statement.TargetUpsertStatement;
import com.datastax.cdm.data.PKFactory;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.cdm.schema.CqlTable;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;

public class EnhancedSession {
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
Expand Down Expand Up @@ -96,26 +99,16 @@ public TargetUpsertStatement getTargetUpsertStatement() {
}

private CqlSession initSession(PropertyHelper propertyHelper, CqlSession session) {
List<String> codecList = propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS);
if (null != codecList && !codecList.isEmpty()) {
MutableCodecRegistry registry = (MutableCodecRegistry) session.getContext().getCodecRegistry();

for (String codecString : codecList) {
Codecset codecEnum = Codecset.valueOf(codecString);
for (TypeCodec<?> codec : CodecFactory.getCodecPair(propertyHelper, codecEnum)) {
DataType dataType = codec.getCqlType();
GenericType<?> javaType = codec.getJavaType();
if (logDebug)
logger.debug("Registering Codec {} for CQL type {} and Java type {}",
codec.getClass().getSimpleName(), dataType, javaType);
try {
registry.codecFor(dataType, javaType);
} catch (CodecNotFoundException e) {
registry.register(codec);
}
}
}
}
// BIGINT_BIGINTEGER codec is always needed to compare C* writetimes in collection columns
List<String> codecList = new ArrayList<>(Arrays.asList("BIGINT_BIGINTEGER"));

if (null != propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS))
codecList.addAll(propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS));
MutableCodecRegistry registry = (MutableCodecRegistry) session.getContext().getCodecRegistry();

codecList.stream().map(Codecset::valueOf).map(codec -> CodecFactory.getCodecPair(propertyHelper, codec))
.flatMap(List::stream).forEach(registry::register);

return session;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.cdm.cql.codec;

import java.math.BigInteger;
import java.nio.ByteBuffer;

import org.jetbrains.annotations.NotNull;

import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;

public class BIGINT_BigIntegerCodec extends AbstractBaseCodec<BigInteger> {

    /**
     * Codec mapping the CQL {@code BIGINT} type to {@link java.math.BigInteger}, delegating the
     * wire format to the driver's built-in {@code TypeCodecs.BIGINT} ({@code Long}) codec.
     * Needed so C* writetime values inside collection columns can be read as BigInteger.
     */
    public BIGINT_BigIntegerCodec(PropertyHelper propertyHelper) {
        super(propertyHelper);
    }

    @Override
    public @NotNull GenericType<BigInteger> getJavaType() {
        return GenericType.BIG_INTEGER;
    }

    @Override
    public @NotNull DataType getCqlType() {
        return DataTypes.BIGINT;
    }

    @Override
    public ByteBuffer encode(BigInteger value, @NotNull ProtocolVersion protocolVersion) {
        if (value == null) {
            return null;
        }
        // NOTE(review): longValue() silently truncates values outside the signed 64-bit range;
        // writetimes always fit, but arbitrary BigIntegers would not — confirm acceptable.
        return TypeCodecs.BIGINT.encode(value.longValue(), protocolVersion);
    }

    @Override
    public BigInteger decode(ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) {
        // TypeCodecs.BIGINT.decode returns null for a null/empty buffer (a null CQL value);
        // guard before unboxing to avoid a NullPointerException.
        Long decoded = TypeCodecs.BIGINT.decode(bytes, protocolVersion);
        return (decoded == null) ? null : BigInteger.valueOf(decoded);
    }

    @Override
    public @NotNull String format(BigInteger value) {
        // The TypeCodec contract requires format(null) to return the CQL literal "NULL"
        // rather than throwing; value.longValue() on null would NPE.
        return (value == null) ? "NULL" : TypeCodecs.BIGINT.format(value.longValue());
    }

    @Override
    public BigInteger parse(String value) {
        // TypeCodecs.BIGINT.parse returns null for null/empty/"NULL" input; guard before
        // unboxing to avoid a NullPointerException.
        Long parsed = TypeCodecs.BIGINT.parse(value);
        return (parsed == null) ? null : BigInteger.valueOf(parsed);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.cdm.cql.codec;

import java.nio.ByteBuffer;

import org.jetbrains.annotations.NotNull;

import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;

public class BigInteger_BIGINTCodec extends AbstractBaseCodec<Integer> {

    /**
     * Thin pass-through codec delegating to the driver's built-in {@code TypeCodecs.INT} codec.
     * NOTE(review): despite the class name suggesting a BigInteger-to-BIGINT mapping, this codec
     * actually maps Java {@link Integer} to CQL {@code INT} — confirm the naming intent.
     */
    public BigInteger_BIGINTCodec(PropertyHelper propertyHelper) {
        super(propertyHelper);
    }

    @Override
    public @NotNull GenericType<Integer> getJavaType() {
        return GenericType.INTEGER;
    }

    @Override
    public @NotNull DataType getCqlType() {
        return DataTypes.INT;
    }

    @Override
    public ByteBuffer encode(Integer value, @NotNull ProtocolVersion protocolVersion) {
        // A null Java value encodes to a null buffer (null CQL value).
        return (value == null) ? null : TypeCodecs.INT.encode(value, protocolVersion);
    }

    @Override
    public Integer decode(ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) {
        return TypeCodecs.INT.decode(bytes, protocolVersion);
    }

    @Override
    public @NotNull String format(Integer value) {
        return TypeCodecs.INT.format(value);
    }

    @Override
    public Integer parse(String value) {
        return TypeCodecs.INT.parse(value);
    }

}
3 changes: 3 additions & 0 deletions src/main/java/com/datastax/cdm/cql/codec/CodecFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public static List<TypeCodec<?>> getCodecPair(PropertyHelper propertyHelper, Cod
return Arrays.asList(new DOUBLE_StringCodec(propertyHelper), new TEXT_DoubleCodec(propertyHelper));
case BIGINT_STRING:
return Arrays.asList(new BIGINT_StringCodec(propertyHelper), new TEXT_LongCodec(propertyHelper));
case BIGINT_BIGINTEGER:
return Arrays.asList(new BIGINT_BigIntegerCodec(propertyHelper),
new BigInteger_BIGINTCodec(propertyHelper));
case STRING_BLOB:
return Arrays.asList(new TEXT_BLOBCodec(propertyHelper), new BLOB_TEXTCodec(propertyHelper));
case ASCII_BLOB:
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/datastax/cdm/cql/codec/Codecset.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@
package com.datastax.cdm.cql.codec;

/**
 * Enumerates the origin-to-target codec pairs that can be registered via the
 * TRANSFORM_CODECS property; each constant maps to a codec pair in CodecFactory.
 */
public enum Codecset {
    INT_STRING, DOUBLE_STRING, BIGINT_STRING, BIGINT_BIGINTEGER, DECIMAL_STRING, TIMESTAMP_STRING_MILLIS,
    TIMESTAMP_STRING_FORMAT, POINT_TYPE, POLYGON_TYPE, DATE_RANGE, LINE_STRING, STRING_BLOB, ASCII_BLOB
}
40 changes: 35 additions & 5 deletions src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.datastax.cdm.feature;

import java.math.BigInteger;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class WritetimeTTL extends AbstractFeature {
private Long filterMax;
private boolean hasWriteTimestampFilter;
private Long writetimeIncrement;
private boolean allowCollectionsForWritetimeTTL;

@Override
public boolean loadProperties(IPropertyHelper propertyHelper) {
Expand All @@ -61,7 +63,7 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {
logger.info("PARAM -- WriteTimestampCols: {}", writetimeNames);
this.autoWritetimeNames = false;
}

allowCollectionsForWritetimeTTL = propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS);
this.customWritetime = getCustomWritetime(propertyHelper);
if (this.customWritetime > 0) {
logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.TRANSFORM_CUSTOM_WRITETIME, customWritetime,
Expand Down Expand Up @@ -233,20 +235,48 @@ public Long getLargestWriteTimeStamp(Row row) {
return this.customWritetime;
if (null == this.writetimeSelectColumnIndexes || this.writetimeSelectColumnIndexes.isEmpty())
return null;
OptionalLong max = this.writetimeSelectColumnIndexes.stream().mapToLong(row::getLong).filter(Objects::nonNull)
.max();

OptionalLong max = (allowCollectionsForWritetimeTTL) ? getMaxWriteTimeStampForCollections(row)
: getMaxWriteTimeStamp(row);

return max.isPresent() ? max.getAsLong() + this.writetimeIncrement : null;
}

/**
 * Max writetime across the selected writetime columns when collection columns are allowed:
 * a BIGINT column yields a single scalar writetime, any other selected column is assumed to
 * be a list of BigInteger writetimes (one per collection element).
 */
private OptionalLong getMaxWriteTimeStampForCollections(Row row) {
    return this.writetimeSelectColumnIndexes.stream().map(col -> {
        if (row.getType(col).equals(DataTypes.BIGINT))
            return Arrays.asList(row.getLong(col));
        return row.getList(col, BigInteger.class).stream().map(BigInteger::longValue).collect(Collectors.toList());
    // Filter nulls before unboxing, consistent with getMaxTTLForCollections, to avoid NPE
    // if a list element is null.
    }).flatMap(List::stream).filter(Objects::nonNull).mapToLong(Long::longValue).max();
}

/**
 * Max writetime across scalar writetime columns. Row.getLong returns the primitive 0 for an
 * unset column, so a null can never appear in this primitive stream — the former
 * filter(Objects::nonNull) on the LongStream was an always-true no-op and has been removed.
 */
private OptionalLong getMaxWriteTimeStamp(Row row) {
    return this.writetimeSelectColumnIndexes.stream().mapToLong(row::getLong).max();
}

/**
 * Returns the largest TTL for the row: the configured custom TTL when set, null when no TTL
 * columns are tracked, otherwise the max TTL across the tracked columns (0 when none of
 * them carries a TTL). The source span contained both the pre- and post-change lines of a
 * diff (duplicate 'max' declaration and return); this keeps the post-change version.
 */
public Integer getLargestTTL(Row row) {
    if (logDebug)
        logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes);
    if (this.customTTL > 0)
        return this.customTTL.intValue();
    if (null == this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty())
        return null;

    // Collection-aware scan only when the feature flag allows collections for TTL/writetime.
    OptionalInt max = (allowCollectionsForWritetimeTTL) ? getMaxTTLForCollections(row) : getMaxTTL(row);

    return max.isPresent() ? max.getAsInt() : 0;
}

/**
 * Max TTL across the selected TTL columns when collection columns are allowed: an INT column
 * yields a single scalar TTL, any other selected column is assumed to be a list of Integer
 * TTLs (one per collection element). Null elements are skipped before unboxing.
 */
private OptionalInt getMaxTTLForCollections(Row row) {
    return this.ttlSelectColumnIndexes.stream().map(col -> {
        if (row.getType(col).equals(DataTypes.INT))
            return Arrays.asList(row.getInt(col));
        // getList already returns a List<Integer>; the former stream().collect(toList()) was a
        // redundant copy.
        return row.getList(col, Integer.class);
    }).flatMap(List::stream).filter(Objects::nonNull).mapToInt(Integer::intValue).max();
}

/**
 * Max TTL across scalar TTL columns. Row.getInt returns the primitive 0 for an unset column,
 * so a null can never appear in this primitive stream — the former filter(Objects::nonNull)
 * on the IntStream was an always-true no-op and has been removed.
 */
private OptionalInt getMaxTTL(Row row) {
    return this.ttlSelectColumnIndexes.stream().mapToInt(row::getInt).max();
}

private void validateTTLColumns(CqlTable originTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public enum PropertyType {
public static final String ORIGIN_TTL_NAMES = "spark.cdm.schema.origin.column.ttl.names";
public static final String ORIGIN_WRITETIME_AUTO = "spark.cdm.schema.origin.column.writetime.automatic";
public static final String ORIGIN_WRITETIME_NAMES = "spark.cdm.schema.origin.column.writetime.names";
public static final String ALLOW_COLL_FOR_WRITETIME_TTL_COLS = "spark.cdm.schema.origin.column.ttlwritetime.allow.collections";

public static final String ORIGIN_COLUMN_NAMES_TO_TARGET = "spark.cdm.schema.origin.column.names.to.target";

Expand All @@ -90,6 +91,8 @@ public enum PropertyType {
types.put(ORIGIN_WRITETIME_NAMES, PropertyType.STRING_LIST);
types.put(ORIGIN_WRITETIME_AUTO, PropertyType.BOOLEAN);
defaults.put(ORIGIN_WRITETIME_AUTO, "true");
types.put(ALLOW_COLL_FOR_WRITETIME_TTL_COLS, PropertyType.BOOLEAN);
defaults.put(ALLOW_COLL_FOR_WRITETIME_TTL_COLS, "false");
types.put(ORIGIN_COLUMN_NAMES_TO_TARGET, PropertyType.STRING_LIST);
}

Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/datastax/cdm/schema/CqlTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,15 +470,19 @@ private void setCqlMetadata(CqlSession cqlSession) {
.filter(md -> !extractJsonExclusive || md.getName().asCql(true).endsWith(columnName))
.collect(Collectors.toCollection(() -> this.cqlAllColumns));

boolean allowCollectionsForWritetimeTTL = propertyHelper
.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS);
this.writetimeTTLColumns = tableMetadata.getColumns().values().stream()
.filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata))
.filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata,
allowCollectionsForWritetimeTTL))
.map(ColumnMetadata::getName).map(CqlIdentifier::asInternal).collect(Collectors.toList());

this.columnNameToCqlTypeMap = this.cqlAllColumns.stream().collect(
Collectors.toMap(columnMetadata -> columnMetadata.getName().asInternal(), ColumnMetadata::getType));
}

private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata) {
private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata,
boolean allowCollectionsForWritetimeTTL) {
DataType dataType = columnMetadata.getType();
boolean isKeyColumn = tableMetadata.getPartitionKey().contains(columnMetadata)
|| tableMetadata.getClusteringColumns().containsKey(columnMetadata);
Expand All @@ -492,6 +496,8 @@ private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnM
// supported here?
if (CqlData.isFrozen(dataType))
return true;
if (allowCollectionsForWritetimeTTL && CqlData.isCollection(dataType))
return true;
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import java.nio.ByteBuffer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;

public class ASCII_BLOBCodecTest {
private final String INPUT = "Encode this Text string to Blob";
Expand All @@ -42,4 +44,11 @@ public void encodeDecode() {
assertEquals(retBuffer, codec.parse(INPUT));
}

@Test
void testFormat() {
    // The codec's format of the raw bytes must match the driver's built-in ASCII formatting
    // of the same source string.
    ByteBuffer input = ByteBuffer.wrap(INPUT.getBytes());
    Assertions.assertEquals(TypeCodecs.ASCII.format(INPUT), codec.format(input));
}
}
Loading

0 comments on commit 6eaa55a

Please sign in to comment.