diff --git a/core/src/main/java/io/confluent/connect/storage/StorageSinkConnectorConfig.java b/core/src/main/java/io/confluent/connect/storage/StorageSinkConnectorConfig.java index fa749079b..a78ace8bd 100644 --- a/core/src/main/java/io/confluent/connect/storage/StorageSinkConnectorConfig.java +++ b/core/src/main/java/io/confluent/connect/storage/StorageSinkConnectorConfig.java @@ -100,6 +100,11 @@ public class StorageSinkConnectorConfig extends AbstractConfig implements Compos public static final int SCHEMA_CACHE_SIZE_DEFAULT = 1000; public static final String SCHEMA_CACHE_SIZE_DISPLAY = "Schema Cache Size"; + public static final String SCRUB_INVALID_NAMES_CONFIG = AvroDataConfig.SCRUB_INVALID_NAMES_CONFIG; + public static final String SCRUB_INVALID_NAMES_DOC = "Enable scrubbing of invalid schema names"; + public static final String SCRUB_INVALID_NAMES_DEFAULT = false; + public static final String SCRUB_INVALID_NAMES_DISPLAY = "Scrub Invalid Schema Names"; + public static final String ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG = "enhanced.avro.schema.support"; public static final boolean ENHANCED_AVRO_SCHEMA_SUPPORT_DEFAULT = true; public static final String ENHANCED_AVRO_SCHEMA_SUPPORT_DOC = @@ -243,6 +248,18 @@ public static ConfigDef newConfigDef( CONNECT_META_DATA_DISPLAY ); + configDef.define( + SCRUB_INVALID_NAMES_CONFIG, + Type.BOOLEAN, + SCRUB_INVALID_NAMES_DEFAULT, + Importance.LOW, + SCRUB_INVALID_NAMES_DOC, + group, + ++orderInGroup, + Width.SHORT, + SCRUB_INVALID_NAMES_DISPLAY + ); + configDef.define( RETRY_BACKOFF_CONFIG, Type.LONG, @@ -409,6 +426,7 @@ public AvroDataConfig avroDataConfig() { props.put(SCHEMA_CACHE_SIZE_CONFIG, get(SCHEMA_CACHE_SIZE_CONFIG)); props.put(ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, get(ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG)); props.put(CONNECT_META_DATA_CONFIG, get(CONNECT_META_DATA_CONFIG)); + props.put(SCRUB_INVALID_NAMES_CONFIG, get(SCRUB_INVALID_NAMES_CONFIG)); return new AvroDataConfig(props); } }