From 701f294a8a06a1e43d419328ec00f8836bf3fd7e Mon Sep 17 00:00:00 2001 From: Akos Korsos Date: Thu, 1 Sep 2022 20:49:52 +0200 Subject: [PATCH 1/5] add AWS MSK IAM auth library --- build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle b/build.gradle index b6a164bc..b453667d 100644 --- a/build.gradle +++ b/build.gradle @@ -68,6 +68,7 @@ dependencies { implementation 'org.slf4j:slf4j-api:1.7.36' implementation 'org.lz4:lz4-java:1.7.1' implementation 'org.xerial.snappy:snappy-java:1.1.8.4' + implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.0' } task generateGemJarRequiresFile { doLast { From bb49b913bb249f3f76468574ab72d680e6fa6b50 Mon Sep 17 00:00:00 2001 From: Akos Korsos Date: Wed, 7 Sep 2022 08:50:30 +0200 Subject: [PATCH 2/5] add transitive dependencies for msk iam auth --- build.gradle | 25 +++++++++++++++++++++++-- lib/logstash-integration-kafka_jars.rb | 21 +++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index b453667d..3ac32325 100644 --- a/build.gradle +++ b/build.gradle @@ -68,7 +68,29 @@ dependencies { implementation 'org.slf4j:slf4j-api:1.7.36' implementation 'org.lz4:lz4-java:1.7.1' implementation 'org.xerial.snappy:snappy-java:1.1.8.4' - implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.0' + + // AWS MSK IAM auth library and its transitive dependencies + implementation('com.amazonaws:aws-java-sdk-core:1.12.290') + implementation('com.amazonaws:aws-java-sdk-sts:1.12.290') + implementation('software.amazon.awssdk:annotations:2.17.267') + implementation('software.amazon.awssdk:apache-client:2.17.267') + implementation('software.amazon.awssdk:auth:2.17.267') + implementation('software.amazon.awssdk:aws-core:2.17.267') + implementation('software.amazon.awssdk:aws-json-protocol:2.17.267') + implementation('software.amazon.awssdk:aws-query-protocol:2.17.267') + implementation('software.amazon.awssdk:http-client-spi:2.17.267') + implementation('software.amazon.awssdk:json-utils:2.17.267') + implementation('software.amazon.awssdk:metrics-spi:2.17.267') + implementation('software.amazon.awssdk:netty-nio-client:2.17.267') + implementation('software.amazon.awssdk:profiles:2.17.267') + implementation('software.amazon.awssdk:protocol-core:2.17.267') + implementation('software.amazon.awssdk:regions:2.17.267') + implementation('software.amazon.awssdk:sdk-core:2.17.267') + implementation('software.amazon.awssdk:sso:2.17.267') + implementation('software.amazon.awssdk:sts:2.17.267') + implementation('software.amazon.awssdk:third-party-jackson-core:2.17.267') + implementation('software.amazon.awssdk:utils:2.17.267') + implementation('software.amazon.msk:aws-msk-iam-auth:1.1.4') } task generateGemJarRequiresFile { doLast { @@ -95,5 +117,4 @@ task vendor { } } } - vendor.dependsOn(generateGemJarRequiresFile) diff --git a/lib/logstash-integration-kafka_jars.rb b/lib/logstash-integration-kafka_jars.rb index 2f4a83a6..d98b98b2 100644 --- a/lib/logstash-integration-kafka_jars.rb +++ b/lib/logstash-integration-kafka_jars.rb @@ -15,3 +15,24 @@ require_jar('org.slf4j', 'slf4j-api', '1.7.36') require_jar('org.lz4', 'lz4-java', '1.7.1') require_jar('org.xerial.snappy', 'snappy-java', '1.1.8.4') +require_jar('com.amazonaws', 'aws-java-sdk-core', '1.12.290') +require_jar('com.amazonaws', 'aws-java-sdk-sts', '1.12.290') +require_jar('software.amazon.awssdk', 'annotations', '2.17.267') +require_jar('software.amazon.awssdk', 'apache-client', '2.17.267') +require_jar('software.amazon.awssdk', 'auth', '2.17.267') +require_jar('software.amazon.awssdk', 'aws-core', '2.17.267') +require_jar('software.amazon.awssdk', 'aws-json-protocol', '2.17.267') +require_jar('software.amazon.awssdk', 'aws-query-protocol', '2.17.267') +require_jar('software.amazon.awssdk', 'http-client-spi', '2.17.267') +require_jar('software.amazon.awssdk', 'json-utils', '2.17.267') +require_jar('software.amazon.awssdk', 'metrics-spi', '2.17.267') +require_jar('software.amazon.awssdk', 'netty-nio-client', '2.17.267') +require_jar('software.amazon.awssdk', 'profiles', '2.17.267') +require_jar('software.amazon.awssdk', 'protocol-core', '2.17.267') +require_jar('software.amazon.awssdk', 'regions', '2.17.267') +require_jar('software.amazon.awssdk', 'sdk-core', '2.17.267') +require_jar('software.amazon.awssdk', 'sso', '2.17.267') +require_jar('software.amazon.awssdk', 'sts', '2.17.267') +require_jar('software.amazon.awssdk', 'third-party-jackson-core', '2.17.267') +require_jar('software.amazon.awssdk', 'utils', '2.17.267') +require_jar('software.amazon.msk', 'aws-msk-iam-auth', '1.1.4') From 48ae18058f05a89bfe2be9949c3608a70a1def18 Mon Sep 17 00:00:00 2001 From: Akos Korsos Date: Wed, 7 Sep 2022 08:51:00 +0200 Subject: [PATCH 3/5] set sasl callback handler for msk iam auth --- lib/logstash/plugin_mixins/kafka/common.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/kafka/common.rb b/lib/logstash/plugin_mixins/kafka/common.rb index 6b4b3fbd..896cdd61 100644 --- a/lib/logstash/plugin_mixins/kafka/common.rb +++ b/lib/logstash/plugin_mixins/kafka/common.rb @@ -39,9 +39,13 @@ def set_sasl_config(props) raise LogStash::ConfigurationError, "sasl_kerberos_service_name must be specified when SASL mechanism is GSSAPI" end + if sasl_mechanism == "AWS_MSK_IAM" + props.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler") + end + props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil? props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil? end end -end end end \ No newline at end of file +end end end From c2fbe09afcfc0bdf32f8b0b6753933e267fa7b03 Mon Sep 17 00:00:00 2001 From: Akos Korsos Date: Wed, 7 Sep 2022 09:50:05 +0200 Subject: [PATCH 4/5] export all jar dependencies --- build.gradle | 59 +++++++++++++++++----------------------------------- 1 file changed, 19 insertions(+), 40 deletions(-) diff --git a/build.gradle b/build.gradle index 3ac32325..01a71357 100644 --- a/build.gradle +++ b/build.gradle @@ -69,52 +69,31 @@ dependencies { implementation 'org.lz4:lz4-java:1.7.1' implementation 'org.xerial.snappy:snappy-java:1.1.8.4' - // AWS MSK IAM auth library and its transitive dependencies - implementation('com.amazonaws:aws-java-sdk-core:1.12.290') - implementation('com.amazonaws:aws-java-sdk-sts:1.12.290') - implementation('software.amazon.awssdk:annotations:2.17.267') - implementation('software.amazon.awssdk:apache-client:2.17.267') - implementation('software.amazon.awssdk:auth:2.17.267') - implementation('software.amazon.awssdk:aws-core:2.17.267') - implementation('software.amazon.awssdk:aws-json-protocol:2.17.267') - implementation('software.amazon.awssdk:aws-query-protocol:2.17.267') - implementation('software.amazon.awssdk:http-client-spi:2.17.267') - implementation('software.amazon.awssdk:json-utils:2.17.267') - implementation('software.amazon.awssdk:metrics-spi:2.17.267') - implementation('software.amazon.awssdk:netty-nio-client:2.17.267') - implementation('software.amazon.awssdk:profiles:2.17.267') - implementation('software.amazon.awssdk:protocol-core:2.17.267') - implementation('software.amazon.awssdk:regions:2.17.267') - implementation('software.amazon.awssdk:sdk-core:2.17.267') - implementation('software.amazon.awssdk:sso:2.17.267') - implementation('software.amazon.awssdk:sts:2.17.267') - implementation('software.amazon.awssdk:third-party-jackson-core:2.17.267') - implementation('software.amazon.awssdk:utils:2.17.267') - implementation('software.amazon.msk:aws-msk-iam-auth:1.1.4') + implementation('software.amazon.msk:aws-msk-iam-auth:1.1.4') { + because 'AWS MSK IAM authentication support' + } } -task generateGemJarRequiresFile { - doLast { + +task vendor { + doLast{ File jars_file = file('lib/logstash-integration-kafka_jars.rb') + String vendorPathPrefix = "vendor/jar-dependencies" jars_file.newWriter().withWriter { w -> w << "# AUTOGENERATED BY THE GRADLE SCRIPT. DO NOT EDIT.\n\n" w << "require \'jar_dependencies\'\n" - configurations.runtimeClasspath.allDependencies.each { - w << "require_jar(\'${it.group}\', \'${it.name}\', \'${it.version}\')\n" - } - } - } -} + configurations.runtimeClasspath.each { + // File path in gradle cache contains the dependency group/name/version + // Eg: .gradle/caches/.../.../io.confluent/common-utils/6.2.2//common-utils-6.2.2.jar + String version = it.getParentFile().getParentFile().getName() + String name = it.getParentFile().getParentFile().getParentFile().getName() + String group = it.getParentFile().getParentFile().getParentFile().getParentFile().getName() -task vendor { - doLast { - String vendorPathPrefix = "vendor/jar-dependencies" - configurations.runtimeClasspath.allDependencies.each { dep -> - File f = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}/${dep.name}/${dep.version}") }.singleFile - String groupPath = dep.group.replaceAll('\\.', '/') - File newJarFile = file("${vendorPathPrefix}/${groupPath}/${dep.name}/${dep.version}/${dep.name}-${dep.version}.jar") - newJarFile.mkdirs() - Files.copy(f.toPath(), newJarFile.toPath(), REPLACE_EXISTING) + w << "require_jar(\'${group}\', \'${name}\', \'${version}\')\n" + String groupPath = group.replaceAll('\\.', '/') + File newJarFile = file("${vendorPathPrefix}/${groupPath}/${name}/${version}/${name}-${version}.jar") + newJarFile.mkdirs() + Files.copy(it.toPath(), newJarFile.toPath(), REPLACE_EXISTING) + } } } } -vendor.dependsOn(generateGemJarRequiresFile) From 8c525efb9a863a4815383a21fc54cd225c56bb92 Mon Sep 17 00:00:00 2001 From: Akos Korsos Date: Wed, 7 Sep 2022 10:57:40 +0200 Subject: [PATCH 5/5] add sasl.client.callback.handler.class config for aws msk iam auth --- docs/input-kafka.asciidoc | 23 +++++++++++++++++++++- docs/output-kafka.asciidoc | 23 +++++++++++++++++++++- lib/logstash/inputs/kafka.rb | 4 +++- lib/logstash/outputs/kafka.rb | 2 ++ lib/logstash/plugin_mixins/kafka/common.rb | 5 +---- 5 files changed, 50 insertions(+), 7 deletions(-) diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc index d9537ab7..f0ee0392 100644 --- a/docs/input-kafka.asciidoc +++ b/docs/input-kafka.asciidoc @@ -65,6 +65,19 @@ For more information see https://kafka.apache.org/{kafka_client_doc}/documentati Kafka consumer configuration: https://kafka.apache.org/{kafka_client_doc}/documentation.html#consumerconfigs +==== AWS MSK IAM authentication +If you use AWS MSK, the AWS MSK IAM access control enables you to handle both authentication and authorization for your MSK cluster with AWS IAM. +For more information on this AWS MSK feature see the https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html[AWS documentation]. + +To use this Kafka input with AWS MSK IAM authentication, set the following configuration: +``` +security_protocol => "SASL_SSL" +sasl_mechanism => "AWS_MSK_IAM" +sasl_jaas_config => "software.amazon.msk.auth.iam.IAMLoginModule required;" +sasl_client_callback_handler_class => "software.amazon.msk.auth.iam.IAMClientCallbackHandler" +``` +For more IAM authentication configurations, see the https://github.com/aws/aws-msk-iam-auth[AWS MSK IAM authentication library documentation]. + ==== Metadata fields The following metadata from Kafka broker are added under the `[@metadata]` field: @@ -131,6 +144,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -542,9 +556,16 @@ This can be defined either in Kafka's JAAS config or in Kafka's config. * Default value is `"GSSAPI"` http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections. -This may be any mechanism for which a security provider is available. +This may be any mechanism for which a security provider is available. For AWS MSK IAM authentication use `AWS_MSK_IAM`. GSSAPI is the default mechanism. +[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""] +===== `sasl_client_callback_handler_class` + * Value type is <> + * There is no default value for this setting. + +The SASL client callback handler class the specified SASL mechanism should use. + [id="plugins-{type}s-{plugin}-schema_registry_key"] ===== `schema_registry_key` diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc index efc5ac41..60450341 100644 --- a/docs/output-kafka.asciidoc +++ b/docs/output-kafka.asciidoc @@ -66,6 +66,19 @@ https://kafka.apache.org/{kafka_client_doc}/documentation.html#producerconfigs NOTE: This plugin does not support using a proxy when communicating to the Kafka broker. +==== AWS MSK IAM authentication +If you use AWS MSK, the AWS MSK IAM access control enables you to handle both authentication and authorization for your MSK cluster with AWS IAM. +For more information on this AWS MSK feature see the https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html[AWS documentation]. + +To use this Kafka input with AWS MSK IAM authentication, set the following configuration: +``` +security_protocol => "SASL_SSL" +sasl_mechanism => "AWS_MSK_IAM" +sasl_jaas_config => "software.amazon.msk.auth.iam.IAMLoginModule required;" +sasl_client_callback_handler_class => "software.amazon.msk.auth.iam.IAMClientCallbackHandler" +``` +For more IAM authentication configurations, see the https://github.com/aws/aws-msk-iam-auth[AWS MSK IAM authentication library documentation]. + [id="plugins-{type}s-{plugin}-options"] ==== Kafka Output Configuration Options @@ -103,6 +116,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>, one of `["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]`|No | <> |<>|No | <> |<>|No @@ -402,9 +416,16 @@ This can be defined either in Kafka's JAAS config or in Kafka's config. * Default value is `"GSSAPI"` http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections. -This may be any mechanism for which a security provider is available. +This may be any mechanism for which a security provider is available. For AWS MSK IAM authentication use `AWS_MSK_IAM`. GSSAPI is the default mechanism. +[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""] +===== `sasl_client_callback_handler_class` + * Value type is <> + * There is no default value for this setting. + +The SASL client callback handler class the specified SASL mechanism should use. + [id="plugins-{type}s-{plugin}-security_protocol"] ===== `security_protocol` diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 9662667e..c75b0b50 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -228,6 +228,8 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base config :jaas_path, :validate => :path # JAAS configuration settings. This allows JAAS config to be a part of the plugin configuration and allows for different JAAS configuration per each plugin config. config :sasl_jaas_config, :validate => :string + # SASL client callback handler class + config :sasl_client_callback_handler_class, :validate => :string # Optional path to kerberos config file. This is krb5.conf style as detailed in https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html config :kerberos_config, :validate => :path # Option to add Kafka metadata like topic, message size and header key values to the event. @@ -420,7 +422,7 @@ def create_consumer(client_id) props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes.to_s) unless send_buffer_bytes.nil? props.put(kafka::SESSION_TIMEOUT_MS_CONFIG, session_timeout_ms.to_s) unless session_timeout_ms.nil? props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class) - props.put(kafka::CLIENT_RACK_CONFIG, client_rack) unless client_rack.nil? + props.put(kafka::CLIENT_RACK_CONFIG, client_rack) unless client_rack.nil? props.put("security.protocol", security_protocol) unless security_protocol.nil? if schema_registry_url diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index 3acd5b73..1b19b516 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -170,6 +170,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base config :jaas_path, :validate => :path # JAAS configuration settings. This allows JAAS config to be a part of the plugin configuration and allows for different JAAS configuration per each plugin config. config :sasl_jaas_config, :validate => :string + # SASL client callback handler class + config :sasl_client_callback_handler_class, :validate => :string # Optional path to kerberos config file. This is krb5.conf style as detailed in https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html config :kerberos_config, :validate => :path diff --git a/lib/logstash/plugin_mixins/kafka/common.rb b/lib/logstash/plugin_mixins/kafka/common.rb index 896cdd61..38870926 100644 --- a/lib/logstash/plugin_mixins/kafka/common.rb +++ b/lib/logstash/plugin_mixins/kafka/common.rb @@ -39,12 +39,9 @@ def set_sasl_config(props) raise LogStash::ConfigurationError, "sasl_kerberos_service_name must be specified when SASL mechanism is GSSAPI" end - if sasl_mechanism == "AWS_MSK_IAM" - props.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler") - end - props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil? props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil? + props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil? end end