Skip to content

Commit

Permalink
Improvements to storage partitioning scheme to be flexible and config…
Browse files Browse the repository at this point in the history
…urable at runtime as below.

- Ability to exclude the topic name from the partition path by configuration
- Ability to specify scaling factor for timestamp column used in partitioning by configuration
  • Loading branch information
hariprasad-k committed Mar 5, 2020
1 parent 286b9b8 commit f1de545
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public class StorageCommonConfig extends AbstractConfig implements ComposableCon
public static final String DIRECTORY_DELIM_DEFAULT = "/";
public static final String DIRECTORY_DELIM_DISPLAY = "Directory Delimiter";

// Settings for the "path.include.topicname" option: config key, user-facing documentation,
// default value and display name.
public static final String PATH_INCLUDE_TOPICNAME_CONFIG = "path.include.topicname";
public static final String PATH_INCLUDE_TOPICNAME_DOC = "Whether to append the topic"
+ " name to the topics.dir parameter. If true the full path will be composed"
+ " of topics.dir + delim + topic_name + delim + partitioner_path. If false"
+ " topics.dir + delim + partitioner_path";
// Defaults to true, i.e. the topic name is part of the path unless explicitly disabled.
public static final Boolean PATH_INCLUDE_TOPICNAME_DEFAULT = true;
public static final String PATH_INCLUDE_TOPICNAME_DISPLAY =
"Whether to append the topic name to the topics.dir parameter";

public static final String FILE_DELIM_CONFIG = "file.delim";
public static final String FILE_DELIM_DOC = "File delimiter pattern";
public static final String FILE_DELIM_DEFAULT = "+";
Expand Down Expand Up @@ -125,6 +134,16 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender storageClassRecommend
Width.LONG,
FILE_DELIM_DISPLAY
);

configDef.define(PATH_INCLUDE_TOPICNAME_CONFIG,
Type.BOOLEAN,
PATH_INCLUDE_TOPICNAME_DEFAULT,
Importance.MEDIUM,
PATH_INCLUDE_TOPICNAME_DOC,
group,
++orderInGroup,
Width.SHORT,
PATH_INCLUDE_TOPICNAME_DISPLAY);
}
return configDef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ public class DefaultPartitioner<T> implements Partitioner<T> {
protected Map<String, Object> config;
protected List<T> partitionFields = null;
protected String delim;
protected boolean includeTopicInPath;

/**
 * Configures this partitioner from the supplied configuration map.
 *
 * <p>Reads the directory delimiter and the flag that decides whether the topic name is part
 * of the generated path. If the flag is missing from {@code config}, the declared default
 * {@code StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT} is used, so configuration maps
 * that do not set the key keep working.
 *
 * @param config the configuration map; a reference to it is retained in {@code this.config}
 */
@Override
public void configure(Map<String, Object> config) {
  this.config = config;
  delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);

  // Casting a missing (null) entry straight to boolean would throw NullPointerException on
  // unboxing, so fall back to the default when the key is not present.
  Object includeTopic = config.get(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG);
  includeTopicInPath = includeTopic == null
      ? StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT
      : (boolean) includeTopic;
}

