diff --git a/MarkLogic_Kafka_Connector_v1.2.2.pdf b/MarkLogic_Kafka_Connector_v1.2.2.pdf new file mode 100644 index 0000000..9d833a9 Binary files /dev/null and b/MarkLogic_Kafka_Connector_v1.2.2.pdf differ diff --git a/README.md b/README.md index 2422fb8..cc538ba 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ +# v1.2.2 Changes +1. Support of additional authentication options +2. Documentation of how to update the connector for security options + +Refer MarkLogic_Kafka_Connector_v1.2.2.pdf for details + # kafka-connect-marklogic This is a connector for subscribing to Kafka queues and pushing messages to MarkLogic diff --git a/build.gradle b/build.gradle index 4053a43..48be3eb 100644 --- a/build.gradle +++ b/build.gradle @@ -17,9 +17,10 @@ configurations { } dependencies { - compileOnly "org.apache.kafka:connect-api:2.3.0" + compileOnly "org.apache.kafka:connect-api:2.5.0" + compileOnly "org.apache.kafka:connect-json:2.5.0" - compile ("com.marklogic:marklogic-data-hub:5.2.0") { + compile ("com.marklogic:marklogic-data-hub:5.2.2") { // Excluding these because there's no need for them exclude module: "spring-boot-autoconfigure" exclude module: "spring-integration-http" @@ -31,7 +32,8 @@ dependencies { } testCompile "org.junit.jupiter:junit-jupiter-api:5.3.0" - testCompile "org.apache.kafka:connect-api:2.3.0" + testCompile "org.apache.kafka:connect-api:2.5.0" + testCompile "org.apache.kafka:connect-json:2.5.0" // Needed by Gradle 4.6+ - see https://www.petrikainulainen.net/programming/testing/junit-5-tutorial-running-unit-tests-with-gradle/ testRuntime "org.junit.jupiter:junit-jupiter-engine:5.3.0" diff --git a/gradle.properties b/gradle.properties index 58c8798..c25c2a2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group=com.marklogic -version=1.2.1 +version=1.2.2 # For the Confluent Connector Archive componentOwner=marklogic diff --git a/src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java b/src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java index 0f32965..c0fb6e5 100644 --- a/src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java +++ b/src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java @@ -1,14 +1,33 @@ package com.marklogic.kafka.connect; +import javax.net.ssl.SSLContext; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; + import com.marklogic.client.DatabaseClient; import com.marklogic.client.DatabaseClientFactory; import com.marklogic.client.ext.DatabaseClientConfig; import com.marklogic.client.ext.SecurityContextType; import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig; -import javax.net.ssl.SSLContext; -import java.security.NoSuchAlgorithmException; -import java.util.Map; +import com.marklogic.client.ext.modulesloader.ssl.SimpleX509TrustManager; public class DefaultDatabaseClientConfigBuilder implements DatabaseClientConfigBuilder { @@ -17,32 +36,26 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map kafkaC DatabaseClientConfig clientConfig = new DatabaseClientConfig(); clientConfig.setCertFile(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_FILE)); clientConfig.setCertPassword(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD)); - - String type = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_TYPE); - if (type != null && type.trim().length() > 0) { - clientConfig.setConnectionType(DatabaseClient.ConnectionType.valueOf(type.toUpperCase())); - } - + clientConfig.setTrustManager(new SimpleX509TrustManager()); + clientConfig = configureHostNameVerifier(clientConfig,kafkaConfig); String database = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_DATABASE); if (database != null && database.trim().length() > 0) { clientConfig.setDatabase(database); } - + String connType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_TYPE); + if (connType != null && connType.trim().length() > 0) { + clientConfig.setConnectionType(DatabaseClient.ConnectionType.valueOf(connType.toUpperCase())); + } clientConfig.setExternalName(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_EXTERNAL_NAME)); clientConfig.setHost(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_HOST)); clientConfig.setPassword(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_PASSWORD)); clientConfig.setPort(Integer.parseInt(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_PORT))); - - String securityContextType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE).toUpperCase(); - clientConfig.setSecurityContextType(SecurityContextType.valueOf(securityContextType)); - + clientConfig = configureCustomSslConnection(clientConfig, kafkaConfig); String simpleSsl = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL); if (simpleSsl != null && Boolean.parseBoolean(simpleSsl)) { - configureSimpleSsl(clientConfig); + clientConfig = configureSimpleSsl(clientConfig); } - clientConfig.setUsername(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_USERNAME)); - return clientConfig; } @@ -53,13 +66,117 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map kafkaC * * @param clientConfig */ - protected void configureSimpleSsl(DatabaseClientConfig clientConfig) { + protected DatabaseClientConfig configureSimpleSsl(DatabaseClientConfig clientConfig) { try { clientConfig.setSslContext(SSLContext.getDefault()); + clientConfig.setTrustManager(new SimpleX509TrustManager()); } catch (NoSuchAlgorithmException e) { throw new RuntimeException("Unable to get default SSLContext: " + e.getMessage(), e); } - clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY); + return clientConfig; + } + + /** + * This function configures the Host Name verifier based on the configuration. + * ANY, STRICT and COMMON are the possible values, ANY being default. + * + * @param clientConfig + */ + protected DatabaseClientConfig configureHostNameVerifier(DatabaseClientConfig clientConfig, Map kafkaConfig) { + String sslHostNameVerifier = kafkaConfig.get(MarkLogicSinkConfig.SSL_HOST_VERIFIER); + if ("ANY".equals(sslHostNameVerifier)) + clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY); + else if ("COMMON".equals(sslHostNameVerifier)) + clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.COMMON); + else if ("STRICT".equals(sslHostNameVerifier)) + clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.STRICT); + else + clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY); + return clientConfig; + } + + protected DatabaseClientConfig configureCustomSslConnection(DatabaseClientConfig clientConfig, Map kafkaConfig) { + String ssl = kafkaConfig.get(MarkLogicSinkConfig.SSL); + String tlsVersion = kafkaConfig.get(MarkLogicSinkConfig.TLS_VERSION); + String sslMutualAuth = kafkaConfig.get(MarkLogicSinkConfig.SSL_MUTUAL_AUTH); + SSLContext sslContext = null; + String securityContextType = kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE).toUpperCase(); + clientConfig.setSecurityContextType(SecurityContextType.valueOf(securityContextType)); + + if ("BASIC".equals(securityContextType) || + "DIGEST".equals(securityContextType) + ) { + if (ssl != null && Boolean.parseBoolean(ssl)) { + if (sslMutualAuth != null && Boolean.parseBoolean(sslMutualAuth)) { + /*2 way ssl changes*/ + KeyStore clientKeyStore = null; + try { + clientKeyStore = KeyStore.getInstance("PKCS12"); + } catch (KeyStoreException e) { + + throw new RuntimeException("Unable to get default SSLContext: " + e.getMessage(), e); + } + TrustManager[] trust = new TrustManager[] { new SimpleX509TrustManager()}; + + try (InputStream keystoreInputStream = new FileInputStream(clientConfig.getCertFile())) { + clientKeyStore.load(keystoreInputStream, clientConfig.getCertPassword().toCharArray()); + } catch (Exception e) { + throw new RuntimeException("Unable to configure custom SSL connection: " + e.getMessage(), e); + } + KeyManagerFactory keyManagerFactory = null; + try { + keyManagerFactory = KeyManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + } catch (Exception e) { + + throw new RuntimeException("Unable to configure custom SSL connection: " + e.getMessage(), e); + } + try { + keyManagerFactory.init(clientKeyStore, clientConfig.getCertPassword().toCharArray()); + } catch (Exception e) { + + throw new RuntimeException("Unable to configure custom SSL connection: " + e.getMessage(), e); + } + KeyManager[] key = keyManagerFactory.getKeyManagers(); + try { + if (tlsVersion != null && tlsVersion.trim().length() > 0 ) { + sslContext = SSLContext.getInstance(tlsVersion); + } + else { + sslContext = SSLContext.getInstance("TLSv1.2"); + } + } catch (NoSuchAlgorithmException e) { + + throw new RuntimeException("Unable to configure custom SSL connection:" + e.getMessage(), e); + } + try { + sslContext.init(key, trust, null); + } catch (KeyManagementException e) { + throw new RuntimeException("Unable to configure custom SSL connection:" + e.getMessage(), e); + } + clientConfig.setSslContext(sslContext); + } + else {/*1wayssl*/ + TrustManager[] trust = new TrustManager[] { new SimpleX509TrustManager()}; + try { + if (tlsVersion != null && tlsVersion.trim().length() > 0 ) { + sslContext = SSLContext.getInstance(tlsVersion); + } + else { + sslContext = SSLContext.getInstance("TLSv1.2"); + } + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("Unable to configure custom SSL connection: " + e.getMessage(), e); + } + try { + sslContext.init(null, trust, null); + }catch (KeyManagementException e) { + throw new RuntimeException("Unable to configure custom SSL connection:" + e.getMessage(), e); + } + clientConfig.setSslContext(sslContext); + } + } /* End of if ssl */ + } + return clientConfig; } } diff --git a/src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java b/src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java index 9a2b60e..556a6a1 100644 --- a/src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java +++ b/src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java @@ -42,6 +42,11 @@ public class MarkLogicSinkConfig extends AbstractConfig { public static final String DOCUMENT_URI_PREFIX = "ml.document.uriPrefix"; public static final String DOCUMENT_URI_SUFFIX = "ml.document.uriSuffix"; + public static final String SSL = "ml.connection.enableCustomSsl"; + public static final String TLS_VERSION = "ml.connection.customSsl.tlsVersion"; + public static final String SSL_HOST_VERIFIER = "ml.connection.customSsl.hostNameVerifier"; + public static final String SSL_MUTUAL_AUTH = "ml.connection.customSsl.mutualAuth"; + public static ConfigDef CONFIG_DEF = new ConfigDef() .define(CONNECTION_HOST, Type.STRING, Importance.HIGH, "MarkLogic server hostname") .define(CONNECTION_PORT, Type.INT, Importance.HIGH, "The REST app server port to connect to") @@ -68,7 +73,12 @@ public class MarkLogicSinkConfig extends AbstractConfig { .define(DOCUMENT_MIMETYPE, Type.STRING, Importance.LOW, "Defines the mime type of each document; optional, and typically the format is set instead of the mime type") .define(DOCUMENT_PERMISSIONS, Type.STRING, Importance.MEDIUM, "String-delimited permissions to add to each document; role1,capability1,role2,capability2,etc") .define(DOCUMENT_URI_PREFIX, Type.STRING, Importance.MEDIUM, "Prefix to prepend to each generated URI") - .define(DOCUMENT_URI_SUFFIX, Type.STRING, Importance.MEDIUM, "Suffix to append to each generated URI"); + .define(DOCUMENT_URI_SUFFIX, Type.STRING, Importance.MEDIUM, "Suffix to append to each generated URI") + .define(SSL, Type.BOOLEAN, Importance.LOW, "Whether SSL connection to the App server - true or false.") + .define(TLS_VERSION, Type.STRING, Importance.LOW, "Version of TLS to connect to MarkLogic SSL enabled App server. Ex. TLSv1.2") + .define(SSL_HOST_VERIFIER, Type.STRING, Importance.LOW, "The strictness of Host Verifier - ANY, COMMON, STRICT") + .define(SSL_MUTUAL_AUTH, Type.BOOLEAN, Importance.LOW, "Mutual Authentication for Basic or Digest : true or false") + ; public MarkLogicSinkConfig(final Map originals) { super(CONFIG_DEF, originals, false); diff --git a/src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java b/src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java index 41b8619..2bcf3e7 100644 --- a/src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java +++ b/src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java @@ -6,9 +6,11 @@ import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.marklogic.client.DatabaseClientFactory; import java.util.HashMap; import java.util.Map; +import java.io.File; import static org.junit.jupiter.api.Assertions.*; @@ -70,8 +72,78 @@ public void digestAuthenticationAndSimpleSsl() { assertEquals(SecurityContextType.DIGEST, clientConfig.getSecurityContextType()); assertNotNull(clientConfig.getSslContext()); assertNotNull(clientConfig.getSslHostnameVerifier()); - assertNull(clientConfig.getTrustManager(), "If DatabaseClientFactory is given a null TrustManager, it will " + - "default to the JVM's cacerts file, which is a reasonable default approach"); + assertNotNull(clientConfig.getTrustManager()); + } + + @Test + public void basictAuthenticationAndSimpleSsl() { + config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic"); + config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, "true"); + + DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config); + assertEquals(SecurityContextType.BASIC, clientConfig.getSecurityContextType()); + assertNotNull(clientConfig.getSslContext()); + assertNotNull(clientConfig.getSslHostnameVerifier()); + assertNotNull(clientConfig.getTrustManager()); + } + + @Test + public void basicAuthenticationAndMutualSSL() { + File file = new File("src/test/resources/srportal.p12"); + String absolutePath = file.getAbsolutePath(); + config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic"); + config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, "false"); + config.put(MarkLogicSinkConfig.SSL, "true"); + config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2"); + config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "STRICT"); + config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, "true"); + config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, absolutePath); + config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, "abc"); + + DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config); + assertEquals(SecurityContextType.BASIC, clientConfig.getSecurityContextType()); + assertNotNull(clientConfig.getSslContext()); + assertEquals(DatabaseClientFactory.SSLHostnameVerifier.STRICT, clientConfig.getSslHostnameVerifier()); + assertNotNull(clientConfig.getTrustManager()); + } + + @Test + public void basicAuthenticationAndMutualSSLWithInvalidHost() { + File file = new File("src/test/resources/srportal.p12"); + String absolutePath = file.getAbsolutePath(); + config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic"); + config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, "false"); + config.put(MarkLogicSinkConfig.SSL, "true"); + config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2"); + config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "SOMETHING"); + config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, "true"); + config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, absolutePath); + config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, "abc"); + + DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config); + assertEquals(SecurityContextType.BASIC, clientConfig.getSecurityContextType()); + assertNotNull(clientConfig.getSslContext()); + assertEquals(DatabaseClientFactory.SSLHostnameVerifier.ANY, clientConfig.getSslHostnameVerifier()); + System.out.println(clientConfig.getSslHostnameVerifier()); + assertNotNull(clientConfig.getTrustManager()); + } + + + @Test + public void digestAuthenticationAnd1WaySSL() { + + config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "digest"); + config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, "false"); + config.put(MarkLogicSinkConfig.SSL, "true"); + config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2"); + config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "STRICT"); + config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, "false"); + + DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config); + assertEquals(SecurityContextType.DIGEST, clientConfig.getSecurityContextType()); + assertNotNull(clientConfig.getSslContext()); + assertNotNull(clientConfig.getSslHostnameVerifier()); + assertNotNull(clientConfig.getTrustManager()); } @Test diff --git a/src/test/java/com/marklogic/kafka/connect/sink/ConvertSinkRecordTest.java b/src/test/java/com/marklogic/kafka/connect/sink/ConvertSinkRecordTest.java index 1d5ff9e..8635b76 100644 --- a/src/test/java/com/marklogic/kafka/connect/sink/ConvertSinkRecordTest.java +++ b/src/test/java/com/marklogic/kafka/connect/sink/ConvertSinkRecordTest.java @@ -6,9 +6,12 @@ import com.marklogic.client.io.Format; import com.marklogic.client.io.StringHandle; import org.apache.kafka.connect.sink.SinkRecord; + import org.junit.jupiter.api.Test; + import java.util.*; +import java.io.IOException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -22,7 +25,7 @@ public class ConvertSinkRecordTest { MarkLogicSinkTask markLogicSinkTask = new MarkLogicSinkTask(); @Test - public void allPropertiesSet() { + public void allPropertiesSet() throws IOException { Map config = new HashMap<>(); config.put("ml.document.collections", "one,two"); config.put("ml.document.format", "json"); @@ -54,7 +57,7 @@ public void allPropertiesSet() { } @Test - public void noPropertiesSet() { + public void noPropertiesSet() throws IOException { converter = new DefaultSinkRecordConverter(new HashMap<>()); converter.getDocumentWriteOperationBuilder().withContentIdExtractor(content -> "12345"); @@ -68,7 +71,7 @@ public void noPropertiesSet() { } @Test - public void binaryContent() { + public void binaryContent() throws IOException{ converter = new DefaultSinkRecordConverter(new HashMap<>()); DocumentWriteOperation op = converter.convert(newSinkRecord("hello world".getBytes())); diff --git a/src/test/resources/srportal.p12 b/src/test/resources/srportal.p12 new file mode 100644 index 0000000..7e553c2 Binary files /dev/null and b/src/test/resources/srportal.p12 differ