Skip to content

Commit

Permalink
CCMSG-1613 | Use the configured schema.generation.key.name and schema…
Browse files Browse the repository at this point in the history
….generation.value.name while creating the schema's. (#204)
  • Loading branch information
pbadani authored Oct 17, 2022
1 parent ec283f8 commit 7506b3d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public Map.Entry<Schema, Schema> generate(File inputFile, List<String> keyFields

log.trace("generate() - Building key schema.");
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct()
.name("com.github.jcustenborder.kafka.connect.model.Key");
.name(this.config.schemaGenerationKeyName);

for (String keyFieldName : keyFields) {
log.trace("generate() - Adding keyFieldName field '{}'", keyFieldName);
Expand All @@ -205,7 +205,7 @@ public Map.Entry<Schema, Schema> generate(File inputFile, List<String> keyFields

log.trace("generate() - Building value schema.");
SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct()
.name("com.github.jcustenborder.kafka.connect.model.Value");
.name(this.config.schemaGenerationValueName);

for (Map.Entry<String, Schema.Type> kvp : fieldTypes.entrySet()) {
addField(valueSchemaBuilder, kvp.getKey(), kvp.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;

import java.util.HashMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -57,4 +58,35 @@ public void schema() throws IOException {
}


@Test
public void schemaWithCustomSchemaName() throws IOException {
File inputFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/json/FieldsMatch.data");
Map<String, String> configs = new HashMap<>(settings);
configs.put(AbstractSpoolDirSourceConnectorConfig.SCHEMA_GENERATION_KEY_NAME_CONF, "com.foo.key");
configs.put(AbstractSpoolDirSourceConnectorConfig.SCHEMA_GENERATION_VALUE_NAME_CONF, "com.foo.value");
JsonSchemaGenerator schemaGenerator = new JsonSchemaGenerator(configs);
Map.Entry<Schema, Schema> kvp = schemaGenerator.generate(inputFile, Arrays.asList("id"));
final Schema expectedKeySchema = SchemaBuilder.struct()
.name("com.foo.key")
.field("id", Schema.OPTIONAL_STRING_SCHEMA)
.build();

final Schema expectedValueSchema = SchemaBuilder.struct()
.name("com.foo.value")
.field("id", Schema.OPTIONAL_STRING_SCHEMA)
.field("first_name", Schema.OPTIONAL_STRING_SCHEMA)
.field("last_name", Schema.OPTIONAL_STRING_SCHEMA)
.field("email", Schema.OPTIONAL_STRING_SCHEMA)
.field("gender", Schema.OPTIONAL_STRING_SCHEMA)
.field("ip_address", Schema.OPTIONAL_STRING_SCHEMA)
.field("last_login", Schema.OPTIONAL_STRING_SCHEMA)
.field("account_balance", Schema.OPTIONAL_STRING_SCHEMA)
.field("country", Schema.OPTIONAL_STRING_SCHEMA)
.field("favorite_color", Schema.OPTIONAL_STRING_SCHEMA)
.build();

assertSchema(expectedKeySchema, kvp.getKey(), "key schema does not match.");
assertSchema(expectedValueSchema, kvp.getValue(), "value schema does not match.");
}

}

0 comments on commit 7506b3d

Please sign in to comment.