Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…-marklogic-connector into origin-master
  • Loading branch information
BillFarber committed Aug 15, 2020
2 parents 4a8d769 + d9338af commit 1f93b62
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 29 deletions.
Binary file added MarkLogic_Kafka_Connector_v1.2.2.pdf
Binary file not shown.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# v1.2.2 Changes
1. Support of additional authentication options
2. Documentation of how to update the connector for security options

Refer MarkLogic_Kafka_Connector_v1.2.2.pdf for details

# kafka-connect-marklogic

This is a connector for subscribing to Kafka queues and pushing messages to MarkLogic
Expand Down
8 changes: 5 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ configurations {
}

dependencies {
compileOnly "org.apache.kafka:connect-api:2.3.0"
compileOnly "org.apache.kafka:connect-api:2.5.0"
compileOnly "org.apache.kafka:connect-json:2.5.0"

compile ("com.marklogic:marklogic-data-hub:5.2.0") {
compile ("com.marklogic:marklogic-data-hub:5.2.2") {
// Excluding these because there's no need for them
exclude module: "spring-boot-autoconfigure"
exclude module: "spring-integration-http"
Expand All @@ -31,7 +32,8 @@ dependencies {
}

testCompile "org.junit.jupiter:junit-jupiter-api:5.3.0"
testCompile "org.apache.kafka:connect-api:2.3.0"
testCompile "org.apache.kafka:connect-api:2.5.0"
testCompile "org.apache.kafka:connect-json:2.5.0"

// Needed by Gradle 4.6+ - see https://www.petrikainulainen.net/programming/testing/junit-5-tutorial-running-unit-tests-with-gradle/
testRuntime "org.junit.jupiter:junit-jupiter-engine:5.3.0"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group=com.marklogic
version=1.2.1
version=1.2.2

# For the Confluent Connector Archive
componentOwner=marklogic
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
package com.marklogic.kafka.connect;

import javax.net.ssl.SSLContext;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.ext.DatabaseClientConfig;
import com.marklogic.client.ext.SecurityContextType;
import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;

import javax.net.ssl.SSLContext;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import com.marklogic.client.ext.modulesloader.ssl.SimpleX509TrustManager;

public class DefaultDatabaseClientConfigBuilder implements DatabaseClientConfigBuilder {

Expand All @@ -17,32 +36,26 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaC
DatabaseClientConfig clientConfig = new DatabaseClientConfig();
clientConfig.setCertFile(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_FILE));
clientConfig.setCertPassword(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD));

String type = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_TYPE);
if (type != null && type.trim().length() > 0) {
clientConfig.setConnectionType(DatabaseClient.ConnectionType.valueOf(type.toUpperCase()));
}

clientConfig.setTrustManager(new SimpleX509TrustManager());
clientConfig = configureHostNameVerifier(clientConfig,kafkaConfig);
String database = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_DATABASE);
if (database != null && database.trim().length() > 0) {
clientConfig.setDatabase(database);
}

String connType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_TYPE);
if (connType != null && connType.trim().length() > 0) {
clientConfig.setConnectionType(DatabaseClient.ConnectionType.valueOf(connType.toUpperCase()));
}
clientConfig.setExternalName(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_EXTERNAL_NAME));
clientConfig.setHost(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_HOST));
clientConfig.setPassword(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_PASSWORD));
clientConfig.setPort(Integer.parseInt(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_PORT)));

String securityContextType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE).toUpperCase();
clientConfig.setSecurityContextType(SecurityContextType.valueOf(securityContextType));

clientConfig = configureCustomSslConnection(clientConfig, kafkaConfig);
String simpleSsl = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL);
if (simpleSsl != null && Boolean.parseBoolean(simpleSsl)) {
configureSimpleSsl(clientConfig);
clientConfig = configureSimpleSsl(clientConfig);
}

clientConfig.setUsername(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_USERNAME));

return clientConfig;
}

Expand All @@ -53,13 +66,117 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaC
*
* @param clientConfig
*/
protected void configureSimpleSsl(DatabaseClientConfig clientConfig) {
protected DatabaseClientConfig configureSimpleSsl(DatabaseClientConfig clientConfig) {
try {
clientConfig.setSslContext(SSLContext.getDefault());
clientConfig.setTrustManager(new SimpleX509TrustManager());
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Unable to get default SSLContext: " + e.getMessage(), e);
}

clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY);
return clientConfig;
}

/**
* This function configures the Host Name verifier based on the configuration.
* ANY, STRICT and COMMON are the possible values, ANY being default.
*
* @param clientConfig
*/
protected DatabaseClientConfig configureHostNameVerifier(DatabaseClientConfig clientConfig, Map<String, String> kafkaConfig) {
String sslHostNameVerifier = kafkaConfig.get(MarkLogicSinkConfig.SSL_HOST_VERIFIER);
if ("ANY".equals(sslHostNameVerifier))
clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY);
else if ("COMMON".equals(sslHostNameVerifier))
clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.COMMON);
else if ("STRICT".equals(sslHostNameVerifier))
clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.STRICT);
else
clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY);
return clientConfig;
}