@Override
Expand All @@ -51,7 +53,11 @@ public String encodePartition(SinkRecord sinkRecord) {

/**
 * Builds the partitioned path for an encoded partition.
 *
 * @param topic the topic name
 * @param encodedPartition the already-encoded partition
 * @return {@code topic + delim + encodedPartition} when the topic name is configured to be
 *     included in the path; otherwise {@code encodedPartition} on its own
 */
@Override
public String generatePartitionedPath(String topic, String encodedPartition) {
  // The decision must be driven by the flag alone: an unconditional return ahead of this
  // check would make the flag ineffective (and the branch unreachable).
  if (includeTopicInPath) {
    return topic + delim + encodedPartition;
  }
  return encodedPartition;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ public class PartitionerConfig extends AbstractConfig implements ComposableConfi
public static final String TIMESTAMP_FIELD_NAME_DEFAULT = "timestamp";
public static final String TIMESTAMP_FIELD_NAME_DISPLAY = "Record Field for Timestamp Extractor";

// Settings for the "timestamp.scaling.factor" option: config key, user-facing documentation,
// default value and display name.
public static final String TIMESTAMP_SCALING_FACTOR_CONFIG = "timestamp.scaling.factor";
public static final String TIMESTAMP_SCALING_FACTOR_DOC =
"The scaling factor to be applied to the timestamp by the timestamp extractor.";
// A factor of 1 leaves the timestamp unchanged whether it is divided or multiplied.
public static final long TIMESTAMP_SCALING_FACTOR_DEFAULT = 1L;
public static final String TIMESTAMP_SCALING_FACTOR_DISPLAY =
"Timestamp scaling factor for Timestamp Extractor";

// Settings for the "timestamp.scaling.operation" option. The values handled by
// RecordFieldTimestampExtractor.scalingTimestamp are "Division" and "Multiplication"
// (matched exactly); any other value is logged as an error and the timestamp is left unscaled.
public static final String TIMESTAMP_SCALING_OPERATION_CONFIG = "timestamp.scaling.operation";
public static final String TIMESTAMP_SCALING_OPERATION_DOC =
"The scaling operation to be applied to the timestamp by the timestamp extractor.";
public static final String TIMESTAMP_SCALING_OPERATION_DEFAULT = "Division";
public static final String TIMESTAMP_SCALING_OPERATION_DISPLAY =
"Timestamp scaling operation for Timestamp Extractor";

/**
* Create a new configuration definition.
*
Expand Down Expand Up @@ -208,18 +222,38 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom
++orderInGroup,
Width.LONG,
TIMESTAMP_FIELD_NAME_DISPLAY);

configDef.define(TIMESTAMP_SCALING_FACTOR_CONFIG,
Type.LONG,
TIMESTAMP_SCALING_FACTOR_DEFAULT,
Importance.MEDIUM,
TIMESTAMP_SCALING_FACTOR_DOC,
group,
++orderInGroup,
Width.LONG,
TIMESTAMP_SCALING_FACTOR_DISPLAY);

configDef.define(TIMESTAMP_SCALING_OPERATION_CONFIG,
Type.STRING,
TIMESTAMP_SCALING_OPERATION_DEFAULT,
Importance.MEDIUM,
TIMESTAMP_SCALING_OPERATION_DOC,
group,
++orderInGroup,
Width.LONG,
TIMESTAMP_SCALING_OPERATION_DISPLAY);
}

return configDef;
}

public static class BooleanParentRecommender implements ConfigDef.Recommender {
protected final String parentConfigName;

/**
 * Creates a recommender bound to a parent configuration.
 *
 * @param parentConfigName the name of the parent configuration; stored as-is in
 *     {@code parentConfigName}
 */
public BooleanParentRecommender(String parentConfigName) {
this.parentConfigName = parentConfigName;
}

@Override
public List<Object> validValues(String name, Map<String, Object> connectorConfigs) {
return new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,31 @@ public Long extract(ConnectRecord<?> record) {
public static class RecordFieldTimestampExtractor implements TimestampExtractor {
private String fieldName;
private DateTimeFormatter dateTime;
private long timestampScalingFactor;
private String timestampScalingOperation;

/**
 * Configures this extractor from the supplied configuration map.
 *
 * <p>Reads the record field name that holds the timestamp, sets up the ISO date-time parser
 * used for string timestamps, and reads the scaling factor and scaling operation applied to
 * numeric timestamps. When a scaling setting is missing from {@code config}, the default
 * declared in {@code PartitionerConfig} is used instead of failing.
 *
 * @param config the configuration map
 */
@Override
public void configure(Map<String, Object> config) {
  fieldName = (String) config.get(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG);
  dateTime = ISODateTimeFormat.dateTimeParser();

  // A direct (long) cast throws NullPointerException for a missing key and
  // ClassCastException for a non-Long number (e.g. Integer); accept any Number and
  // fall back to the declared default when the key is absent.
  Object factor = config.get(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG);
  timestampScalingFactor = factor == null
      ? PartitionerConfig.TIMESTAMP_SCALING_FACTOR_DEFAULT
      : ((Number) factor).longValue();

  // A null operation would later make the switch in scalingTimestamp throw
  // NullPointerException, so default it here as well.
  Object operation = config.get(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG);
  timestampScalingOperation = operation == null
      ? PartitionerConfig.TIMESTAMP_SCALING_OPERATION_DEFAULT
      : (String) operation;
}

/**
 * Scales a numeric timestamp by the given factor using the given operation.
 *
 * <p>{@code "Division"} divides the timestamp by the factor (integer division) and
 * {@code "Multiplication"} multiplies it. Any other operation is logged as an error and the
 * timestamp is returned unscaled.
 *
 * @param timestamp the raw timestamp value
 * @param timestampScalingOperation the operation name, matched exactly (case-sensitive)
 * @param timestampScalingFactor the factor to divide or multiply by
 * @return the scaled timestamp, or the original timestamp if the operation is not recognized
 * @throws ArithmeticException if the operation is {@code "Division"} and the factor is zero,
 *     or if the multiplication overflows a {@code long}
 */
public Long scalingTimestamp(
    long timestamp, String timestampScalingOperation, long timestampScalingFactor) {
  switch (timestampScalingOperation) {
    case "Division":
      if (timestampScalingFactor == 0L) {
        // Same exception type the bare division would raise, but with a message that
        // points at the misconfigured setting.
        throw new ArithmeticException(
            "Cannot divide timestamp " + timestamp
                + " by a timestamp.scaling.factor of 0.");
      }
      return timestamp / timestampScalingFactor;
    case "Multiplication":
      // Fail loudly on overflow rather than silently wrapping to a bogus timestamp.
      return Math.multiplyExact(timestamp, timestampScalingFactor);
    default:
      log.error(
          "Timestamp scaling operation '{}' is not recognized (timestamp remains unscaled).",
          timestampScalingOperation);
      return timestamp;
  }
}

@Override
Expand All @@ -288,7 +308,8 @@ public Long extract(ConnectRecord<?> record) {
switch (fieldSchema.type()) {
case INT32:
case INT64:
return ((Number) timestampValue).longValue();
return scalingTimestamp(((Number) timestampValue).longValue(),
timestampScalingOperation, timestampScalingFactor);
case STRING:
return dateTime.parseMillis((String) timestampValue);
default:
Expand All @@ -304,7 +325,8 @@ public Long extract(ConnectRecord<?> record) {
Map<?, ?> map = (Map<?, ?>) value;
Object timestampValue = DataUtils.getNestedFieldValue(map, fieldName);
if (timestampValue instanceof Number) {
return ((Number) timestampValue).longValue();
return scalingTimestamp(((Number) timestampValue).longValue(),
timestampScalingOperation, timestampScalingFactor);
} else if (timestampValue instanceof String) {
return dateTime.parseMillis((String) timestampValue);
} else if (timestampValue instanceof Date) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DailyPartitionerTest extends StorageSinkTestBase {
public void testDailyPartitioner() {
Map<String, Object> config = new HashMap<>();
config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT);
config.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record");
config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString());
config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DefaultPartitionerTest extends StorageSinkTestBase {
public void testDefaultPartitioner() {
Map<String, Object> config = new HashMap<>();
config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT);

DefaultPartitioner<String> partitioner = new DefaultPartitioner<>();
partitioner.configure(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class HourlyPartitionerTest extends StorageSinkTestBase {
public void testHourlyPartitioner() {
Map<String, Object> config = new HashMap<>();
config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT);
config.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record");
config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString());
config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString());
Expand All @@ -61,4 +62,4 @@ public void testHourlyPartitioner() {
assertThat(encodedPartition, is(generateEncodedPartitionFromMap(m)));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,21 @@ public void testGeneratePartitionedPath() throws Exception {
assertEquals(topic+"/year=2015/month=4/day=2/hour=0/", path);
}

/**
 * Checks that the generated path consists of the encoded partition only when the
 * topic-name-in-path option is set to {@code false}.
 */
@Test
public void testGeneratePartitionedPathWithoutTopicName() throws Exception {
  Map<String, Object> partitionerConfig = createConfig(null);
  partitionerConfig.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, false);
  BiHourlyPartitioner biHourly = (BiHourlyPartitioner) configurePartitioner(
      new BiHourlyPartitioner(), null, partitionerConfig);

  String encoded = biHourly.encodePartition(getSinkRecord());
  String actualPath = biHourly.generatePartitionedPath("topic", encoded);

  // The topic name must not appear as a prefix of the path.
  assertEquals("year=2015/month=4/day=2/hour=0/", actualPath);
}

@Test
public void testInvalidPathFormat() {
final String configKey = PartitionerConfig.PATH_FORMAT_CONFIG;
Expand Down Expand Up @@ -671,8 +686,11 @@ private Map<String, Object> createConfig(String timeFieldName) {
config.put(PartitionerConfig.PATH_FORMAT_CONFIG, PATH_FORMAT);
config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString());
config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString());
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT);
if (timeFieldName != null) {
config.put(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG, timeFieldName);
config.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_FACTOR_DEFAULT);
config.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_OPERATION_DEFAULT);
}
return config;
}
Expand Down

0 comments on commit f1de545

Please sign in to comment.