
S3 Source custom partitioner Caused by: NoSuchMethodException after creation #239

Open
artemio77 opened this issue May 30, 2022 · 11 comments

Comments


artemio77 commented May 30, 2022

Hi, I implemented a custom partitioner for the S3 sink and source connectors.
Partitioner:
package io.confluent.connect.storage.partitioner;

import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.util.DataUtils;

public class FieldAndTimeBasedPartitioner extends TimeBasedPartitioner {

    private static final String PARTITION_FIELD_FORMAT_PATH_CONFIG = "partition.field.format.path";
    private static final boolean PARTITION_FIELD_FORMAT_PATH_DEFAULT = true;
    private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
    private PartitionFieldExtractor partitionFieldExtractor;

    public FieldAndTimeBasedPartitioner() {
    }

    protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
        super.init(partitionDurationMs, pathFormat, locale, timeZone, config);

        final List<String> fieldNames = (List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
        // Kafka Connect passes this custom option through as a String, so parse it here;
        // String.valueOf also covers the case where the Boolean default is returned instead of a String.
        boolean formatPath =
            Boolean.parseBoolean(String.valueOf(config.getOrDefault(PARTITION_FIELD_FORMAT_PATH_CONFIG, PARTITION_FIELD_FORMAT_PATH_DEFAULT)));

        this.partitionFieldExtractor = new PartitionFieldExtractor(fieldNames, formatPath);
    }

    @Override
    public String encodePartition(final SinkRecord sinkRecord, final long nowInMillis) {
        // Prefix the time-based path with the configured record fields, e.g.
        // customerId=.../channelType=.../channelId=.../year=.../month=.../day=...
        final String partitionsForTimestamp = super.encodePartition(sinkRecord, nowInMillis);
        final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
        final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

        log.info("Encoded partition : {}", partition);

        return partition;
    }

    @Override
    public String encodePartition(final SinkRecord sinkRecord) {
        final String partitionsForTimestamp = super.encodePartition(sinkRecord);
        final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
        final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

        log.info("Encoded partition : {}", partition);

        return partition;
    }

    public static class PartitionFieldExtractor {

        private static final String DELIMITER_EQ = "=";
        private static final Logger log = LoggerFactory.getLogger(PartitionFieldExtractor.class);

        private final boolean formatPath;
        private final List<String> fieldNames;

        PartitionFieldExtractor(final List<String> fieldNames, final boolean formatPath) {
            this.fieldNames = fieldNames;
            this.formatPath = formatPath;
        }

        public String extract(final ConnectRecord<?> record) {
            Object value = record.value();
            StringBuilder builder = new StringBuilder();
            for (final String fieldName : this.fieldNames) {
                if (builder.length() != 0) {
                    builder.append(StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
                }
                if (value instanceof Struct || value instanceof Map) {
                    final String partitionField = (String) DataUtils.getNestedFieldValue(value, fieldName);
                    if (formatPath) {
                        builder.append(String.join(DELIMITER_EQ, fieldName, partitionField));
                    } else {
                        builder.append(partitionField);
                    }
                } else {
                    log.error("Value is not of Struct or Map type. type {}", value.getClass());
                    throw new PartitionException("Error encoding partition.");
                }
            }
            return builder.toString();
        }
    }
}
When I create the S3 sink connector, everything is fine and it works correctly:
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "test-s3-sink",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "20",
    "s3.region": "us-west-2",
    "s3.proxy.url": "http://localstack:4566",
    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.compatibility": "NONE",
    "value.converter.schemas.enable": "true",
    "partitioner.class": "com.kafka.connect.extention.FieldAndTimeBasedPartitioner",
    "partition.field.name": "customerId,channelType,channelId",
    "partition.field.format.path": true,
    "partition.duration.ms": 86400000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "messageDate"
  }
}
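For reference, with this config the partitioner above produces object paths of the following shape (illustrative values only; assuming a record whose customerId is 42, channelType is EMAIL, channelId is 7, and whose messageDate falls on 2022-05-30):

customerId=42/channelType=EMAIL/channelId=7/year=2022/month=05/day=30

With partition.field.format.path set to false, the field prefix collapses to 42/EMAIL/7 instead.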
But when I try to create the S3 source connector with this partitioner, I get an exception after creation:
{
  "name": "s3-source",
  "config": {
    "confluent.license": "",
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "confluent.topic.bootstrap.servers": "kafka01.internal-service:9092",
    "s3.region": "us-west-2",
    "s3.proxy.url": "http://localstack:4566",
    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
    "confluent.topic.replication.factor": 1,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.compatibility": "NONE",
    "value.converter.schemas.enable": "true",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner",
    "partition.field.name": "customerId,channelType,channelId",
    "partition.field.format.path": true,
    "partition.duration.ms": 86400000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "messageDate"
  }
}
Exception:
[2022-05-30 21:33:00,018] ERROR WorkerConnector{id=s3-source} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner.<init>(io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig, io.confluent.connect.cloud.storage.source.SourceStorage) for configuration Failed to instantiate partitioner class io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner
	at io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig.getPartitioner(StorageSourceConnectorConfig.java:667)
	at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.doStart(RestoreStorageSourceConnector.java:90)
	at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.start(RestoreStorageSourceConnector.java:83)
	at io.confluent.connect.cloud.storage.source.CompositeSourceConnector.start(CompositeSourceConnector.java:72)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
	at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
	at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2022-05-30 21:33:00,028] ERROR [Worker clientId=connect-1, groupId=kafka-connect] Failed to start connector 's3-source' (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: s3-source
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$25(DistributedHerder.java:1461)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:334)
	at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
	at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector s3-source to state STARTED
	... 8 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner.<init>(io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig, io.confluent.connect.cloud.storage.source.SourceStorage) for configuration Failed to instantiate partitioner class io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner
	at io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig.getPartitioner(StorageSourceConnectorConfig.java:667)
	at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.doStart(RestoreStorageSourceConnector.java:90)
	at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.start(RestoreStorageSourceConnector.java:83)
	at io.confluent.connect.cloud.storage.source.CompositeSourceConnector.start(CompositeSourceConnector.java:72)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
	... 7 more

I don't know why I get this exception, because the constructor is present in the partitioner.
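Judging from the stack trace, the source connector looks the partitioner up reflectively through a two-argument constructor. Below is a minimal sketch of that lookup, inferred only from the NoSuchMethodException message above (the real code is in StorageSourceConnectorConfig.getPartitioner per the trace; connectorConfig and sourceStorage stand in for the connector's runtime objects):

import java.lang.reflect.Constructor;

import io.confluent.connect.cloud.storage.source.SourceStorage;
import io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig;

// The source connector requests a (StorageSourceConnectorConfig, SourceStorage)
// constructor. A partitioner written against the sink API (no-arg constructor plus
// configure(Map)) does not declare one, so getConstructor throws NoSuchMethodException.
Class<?> clazz = Class.forName(
    "io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner");
Constructor<?> ctor = clazz.getConstructor(
    StorageSourceConnectorConfig.class, SourceStorage.class); // <-- fails here
Object partitioner = ctor.newInstance(connectorConfig, sourceStorage); // never reached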

@OneCricketeer

Are you using the same Connect worker for both connectors, and is this class available on the CLASSPATH?


yarlagaddag commented Oct 20, 2022

@artemio77 is this issue resolved? I am also getting the same problem no matter which partitioner I use, i.e. default, time-based, or daily.

For me, the S3 sink connector also works fine, but the S3 source connector does not.

Any pointers would be helpful.


OneCricketeer commented Oct 20, 2022

This connector doesn't ship a field-and-time partitioner, so by default that class wouldn't be found.

Regarding the original post, it seems like it was working?

When I create the S3 sink connector, everything is fine and it works correctly

... com.kafka.connect.extention.FieldAndTimeBasedPartitioner

It's not clear what that class is, since Kafka Connect packages start with org.apache.kafka, and all the included partitioners, including the code shown above, start with io.confluent.
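For reference, if that class was compiled from the code shown at the top (package io.confluent.connect.storage.partitioner), the matching setting would be "partitioner.class": "io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner"; the com.kafka.connect.extention name in the sink config suggests the same class was built under a different package.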

@yarlagaddag

@OneCricketeer what are the valid partitioners for this connector?

@OneCricketeer

@yarlagaddag

Thank you @OneCricketeer. So there is TimeBasedPartitioner, which is what I used earlier after looking into the above.

@yarlagaddag

Got the same issue for DefaultPartitioner as well:

org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.DefaultPartitioner.<init>(io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig, io.confluent.connect.cloud.storage.source.SourceStorage) for configuration Failed to instantiate partitioner class io.confluent.connect.storage.partitioner.DefaultPartitioner
	at io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig.getPartitioner(StorageSourceConnectorConfig.java:686)
	at io.confluent.connect.cloud.storage.source.StorageSourceTask.start(StorageSourceTask.java:75)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:230)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

@OneCricketeer

Right. If you want to use both fields and timestamp, you'll need to check out PR #251 and then replace all the connect-storage-common JARs on your Connect servers.

@yarlagaddag

Alright. I am using just the default one and still ran into the same issue; not sure what else is causing it. The stack trace is posted above.

@OneCricketeer

I'd guess it has something to do with reflection looking up the class name and then calling the configure method, since all the partitioners themselves have been tested since ac6daad.
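For contrast, here is a minimal sketch of the sink-side contract that the partitioners in kafka-connect-storage-common are written against (no-arg constructor, then configure(Map)); the exact instantiation code inside the sink connector may differ, and parsedConfig stands in for the parsed connector config:

import java.util.Map;

import io.confluent.connect.storage.partitioner.Partitioner;

// Sink side: instantiate via the no-arg constructor, then hand over the config map.
// The source connector's two-argument constructor lookup shown in the traces above
// is a different contract, which is why the same classes fail there.
Partitioner<?> partitioner = (Partitioner<?>) Class
    .forName("io.confluent.connect.storage.partitioner.DefaultPartitioner")
    .getDeclaredConstructor()
    .newInstance();
partitioner.configure(parsedConfig); // parsedConfig: Map<String, Object>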

@yarlagaddag

Hmm, OK. Thanks @OneCricketeer.
