Commit 91fa572

Develop (#49)
* Ensuring optional parameters are actually optional and do not cause a crash when they are omitted.

* Fixing some Boolean and Integer casts now that the parsed configuration object is a Map<String, Object> instead of a Map<String, String>. Also fixed some tests that used that configuration object.

* Updating the build to use the MarkLogic Java Client API v5.3.0 and the MarkLogic Data Hub v5.2.4. This should solve problems with running the connector with Java 11.
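To make the first two bullets concrete, here is a minimal sketch of the pattern this commit moves to (the key names are placeholders, not the connector's actual property names): in the parsed Map<String, Object> that Kafka's ConfigDef produces, values arrive already typed, so a missing optional key yields null instead of a parse-time crash.

import java.util.HashMap;
import java.util.Map;

public class OptionalParamSketch {
    public static void main(String[] args) {
        // Values arrive typed from ConfigDef, e.g. an Integer rather than "8011"
        Map<String, Object> parsedConfig = new HashMap<>();
        parsedConfig.put("example.port", 8011);

        // Optional Boolean: an absent key is null, checked before unboxing
        Boolean ssl = (Boolean) parsedConfig.get("example.ssl");
        if (ssl != null && ssl) {
            System.out.println("custom SSL enabled");
        }

        // Optional Integer: a cast replaces Integer.parseInt, which would
        // have thrown a NullPointerException on a missing String value
        Integer port = (Integer) parsedConfig.get("example.port");
        System.out.println("port = " + port);
    }
}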
BillFarber authored Jan 3, 2021
1 parent d47d98f commit 91fa572
Showing 13 changed files with 144 additions and 132 deletions.
10 changes: 8 additions & 2 deletions build.gradle
@@ -20,8 +20,10 @@ dependencies {
 	compileOnly "org.apache.kafka:connect-api:2.5.0"
 	compileOnly "org.apache.kafka:connect-json:2.5.0"
 
-	compile ("com.marklogic:marklogic-data-hub:5.2.2") {
+	compile "com.marklogic:marklogic-client-api:5.3.0"
+	compile ("com.marklogic:marklogic-data-hub:5.2.4") {
+		// Prefer the version above
+		exclude module: "marklogic-client-api"
 		// Excluding these because there's no need for them
 		exclude module: "spring-boot-autoconfigure"
 		exclude module: "spring-integration-http"
@@ -60,6 +62,10 @@ test {

 // Customize the Java plugin's jar task to produce a "fat" jar with all dependencies included
 jar {
+	manifest {
+		attributes 'Implementation-Title': 'Kafka-Connect-MarkLogic',
+				'Implementation-Version': version
+	}
 	from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
 }

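A quick sanity check that the exclusion takes effect: Gradle's built-in dependency report, run as ./gradlew dependencies --configuration compile, should list marklogic-client-api only at 5.3.0 (the version declared above) rather than the 5.2.x version Data Hub would otherwise pull in.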
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,5 +1,5 @@
 group=com.marklogic
-version=1.5.0
+version=1.5.2
 
 # For the Confluent Connector Archive
 componentOwner=marklogic
IdStrategyFactory.java
@@ -6,22 +6,21 @@

 public class IdStrategyFactory {
 
-	public static IdStrategy getIdStrategy(Map<String, String> kafkaConfig) {
-		String strategyType = kafkaConfig.get(MarkLogicSinkConfig.ID_STRATEGY);
-		String strategyPaths= kafkaConfig.get(MarkLogicSinkConfig.ID_STRATEGY_PATH);
+	public static IdStrategy getIdStrategy(Map<String, Object> parsedConfig) {
+		String strategyType = (String) parsedConfig.get(MarkLogicSinkConfig.ID_STRATEGY);
+		String strategyPaths= (String) parsedConfig.get(MarkLogicSinkConfig.ID_STRATEGY_PATH);
 
 		switch((strategyType != null) ? strategyType : "UUID") {
 			case "JSONPATH":
 				return (new JSONPathStrategy(strategyPaths.trim().split(",")[0]));
 			case "HASH":
 				return (new HashedJSONPathsStrategy(strategyPaths.trim().split(",")));
-			case "UUID":
-				return (new DefaultStrategy());
 			case "KAFKA_META_WITH_SLASH":
 				return (new KafkaMetaStrategy());
 			case "KAFKA_META_HASHED":
 				return (new HashedKafkaMetaStrategy());
-			default:
+			case "UUID":
+			default:
 				return (new DefaultStrategy());
 		}
 	}
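A short usage sketch of the reworked factory (the package comes from the import statements later in this commit, and the property keys match the ID_STRATEGY constants defined in MarkLogicSinkConfig below; treat anything else as illustrative):

import java.util.HashMap;
import java.util.Map;

import com.marklogic.client.id.strategy.IdStrategy;
import com.marklogic.client.id.strategy.IdStrategyFactory;

public class IdStrategySketch {
    public static void main(String[] args) {
        Map<String, Object> parsedConfig = new HashMap<>();
        parsedConfig.put("ml.id.strategy", "JSONPATH");
        parsedConfig.put("ml.id.strategy.paths", "/customer/id");
        IdStrategy jsonPath = IdStrategyFactory.getIdStrategy(parsedConfig);

        // With no strategy configured, the factory now falls through to the
        // UUID/default case instead of dereferencing a missing value
        IdStrategy fallback = IdStrategyFactory.getIdStrategy(new HashMap<>());
        System.out.println(jsonPath.getClass().getSimpleName()
            + " / " + fallback.getClass().getSimpleName());
    }
}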
DatabaseClientConfigBuilder.java
@@ -9,6 +9,6 @@
  */
 public interface DatabaseClientConfigBuilder {
 
-	DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaConfig);
+	DatabaseClientConfig buildDatabaseClientConfig(Map<String, Object> kafkaConfig);
 
 }
DefaultDatabaseClientConfigBuilder.java
@@ -5,19 +5,13 @@
 import java.util.Map;
 
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.security.KeyManagementException;
 import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
 
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
@@ -31,35 +25,36 @@
 public class DefaultDatabaseClientConfigBuilder implements DatabaseClientConfigBuilder {
 
 	@Override
-	public DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaConfig) {
+	public DatabaseClientConfig buildDatabaseClientConfig(Map<String, Object> parsedConfig) {
 
 		DatabaseClientConfig clientConfig = new DatabaseClientConfig();
-		clientConfig.setCertFile(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_FILE));
-		clientConfig.setCertPassword(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD));
+		clientConfig.setCertFile((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_FILE));
+		clientConfig.setCertPassword((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD));
 		clientConfig.setTrustManager(new SimpleX509TrustManager());
-		clientConfig = configureHostNameVerifier(clientConfig,kafkaConfig);
-		String securityContextType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE).toUpperCase();
+		clientConfig = configureHostNameVerifier(clientConfig,parsedConfig);
+		String securityContextType = ((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE)).toUpperCase();
 		clientConfig.setSecurityContextType(SecurityContextType.valueOf(securityContextType));
-		String database = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_DATABASE);
+		String database = (String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_DATABASE);
 		if (database != null && database.trim().length() > 0) {
 			clientConfig.setDatabase(database);
 		}
-		String connType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_TYPE);
+		String connType = (String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_TYPE);
 		if (connType != null && connType.trim().length() > 0) {
 			clientConfig.setConnectionType(DatabaseClient.ConnectionType.valueOf(connType.toUpperCase()));
 		}
-		clientConfig.setExternalName(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_EXTERNAL_NAME));
-		clientConfig.setHost(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_HOST));
-		clientConfig.setPassword(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_PASSWORD));
-		clientConfig.setPort(Integer.parseInt(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_PORT)));
-		String customSsl = kafkaConfig.get(MarkLogicSinkConfig.SSL);
-		if (customSsl != null && Boolean.parseBoolean(customSsl)) {
-			clientConfig = configureCustomSslConnection(clientConfig, kafkaConfig);
+		clientConfig.setExternalName((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_EXTERNAL_NAME));
+		clientConfig.setHost((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_HOST));
+		clientConfig.setPassword((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_PASSWORD));
+		clientConfig.setPort((Integer) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_PORT));
+		Boolean customSsl = (Boolean) parsedConfig.get(MarkLogicSinkConfig.SSL);
+		if (customSsl != null && customSsl) {
+			clientConfig = configureCustomSslConnection(clientConfig, parsedConfig, customSsl);
 		}
-		String simpleSsl = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL);
-		if (simpleSsl != null && Boolean.parseBoolean(simpleSsl)) {
+		Boolean simpleSsl = (Boolean) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL);
+		if (simpleSsl != null && simpleSsl) {
 			clientConfig = configureSimpleSsl(clientConfig);
 		}
-		clientConfig.setUsername(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_USERNAME));
+		clientConfig.setUsername((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_USERNAME));
 		return clientConfig;
 	}

@@ -82,8 +77,8 @@ protected DatabaseClientConfig configureSimpleSsl(DatabaseClientConfig clientCon
 	 *
 	 * @param clientConfig
 	 */
-	protected DatabaseClientConfig configureHostNameVerifier(DatabaseClientConfig clientConfig, Map<String, String> kafkaConfig) {
-		String sslHostNameVerifier = kafkaConfig.get(MarkLogicSinkConfig.SSL_HOST_VERIFIER);
+	protected DatabaseClientConfig configureHostNameVerifier(DatabaseClientConfig clientConfig, Map<String, Object> parsedConfig) {
+		String sslHostNameVerifier = (String) parsedConfig.get(MarkLogicSinkConfig.SSL_HOST_VERIFIER);
 		if ("ANY".equals(sslHostNameVerifier))
 			clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY);
 		else if ("COMMON".equals(sslHostNameVerifier))
@@ -95,18 +90,17 @@ else if ("STRICT".equals(sslHostNameVerifier))
 		return clientConfig;
 	}
 
-	protected DatabaseClientConfig configureCustomSslConnection(DatabaseClientConfig clientConfig, Map<String, String> kafkaConfig) {
-		String ssl = kafkaConfig.get(MarkLogicSinkConfig.SSL);
-		String tlsVersion = kafkaConfig.get(MarkLogicSinkConfig.TLS_VERSION);
-		String sslMutualAuth = kafkaConfig.get(MarkLogicSinkConfig.SSL_MUTUAL_AUTH);
+	protected DatabaseClientConfig configureCustomSslConnection(DatabaseClientConfig clientConfig, Map<String, Object> parsedConfig, Boolean ssl) {
+		String tlsVersion = (String) parsedConfig.get(MarkLogicSinkConfig.TLS_VERSION);
+		String sslMutualAuth = (String) parsedConfig.get(MarkLogicSinkConfig.SSL_MUTUAL_AUTH);
 		SSLContext sslContext = null;
-		String securityContextType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE).toUpperCase();
+		String securityContextType = ((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE)).toUpperCase();
 		clientConfig.setSecurityContextType(SecurityContextType.valueOf(securityContextType));
 
 		if ("BASIC".equals(securityContextType) ||
 			"DIGEST".equals(securityContextType)
 		) {
-			if (ssl != null && Boolean.parseBoolean(ssl)) {
+			if (ssl != null && ssl) {
 				if (sslMutualAuth != null && Boolean.parseBoolean(sslMutualAuth)) {
 					/*2 way ssl changes*/
 					KeyStore clientKeyStore = null;
@@ -155,7 +149,7 @@ protected DatabaseClientConfig configureCustomSslConnection(DatabaseClientConfig
 				}
 				clientConfig.setSslContext(sslContext);
 			}
-			else {/*1wayssl*/
+			else {/* 1-way ssl */
 				TrustManager[] trust = new TrustManager[] { new SimpleX509TrustManager()};
 				try {
 					if (tlsVersion != null && tlsVersion.trim().length() > 0 ) {
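A hedged sketch of driving the rebuilt builder directly. The property keys are the MarkLogicSinkConfig constants from this commit, but the package names in the imports are assumptions about the repository layout, and the printed getters assume DatabaseClientConfig's standard POJO accessors from ml-javaclient-util:

import java.util.HashMap;
import java.util.Map;

import com.marklogic.client.ext.DatabaseClientConfig;
// Packages below are assumed from the repository layout, not confirmed by this diff
import com.marklogic.kafka.connect.DefaultDatabaseClientConfigBuilder;
import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;

public class ClientConfigSketch {
    public static void main(String[] args) {
        // Only the required connection values; every optional key is simply absent
        Map<String, Object> parsedConfig = new HashMap<>();
        parsedConfig.put(MarkLogicSinkConfig.CONNECTION_HOST, "localhost");
        parsedConfig.put(MarkLogicSinkConfig.CONNECTION_PORT, 8011); // an Integer, not "8011"
        parsedConfig.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "DIGEST");
        parsedConfig.put(MarkLogicSinkConfig.CONNECTION_USERNAME, "kafka-user");
        parsedConfig.put(MarkLogicSinkConfig.CONNECTION_PASSWORD, "example-password");

        // Absent SSL flags come back as null Booleans, so both configure* branches are skipped
        DatabaseClientConfig config =
            new DefaultDatabaseClientConfigBuilder().buildDatabaseClientConfig(parsedConfig);
        System.out.println(config.getHost() + ":" + config.getPort());
    }
}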
DefaultSinkRecordConverter.java
@@ -3,16 +3,13 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
-import java.util.Date;
 import java.util.Map;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.storage.Converter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.id.strategy.IdStrategyFactory;
@@ -24,19 +21,13 @@
 import com.marklogic.client.io.Format;
 import com.marklogic.client.io.StringHandle;
 import com.marklogic.client.io.marker.AbstractWriteHandle;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.storage.Converter;
-import java.util.Collections;
-import org.apache.kafka.connect.json.JsonConverter;
 
 /**
  * Handles converting a SinkRecord into a DocumentWriteOperation via the properties in the given config map.
  */
 public class DefaultSinkRecordConverter implements SinkRecordConverter {
 
 	private static final Converter JSON_CONVERTER;
-	private static final Logger logger = LoggerFactory.getLogger(DefaultSinkRecordConverter.class);
 	static {
 		JSON_CONVERTER = new JsonConverter();
 		JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
@@ -48,30 +39,30 @@ public class DefaultSinkRecordConverter implements SinkRecordConverter {
 	private Boolean addTopicToCollections = false;
 	private IdStrategy idStrategy = null;
 
-	public DefaultSinkRecordConverter(Map<String, String> kafkaConfig) {
-		String val = kafkaConfig.get(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS_ADD_TOPIC);
-		if (val != null && val.trim().length() > 0) {
-			addTopicToCollections = Boolean.parseBoolean(val.trim());
+	public DefaultSinkRecordConverter(Map<String, Object> parsedConfig) {
+
+		Boolean booleanVal = (Boolean) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS_ADD_TOPIC);
+		if (booleanVal != null) {
+			addTopicToCollections = booleanVal;
 		}
 
 		documentWriteOperationBuilder = new DocumentWriteOperationBuilder()
-			.withCollections(kafkaConfig.get(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS))
-			.withPermissions(kafkaConfig.get(MarkLogicSinkConfig.DOCUMENT_PERMISSIONS))
-			.withUriPrefix(kafkaConfig.get(MarkLogicSinkConfig.DOCUMENT_URI_PREFIX))
-			.withUriSuffix(kafkaConfig.get(MarkLogicSinkConfig.DOCUMENT_URI_SUFFIX))
+			.withCollections((String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS))
+			.withPermissions((String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_PERMISSIONS))
+			.withUriPrefix((String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_URI_PREFIX))
+			.withUriSuffix((String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_URI_SUFFIX))
 			;
 
-		val = kafkaConfig.get(MarkLogicSinkConfig.DOCUMENT_FORMAT);
+		String val = (String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_FORMAT);
 		if (val != null && val.trim().length() > 0) {
 			format = Format.valueOf(val.toUpperCase());
 		}
-		val = kafkaConfig.get(MarkLogicSinkConfig.DOCUMENT_MIMETYPE);
+		val = (String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_MIMETYPE);
 		if (val != null && val.trim().length() > 0) {
 			mimeType = val;
 		}
 		//Get the correct ID or URI generation strategy based on the configuration
-		idStrategy = IdStrategyFactory.getIdStrategy(kafkaConfig);
+		idStrategy = IdStrategyFactory.getIdStrategy(parsedConfig);
 	}
 
 	@Override
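Note how these val != null && val.trim().length() > 0 guards pair with the new "" defaults in MarkLogicSinkConfig below: an unset option and the empty-string default behave identically. A self-contained sketch of that guard (the helper method is illustrative, not part of the connector):

import com.marklogic.client.io.Format;

public class FormatGuardSketch {
    // Mirrors the guard in the constructor above: an absent value (null) and
    // the new "" default from MarkLogicSinkConfig both leave the format untouched
    static Format resolveFormat(String val, Format current) {
        if (val != null && val.trim().length() > 0) {
            return Format.valueOf(val.toUpperCase());
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(resolveFormat("json", null));   // JSON, explicitly configured
        System.out.println(resolveFormat("", Format.XML)); // XML, "" default ignored
        System.out.println(resolveFormat(null, null));     // null, nothing configured
    }
}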
MarkLogicSinkConfig.java
@@ -50,46 +50,46 @@ public class MarkLogicSinkConfig extends AbstractConfig {
 	public static final String LOGGING_RECORD_KEY = "ml.log.record.key";
 	public static final String LOGGING_RECORD_HEADERS = "ml.log.record.headers";
 
-	public static final String ID_STRATEGY = "ml.id.strategy";
+	public static final String ID_STRATEGY = "ml.id.strategy";
 	public static final String ID_STRATEGY_PATH = "ml.id.strategy.paths";
 
 	public static ConfigDef CONFIG_DEF = new ConfigDef()
 		.define(CONNECTION_HOST, Type.STRING, Importance.HIGH, "MarkLogic server hostname")
 		.define(CONNECTION_PORT, Type.INT, Importance.HIGH, "The REST app server port to connect to")
-		.define(CONNECTION_DATABASE, Type.STRING, Importance.LOW, "Database to connect, if different from the one associated with the port")
-		.define(CONNECTION_SECURITY_CONTEXT_TYPE, Type.STRING, Importance.HIGH, "Type of MarkLogic security context to create - either digest, basic, kerberos, certificate, or none")
+		.define(CONNECTION_DATABASE, Type.STRING, "", Importance.LOW, "Database to connect, if different from the one associated with the port")
+		.define(CONNECTION_SECURITY_CONTEXT_TYPE, Type.STRING, "NONE", Importance.HIGH, "Type of MarkLogic security context to create - either digest, basic, kerberos, certificate, or none")
 		.define(CONNECTION_USERNAME, Type.STRING, Importance.HIGH, "Name of MarkLogic user to authenticate as")
 		.define(CONNECTION_PASSWORD, Type.STRING, Importance.HIGH, "Password for the MarkLogic user")
-		.define(CONNECTION_TYPE, Type.STRING, Importance.LOW, "Connection type; DIRECT or GATEWAY")
-		.define(CONNECTION_SIMPLE_SSL, Type.BOOLEAN, Importance.LOW, "Set to true to use a trust-everything SSL connection")
-		.define(CONNECTION_CERT_FILE, Type.STRING, Importance.LOW, "Path to a certificate file")
-		.define(CONNECTION_CERT_PASSWORD, Type.STRING, Importance.LOW, "Password for the certificate file")
-		.define(CONNECTION_EXTERNAL_NAME, Type.STRING, Importance.LOW, "External name for Kerberos authentication")
+		.define(CONNECTION_TYPE, Type.STRING, "DIRECT", Importance.LOW, "Connection type; DIRECT or GATEWAY")
+		.define(CONNECTION_SIMPLE_SSL, Type.BOOLEAN, false, Importance.LOW, "Set to true to use a trust-everything SSL connection")
+		.define(CONNECTION_CERT_FILE, Type.STRING, "", Importance.LOW, "Path to a certificate file")
+		.define(CONNECTION_CERT_PASSWORD, Type.STRING, "", Importance.LOW, "Password for the certificate file")
+		.define(CONNECTION_EXTERNAL_NAME, Type.STRING, "", Importance.LOW, "External name for Kerberos authentication")
 		.define(DATAHUB_FLOW_NAME, Type.STRING, null, Importance.MEDIUM, "Name of a Data Hub flow to run")
 		.define(DATAHUB_FLOW_STEPS, Type.STRING, null, Importance.MEDIUM, "Comma-delimited names of steps to run")
 		.define(DATAHUB_FLOW_LOG_RESPONSE, Type.BOOLEAN, false, Importance.LOW, "If set to true, the response from running a flow on each ingested batch will be logged at the info level")
 		.define(DMSDK_BATCH_SIZE, Type.INT, 100, Importance.HIGH, "Number of documents to write in each batch")
 		.define(DMSDK_THREAD_COUNT, Type.INT, 8, Importance.HIGH, "Number of threads for DMSDK to use")
-		.define(DMSDK_TRANSFORM, Type.STRING, Importance.MEDIUM, "Name of a REST transform to use when writing documents")
-		.define(DMSDK_TRANSFORM_PARAMS, Type.STRING, Importance.MEDIUM, "Delimited set of transform names and values")
+		.define(DMSDK_TRANSFORM, Type.STRING, "", Importance.MEDIUM, "Name of a REST transform to use when writing documents")
+		.define(DMSDK_TRANSFORM_PARAMS, Type.STRING, "", Importance.MEDIUM, "Delimited set of transform names and values")
 		.define(DMSDK_TRANSFORM_PARAMS_DELIMITER, Type.STRING, ",", Importance.LOW, "Delimiter for transform parameter names and values; defaults to a comma")
-		.define(DOCUMENT_COLLECTIONS_ADD_TOPIC, Type.BOOLEAN, false,Importance.LOW, "Indicates if the topic name should be added to the set of collections for a document")
-		.define(DOCUMENT_COLLECTIONS, Type.STRING, Importance.MEDIUM, "String-delimited collections to add each document to")
-		.define(DOCUMENT_FORMAT, Type.STRING, Importance.LOW, "Defines format of each document; can be one of json, xml, text, binary, or unknown")
-		.define(DOCUMENT_MIMETYPE, Type.STRING, Importance.LOW, "Defines the mime type of each document; optional, and typically the format is set instead of the mime type")
-		.define(DOCUMENT_PERMISSIONS, Type.STRING, Importance.MEDIUM, "String-delimited permissions to add to each document; role1,capability1,role2,capability2,etc")
-		.define(DOCUMENT_URI_PREFIX, Type.STRING, Importance.MEDIUM, "Prefix to prepend to each generated URI")
-		.define(DOCUMENT_URI_SUFFIX, Type.STRING, Importance.MEDIUM, "Suffix to append to each generated URI")
-		.define(SSL, Type.BOOLEAN, Importance.LOW, "Whether SSL connection to the App server - true or false.")
-		.define(TLS_VERSION, Type.STRING, Importance.LOW, "Version of TLS to connect to MarkLogic SSL enabled App server. Ex. TLSv1.2")
-		.define(SSL_HOST_VERIFIER, Type.STRING, Importance.LOW, "The strictness of Host Verifier - ANY, COMMON, STRICT")
-		.define(SSL_MUTUAL_AUTH, Type.BOOLEAN, Importance.LOW, "Mutual Authentication for Basic or Digest : true or false")
+		.define(DOCUMENT_COLLECTIONS_ADD_TOPIC, Type.BOOLEAN, false, Importance.LOW, "Indicates if the topic name should be added to the set of collections for a document")
+		.define(DOCUMENT_COLLECTIONS, Type.STRING, "", Importance.MEDIUM, "String-delimited collections to add each document to")
+		.define(DOCUMENT_FORMAT, Type.STRING, "", Importance.LOW, "Defines format of each document; can be one of json, xml, text, binary, or unknown")
+		.define(DOCUMENT_MIMETYPE, Type.STRING, "", Importance.LOW, "Defines the mime type of each document; optional, and typically the format is set instead of the mime type")
+		.define(DOCUMENT_PERMISSIONS, Type.STRING, "", Importance.MEDIUM, "String-delimited permissions to add to each document; role1,capability1,role2,capability2,etc")
+		.define(DOCUMENT_URI_PREFIX, Type.STRING, "", Importance.MEDIUM, "Prefix to prepend to each generated URI")
+		.define(DOCUMENT_URI_SUFFIX, Type.STRING, "", Importance.MEDIUM, "Suffix to append to each generated URI")
+		.define(SSL, Type.BOOLEAN, false, Importance.LOW, "Whether SSL connection to the App server - true or false.")
+		.define(TLS_VERSION, Type.STRING, "", Importance.LOW, "Version of TLS to connect to MarkLogic SSL enabled App server. Ex. TLSv1.2")
+		.define(SSL_HOST_VERIFIER, Type.STRING, "", Importance.LOW, "The strictness of Host Verifier - ANY, COMMON, STRICT")
+		.define(SSL_MUTUAL_AUTH, Type.BOOLEAN, false, Importance.LOW, "Mutual Authentication for Basic or Digest : true or false")
 
-	 	.define(LOGGING_RECORD_KEY, Type.BOOLEAN, false, Importance.LOW, "Log incoming record keys")
+		.define(LOGGING_RECORD_KEY, Type.BOOLEAN, false, Importance.LOW, "Log incoming record keys")
 		.define(LOGGING_RECORD_HEADERS, Type.BOOLEAN, false, Importance.LOW, "Log incoming record headers")
 
-		.define(ID_STRATEGY, Type.STRING, Importance.LOW, "The ID Strategy for URI.")
-		.define(ID_STRATEGY_PATH, Type.STRING, Importance.LOW, "The JSON path for ID Strategy")
+		.define(ID_STRATEGY, Type.STRING, "", Importance.LOW, "The ID Strategy for URI.")
+		.define(ID_STRATEGY_PATH, Type.STRING, "", Importance.LOW, "The JSON path for ID Strategy")
 		;

public MarkLogicSinkConfig(final Map<?, ?> originals) {
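For reference, this is roughly how Kafka Connect turns the raw String properties into the typed Map<String, Object> the rest of this commit consumes. The sketch uses Kafka's public ConfigDef API with made-up key names:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ConfigParseSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
            .define("example.port", Type.INT, Importance.HIGH, "a required int")
            .define("example.ssl", Type.BOOLEAN, false, Importance.LOW, "an optional boolean");

        Map<String, String> originals = new HashMap<>();
        originals.put("example.port", "8011");

        // parse() coerces each String to its declared Type and fills in defaults,
        // which is why the connector now casts to Integer/Boolean instead of parsing
        Map<String, Object> parsed = def.parse(originals);
        Integer port = (Integer) parsed.get("example.port"); // 8011 as an Integer
        Boolean ssl = (Boolean) parsed.get("example.ssl");   // false, from the default
        System.out.println(port + " / " + ssl);
    }
}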