/*
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.connect.storage.partitioner;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

import io.confluent.connect.storage.errors.PartitionException;

/**
 * Partitioner that prefixes the time-based partition produced by
 * {@link TimeBasedPartitioner} with one {@code <field>=<value>} path segment per
 * configured partition field, e.g. {@code region=emea/year=2022/month=03/...}.
 *
 * <p>Only {@link Struct} record values are supported, and each configured field
 * must have an integral ({@code INT8..INT64}), {@code STRING}, or
 * {@code BOOLEAN} schema type; anything else raises {@link PartitionException}.
 */
public class FieldAndTimeBasedPartitioner extends TimeBasedPartitioner {
  private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);

  // Field names read from PartitionerConfig.PARTITION_FIELD_NAME_CONFIG in configure().
  private List<String> fieldNames;

  /**
   * Captures the configured partition field names, then delegates the
   * time-based configuration to the parent partitioner.
   *
   * @param config partitioner configuration map
   */
  @Override
  @SuppressWarnings("unchecked")
  public void configure(Map<String, Object> config) {
    fieldNames = (List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
    super.configure(config);
  }

  /**
   * Builds the field portion of the partition path: the configured fields
   * rendered as {@code name=value} segments joined by the directory delimiter.
   *
   * <p>This logic is pulled from FieldPartitioner, whose implementation is
   * private; the alternative is a non-trivial refactor of that class.
   *
   * @param sinkRecord record whose Struct value supplies the field values
   * @return encoded field path, e.g. {@code a=1/b=x}
   * @throws PartitionException if the value is not a Struct, a configured field
   *     is missing or null, or a field's type is unsupported
   */
  private String encodeFieldPartition(SinkRecord sinkRecord) {
    Object value = sinkRecord.value();
    if (!(value instanceof Struct)) {
      log.error("Value is not Struct type.");
      throw new PartitionException("Error encoding partition.");
    }
    final Schema valueSchema = sinkRecord.valueSchema();
    final Struct struct = (Struct) value;

    StringBuilder builder = new StringBuilder();
    for (String fieldName : fieldNames) {
      if (builder.length() > 0) {
        builder.append(this.delim);
      }

      Object partitionKey = struct.get(fieldName);
      if (valueSchema.field(fieldName) == null || partitionKey == null) {
        // Fail with the partitioner's own exception type instead of a raw NPE.
        log.error("Partition field '{}' is missing or null.", fieldName);
        throw new PartitionException("Error encoding partition.");
      }
      Type type = valueSchema.field(fieldName).schema().type();
      switch (type) {
        case INT8:
        case INT16:
        case INT32:
        case INT64:
          Number record = (Number) partitionKey;
          builder.append(fieldName).append('=').append(record.toString());
          break;
        case STRING:
          builder.append(fieldName).append('=').append((String) partitionKey);
          break;
        case BOOLEAN:
          boolean booleanRecord = (boolean) partitionKey;
          builder.append(fieldName).append('=').append(booleanRecord);
          break;
        default:
          log.error("Type {} is not supported as a partition key.", type.getName());
          throw new PartitionException("Error encoding partition.");
      }
    }
    return builder.toString();
  }

  /**
   * Encodes the partition as {@code <fields><delim><time>} using the supplied
   * wall-clock time for the time-based portion.
   */
  @Override
  public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
    String timeStampPartition = super.encodePartition(sinkRecord, nowInMillis);
    String fieldPartition = encodeFieldPartition(sinkRecord);
    return fieldPartition + this.delim + timeStampPartition;
  }

  /**
   * Encodes the partition as {@code <fields><delim><time>} using the record's
   * own timestamp semantics from the parent partitioner.
   */
  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    String timeStampPartition = super.encodePartition(sinkRecord);
    String fieldPartition = encodeFieldPartition(sinkRecord);
    return fieldPartition + this.delim + timeStampPartition;
  }

  /**
   * Lazily builds the combined partition-field schema: the configured field
   * names followed by the time-based path format.
   */
  @Override
  public List partitionFields() {
    if (partitionFields == null) {
      String fieldString = Utils.join(fieldNames, ",");
      // Join the field list and the path format with "," so the schema
      // generator sees them as separate entries; the original concatenated
      // them with no separator ("a,b'year'=YYYY...").
      // NOTE(review): verify the separator against the SchemaGenerator
      // returned by newSchemaGenerator(config) — TODO confirm.
      partitionFields = newSchemaGenerator(config).newPartitionFields(
          fieldString + "," + super.getPathFormat()
      );
    }
    return partitionFields;
  }
}
/*
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.connect.storage.partitioner;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

import io.confluent.connect.storage.errors.PartitionException;

/**
 * Combines field-based and time-based partitioning: the partition path is the
 * configured fields rendered as {@code name=value} segments, followed by the
 * time-based path produced by {@link TimeBasedPartitioner}.
 */