protected DatabaseClientConfig configureCustomSslConnection(DatabaseClientConfig clientConfig, Map<String, String> kafkaConfig) {
String ssl = kafkaConfig.get(MarkLogicSinkConfig.SSL);
String tlsVersion = kafkaConfig.get(MarkLogicSinkConfig.TLS_VERSION);
String sslMutualAuth = kafkaConfig.get(MarkLogicSinkConfig.SSL_MUTUAL_AUTH);
SSLContext sslContext = null;
String securityContextType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE).toUpperCase();
clientConfig.setSecurityContextType(SecurityContextType.valueOf(securityContextType));

if ("BASIC".equals(securityContextType) ||
"DIGEST".equals(securityContextType)
) {
if (ssl != null && Boolean.parseBoolean(ssl)) {
if (sslMutualAuth != null && Boolean.parseBoolean(sslMutualAuth)) {
/*2 way ssl changes*/
KeyStore clientKeyStore = null;
try {
clientKeyStore = KeyStore.getInstance("PKCS12");
} catch (KeyStoreException e) {

throw new RuntimeException("Unable to get default SSLContext: " + e.getMessage(), e);
}
TrustManager[] trust = new TrustManager[] { new SimpleX509TrustManager()};

try (InputStream keystoreInputStream = new FileInputStream(clientConfig.getCertFile())) {
clientKeyStore.load(keystoreInputStream, clientConfig.getCertPassword().toCharArray());
} catch (Exception e) {
throw new RuntimeException("Unable to configure custom SSL connection: " + e.getMessage(), e);
}
KeyManagerFactory keyManagerFactory = null;
try {
keyManagerFactory = KeyManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
} catch (Exception e) {

throw new RuntimeException("Unable to configure custom SSL connection: " + e.getMessage(), e);
}
try {
keyManagerFactory.init(clientKeyStore, clientConfig.getCertPassword().toCharArray());
} catch (Exception e) {

throw new RuntimeException("Unable to configure custom SSL connection: " + e.getMessage(), e);
}
KeyManager[] key = keyManagerFactory.getKeyManagers();
try {
if (tlsVersion != null && tlsVersion.trim().length() > 0 ) {
sslContext = SSLContext.getInstance(tlsVersion);
}
else {
sslContext = SSLContext.getInstance("TLSv1.2");
}
} catch (NoSuchAlgorithmException e) {

throw new RuntimeException("Unable to configure custom SSL connection:" + e.getMessage(), e);
}
try {
sslContext.init(key, trust, null);
} catch (KeyManagementException e) {
throw new RuntimeException("Unable to configure custom SSL connection:" + e.getMessage(), e);
}
clientConfig.setSslContext(sslContext);
}
else {/*1wayssl*/
TrustManager[] trust = new TrustManager[] { new SimpleX509TrustManager()};
try {
if (tlsVersion != null && tlsVersion.trim().length() > 0 ) {
sslContext = SSLContext.getInstance(tlsVersion);
}
else {
sslContext = SSLContext.getInstance("TLSv1.2");
}
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Unable to configure custom SSL connection: " + e.getMessage(), e);
}
try {
sslContext.init(null, trust, null);
}catch (KeyManagementException e) {
throw new RuntimeException("Unable to configure custom SSL connection:" + e.getMessage(), e);
}
clientConfig.setSslContext(sslContext);
}
} /* End of if ssl */
}
return clientConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public class MarkLogicSinkConfig extends AbstractConfig {
public static final String DOCUMENT_URI_PREFIX = "ml.document.uriPrefix";
public static final String DOCUMENT_URI_SUFFIX = "ml.document.uriSuffix";

public static final String SSL = "ml.connection.enableCustomSsl";
public static final String TLS_VERSION = "ml.connection.customSsl.tlsVersion";
public static final String SSL_HOST_VERIFIER = "ml.connection.customSsl.hostNameVerifier";
public static final String SSL_MUTUAL_AUTH = "ml.connection.customSsl.mutualAuth";

public static ConfigDef CONFIG_DEF = new ConfigDef()
.define(CONNECTION_HOST, Type.STRING, Importance.HIGH, "MarkLogic server hostname")
.define(CONNECTION_PORT, Type.INT, Importance.HIGH, "The REST app server port to connect to")
Expand All @@ -68,7 +73,12 @@ public class MarkLogicSinkConfig extends AbstractConfig {
.define(DOCUMENT_MIMETYPE, Type.STRING, Importance.LOW, "Defines the mime type of each document; optional, and typically the format is set instead of the mime type")
.define(DOCUMENT_PERMISSIONS, Type.STRING, Importance.MEDIUM, "String-delimited permissions to add to each document; role1,capability1,role2,capability2,etc")
.define(DOCUMENT_URI_PREFIX, Type.STRING, Importance.MEDIUM, "Prefix to prepend to each generated URI")
.define(DOCUMENT_URI_SUFFIX, Type.STRING, Importance.MEDIUM, "Suffix to append to each generated URI");
.define(DOCUMENT_URI_SUFFIX, Type.STRING, Importance.MEDIUM, "Suffix to append to each generated URI")
.define(SSL, Type.BOOLEAN, Importance.LOW, "Whether SSL connection to the App server - true or false.")
.define(TLS_VERSION, Type.STRING, Importance.LOW, "Version of TLS to connect to MarkLogic SSL enabled App server. Ex. TLSv1.2")
.define(SSL_HOST_VERIFIER, Type.STRING, Importance.LOW, "The strictness of Host Verifier - ANY, COMMON, STRICT")
.define(SSL_MUTUAL_AUTH, Type.BOOLEAN, Importance.LOW, "Mutual Authentication for Basic or Digest : true or false")
;

public MarkLogicSinkConfig(final Map<?, ?> originals) {
super(CONFIG_DEF, originals, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.marklogic.client.DatabaseClientFactory;

import java.util.HashMap;
import java.util.Map;
import java.io.File;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -70,8 +72,78 @@ public void digestAuthenticationAndSimpleSsl() {
assertEquals(SecurityContextType.DIGEST, clientConfig.getSecurityContextType());
assertNotNull(clientConfig.getSslContext());
assertNotNull(clientConfig.getSslHostnameVerifier());
assertNull(clientConfig.getTrustManager(), "If DatabaseClientFactory is given a null TrustManager, it will " +
"default to the JVM's cacerts file, which is a reasonable default approach");
assertNotNull(clientConfig.getTrustManager());
}

@Test
public void basictAuthenticationAndSimpleSsl() {
config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic");
config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, "true");

DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config);
assertEquals(SecurityContextType.BASIC, clientConfig.getSecurityContextType());
assertNotNull(clientConfig.getSslContext());
assertNotNull(clientConfig.getSslHostnameVerifier());
assertNotNull(clientConfig.getTrustManager());
}

@Test
public void basicAuthenticationAndMutualSSL() {
File file = new File("src/test/resources/srportal.p12");
String absolutePath = file.getAbsolutePath();
config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic");
config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, "false");
config.put(MarkLogicSinkConfig.SSL, "true");
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2");
config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "STRICT");
config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, "true");
config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, absolutePath);
config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, "abc");

DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config);
assertEquals(SecurityContextType.BASIC, clientConfig.getSecurityContextType());
assertNotNull(clientConfig.getSslContext());
assertEquals(DatabaseClientFactory.SSLHostnameVerifier.STRICT, clientConfig.getSslHostnameVerifier());
assertNotNull(clientConfig.getTrustManager());
}

@Test
public void basicAuthenticationAndMutualSSLWithInvalidHost() {
File file = new File("src/test/resources/srportal.p12");
String absolutePath = file.getAbsolutePath();
config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic");
config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, "false");
config.put(MarkLogicSinkConfig.SSL, "true");
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2");
config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "SOMETHING");
config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, "true");
config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, absolutePath);
config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, "abc");

DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config);
assertEquals(SecurityContextType.BASIC, clientConfig.getSecurityContextType());
assertNotNull(clientConfig.getSslContext());
assertEquals(DatabaseClientFactory.SSLHostnameVerifier.ANY, clientConfig.getSslHostnameVerifier());
System.out.println(clientConfig.getSslHostnameVerifier());
assertNotNull(clientConfig.getTrustManager());
}


@Test
public void digestAuthenticationAnd1WaySSL() {

config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "digest");
config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, "false");
config.put(MarkLogicSinkConfig.SSL, "true");
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2");
config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "STRICT");
config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, "false");

DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config);
assertEquals(SecurityContextType.DIGEST, clientConfig.getSecurityContextType());
assertNotNull(clientConfig.getSslContext());
assertNotNull(clientConfig.getSslHostnameVerifier());
assertNotNull(clientConfig.getTrustManager());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import com.marklogic.client.io.Format;
import com.marklogic.client.io.StringHandle;
import org.apache.kafka.connect.sink.SinkRecord;

import org.junit.jupiter.api.Test;


import java.util.*;
import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -22,7 +25,7 @@ public class ConvertSinkRecordTest {
MarkLogicSinkTask markLogicSinkTask = new MarkLogicSinkTask();

@Test
public void allPropertiesSet() {
public void allPropertiesSet() throws IOException {
Map<String, String> config = new HashMap<>();
config.put("ml.document.collections", "one,two");
config.put("ml.document.format", "json");
Expand Down Expand Up @@ -54,7 +57,7 @@ public void allPropertiesSet() {
}

@Test
public void noPropertiesSet() {
public void noPropertiesSet() throws IOException {
converter = new DefaultSinkRecordConverter(new HashMap<>());
converter.getDocumentWriteOperationBuilder().withContentIdExtractor(content -> "12345");

Expand All @@ -68,7 +71,7 @@ public void noPropertiesSet() {
}

@Test
public void binaryContent() {
public void binaryContent() throws IOException{
converter = new DefaultSinkRecordConverter(new HashMap<>());

DocumentWriteOperation op = converter.convert(newSinkRecord("hello world".getBytes()));
Expand Down
Binary file added src/test/resources/srportal.p12
Binary file not shown.

0 comments on commit 1f93b62

Please sign in to comment.