public class FieldAndTimeBasedPartitioner extends TimeBasedPartitioner {
  private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);

  private List fieldNames;

  @Override
  public void configure(Map config) {
    fieldNames = (List) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
    super.configure(config);
  }

  // Borrowed from FieldPartitioner (whose version is private); renders each
  // configured field as "name=value", joined with the directory delimiter.
  private String encodeFieldPartition(SinkRecord sinkRecord) {
    Object recordValue = sinkRecord.value();
    if (!(recordValue instanceof Struct)) {
      log.error("Value is not Struct type.");
      throw new PartitionException("Error encoding partition.");
    }

    Struct struct = (Struct) recordValue;
    Schema valueSchema = sinkRecord.valueSchema();

    StringBuilder encoded = new StringBuilder();
    String separator = "";
    for (Object nameObject : fieldNames) {
      String fieldName = (String) nameObject;
      encoded.append(separator);
      separator = this.delim;

      Object partitionKey = struct.get(fieldName);
      Type type = valueSchema.field(fieldName).schema().type();
      if (type == Type.INT8 || type == Type.INT16 || type == Type.INT32 || type == Type.INT64) {
        Number numericKey = (Number) partitionKey;
        encoded.append(fieldName + "=" + numericKey.toString());
      } else if (type == Type.STRING) {
        encoded.append(fieldName + "=" + (String) partitionKey);
      } else if (type == Type.BOOLEAN) {
        boolean booleanKey = (boolean) partitionKey;
        encoded.append(fieldName + "=" + Boolean.toString(booleanKey));
      } else {
        log.error("Type {} is not supported as a partition key.", type.getName());
        throw new PartitionException("Error encoding partition.");
      }
    }
    return encoded.toString();
  }

  // Field segments first, then the time-based segments from the parent.
  @Override
  public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
    String timePortion = super.encodePartition(sinkRecord, nowInMillis);
    String fieldPortion = encodeFieldPartition(sinkRecord);
    return fieldPortion + this.delim + timePortion;
  }

  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    String timePortion = super.encodePartition(sinkRecord);
    String fieldPortion = encodeFieldPartition(sinkRecord);
    return fieldPortion + this.delim + timePortion;
  }

  // Lazily computed union of the configured field names and the parent's
  // path format, fed to the schema generator.
  @Override
  public List partitionFields() {
    if (partitionFields == null) {
      String joinedNames = Utils.join(fieldNames, ",");
      partitionFields = newSchemaGenerator(config)
          .newPartitionFields(joinedNames + super.getPathFormat());
    }
    return partitionFields;
  }
}
little bit --- .../FieldAndTimeBasedPartitioner.java | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/FieldAndTimeBasedPartitioner.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/FieldAndTimeBasedPartitioner.java index 6377d4cee..d304b1c06 100644 --- a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/FieldAndTimeBasedPartitioner.java +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/FieldAndTimeBasedPartitioner.java @@ -16,34 +16,17 @@ package io.confluent.connect.storage.partitioner; import org.apache.kafka.common.utils.Utils; -// import org.apache.kafka.common.config.ConfigException; -// import org.apache.kafka.common.utils.Time; -// import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; -// import org.apache.kafka.connect.data.Timestamp; -// import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; -// import org.joda.time.DateTime; -// import org.joda.time.DateTimeZone; -// import org.joda.time.format.DateTimeFormat; -// import org.joda.time.format.DateTimeFormatter; -// import org.joda.time.format.ISODateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// import java.util.Date; import java.util.List; -// import java.util.Locale; import java.util.Map; -// import java.util.regex.Pattern; -// import io.confluent.connect.storage.common.SchemaGenerator; -// import io.confluent.connect.storage.common.StorageCommonConfig; -// import io.confluent.connect.storage.common.util.StringUtils; import io.confluent.connect.storage.errors.PartitionException; -// import io.confluent.connect.storage.util.DataUtils; public class FieldAndTimeBasedPartitioner extends TimeBasedPartitioner { private static 
final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class); @@ -56,7 +39,9 @@ public void configure(Map config) { super.configure(config); } - // This is pulled from FieldPartitioner, which is private; alternate is to build a + // This is pulled from FieldPartitioner, which is private; + // alternative option is to not-insignificant refactor + // (need to fix this, this may no longer be true) private String encodeFieldPartition(SinkRecord sinkRecord) { Object value = sinkRecord.value(); if (value instanceof Struct) {