From d092e0e87cb95b7193035bb82682099d82f18e1b Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Mon, 2 Jan 2023 13:48:01 +0000 Subject: [PATCH] Consolidate TLS encryption and authentication (#532) # Description New structure looks like this: ``` apiVersion: kafka.stackable.tech/v1alpha1 kind: KafkaCluster metadata: name: simple-kafka spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 clusterConfig: authentication: - authenticationClass: kafka-client-auth-tls tls: internalSecretClass: kafka-internal-tls serverSecretClass: tls zookeeperConfigMapName: simple-kafka-znode brokers: .... ``` fixes: https://github.com/stackabletech/kafka-operator/issues/529 test: https://ci.stackable.tech/view/02%20Operator%20Tests%20(custom)/job/kafka-operator-it-custom/29/ Co-authored-by: Malte Sander --- .readme/partials/main.md.j2 | 7 +- CHANGELOG.md | 2 + README.md | 7 +- deploy/crd/kafkacluster.crd.yaml | 98 ++-- deploy/helm/kafka-operator/crds/crds.yaml | 98 ++-- deploy/manifests/crds.yaml | 98 ++-- docs/modules/ROOT/pages/usage.adoc | 77 +-- .../getting_started/examples/code/kafka.yaml | 7 +- .../examples/code/zookeeper.yaml | 4 +- .../simple-kafka-cluster-opa-log4j.yaml | 168 +++--- .../simple-kafka-cluster-opa-allow-all.yaml | 14 +- examples/tls/simple-kafka-cluster-tls.yaml | 19 +- rust/crd/src/authentication.rs | 115 ++++ rust/crd/src/authorization.rs | 11 + rust/crd/src/lib.rs | 409 ++++---------- rust/crd/src/listener.rs | 124 +++-- rust/crd/src/security.rs | 504 ++++++++++++++++++ rust/crd/src/tls.rs | 45 ++ rust/operator/src/command.rs | 98 ---- rust/operator/src/discovery.rs | 5 +- rust/operator/src/kafka_controller.rs | 276 ++++------ rust/operator/src/lib.rs | 1 - .../kuttl/configuration/00-install-zk.yaml.j2 | 2 - .../configuration/01-install-kafka.yaml.j2 | 3 +- .../delete-rolegroup/01-install-kafka.yaml.j2 | 3 +- .../02-delete-secondary.yaml.j2 | 3 +- .../kuttl/smoke/01-install-kafka.yaml.j2 | 10 +- .../kuttl/tls/20-install-kafka.yaml.j2 | 37 +- .../kuttl/tls/test_client_auth_tls.sh | 2 +- tests/templates/kuttl/tls/test_client_tls.sh | 2 +- .../kuttl/upgrade/00-install-zk.yaml.j2 | 2 - .../kuttl/upgrade/01-install-kafka.yaml.j2 | 14 +- tests/test-definition.yaml | 13 +- 33 files changed, 1292 insertions(+), 986 deletions(-) create mode 100644 rust/crd/src/authentication.rs create mode 100644 rust/crd/src/authorization.rs create mode 100644 rust/crd/src/security.rs create mode 100644 rust/crd/src/tls.rs delete mode 100644 rust/operator/src/command.rs diff --git a/.readme/partials/main.md.j2 b/.readme/partials/main.md.j2 index 2c9ca1a9..499bc8af 100644 --- a/.readme/partials/main.md.j2 +++ b/.readme/partials/main.md.j2 @@ -30,9 +30,10 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - config: - tls: null + clusterConfig: + zookeeperConfigMapName: simple-kafka-znode + tls: + serverSecretClass: null brokers: roleGroups: default: diff --git a/CHANGELOG.md b/CHANGELOG.md index bcd677e3..60c2841d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ All notable changes to this project will be documented in this file. - Don't run init container as root and avoid chmod and chowning ([#524]). - [BREAKING] Use Product image selection instead of version. `spec.version` has been replaced by `spec.image` ([#482]). - [BREAKING]: Removed tools image for init and get-svc container and replaced with Kafka product image. 
This means the latest stackable version has to be used in the product image selection ([#527]) +- [BREAKING] Consolidated top-level configuration. Split up TLS encryption and authentication. Moved all top-level fields except `spec.image` below `spec.clusterConfig` ([#532]). [#530]: https://github.com/stackabletech/kafka-operator/pull/530 [#482]: https://github.com/stackabletech/kafka-operator/pull/482 @@ -19,6 +20,7 @@ All notable changes to this project will be documented in this file. [#519]: https://github.com/stackabletech/kafka-operator/pull/519 [#524]: https://github.com/stackabletech/kafka-operator/pull/524 [#527]: https://github.com/stackabletech/kafka-operator/pull/527 +[#532]: https://github.com/stackabletech/kafka-operator/pull/532 ## [0.8.0] - 2022-11-07 diff --git a/README.md b/README.md index d2e2f4bf..abeeed1c 100644 --- a/README.md +++ b/README.md @@ -50,9 +50,10 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - config: - tls: null + clusterConfig: + zookeeperConfigMapName: simple-kafka-znode + tls: + serverSecretClass: null brokers: roleGroups: default: diff --git a/deploy/crd/kafkacluster.crd.yaml b/deploy/crd/kafkacluster.crd.yaml index 940ccbaa..3b49ebc7 100644 --- a/deploy/crd/kafkacluster.crd.yaml +++ b/deploy/crd/kafkacluster.crd.yaml @@ -523,44 +523,68 @@ spec: required: - roleGroups type: object - config: - default: - tls: - secretClass: tls - internalTls: - secretClass: tls + clusterConfig: properties: - clientAuthentication: - description: 'Only affects client connections. This setting controls: - If clients need to authenticate themselves against the server via TLS - Which ca.crt to use when validating the provided client certs Defaults to `None`' - nullable: true - properties: - authenticationClass: - type: string - required: - - authenticationClass - type: object - internalTls: + authentication: + default: [] + description: Authentication class settings for Kafka like mTLS authentication. + items: + properties: + authenticationClass: + description: |- + The AuthenticationClass to use. + + ## TLS provider + + Only affects client connections. This setting controls: - If clients need to authenticate themselves against the broker via TLS - Which ca.crt to use when validating the provided client certs This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`. + type: string + required: + - authenticationClass + type: object + type: array + authorization: default: - secretClass: tls - description: 'Only affects internal communication. Use mutual verification between Kafka nodes This setting controls: - Which cert the servers should use to authenticate themselves against other servers - Which ca.crt to use when validating the other server' - nullable: true + opa: null + description: Authorization settings for Kafka like OPA. properties: - secretClass: - type: string - required: - - secretClass + opa: + nullable: true + properties: + configMapName: + type: string + package: + nullable: true + type: string + required: + - configMapName + type: object type: object + log4j: + description: Log4j configuration + nullable: true + type: string tls: default: - secretClass: tls - description: 'Only affects client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client Defaults to `TlsSecretClass` { secret_class: "tls".to_string() }.' 
+ internalSecretClass: tls + serverSecretClass: tls + description: TLS encryption settings for Kafka (server, internal). nullable: true properties: - secretClass: + internalSecretClass: + default: tls + description: 'The to use for internal broker communication. Use mutual verification between brokers (mandatory). This setting controls: - Which cert the brokers should use to authenticate themselves against other brokers - Which ca.crt to use when validating the other brokers Defaults to `tls`' + type: string + serverSecretClass: + default: tls + description: 'The to use for client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client Defaults to `tls`.' + nullable: true type: string - required: - - secretClass type: object + zookeeperConfigMapName: + description: ZooKeeper discovery config map name. + type: string + required: + - zookeeperConfigMapName type: object image: anyOf: @@ -604,28 +628,12 @@ spec: description: Stackable version of the product, e.g. 2.1.0 type: string type: object - log4j: - nullable: true - type: string - opa: - nullable: true - properties: - configMapName: - type: string - package: - nullable: true - type: string - required: - - configMapName - type: object stopped: nullable: true type: boolean - zookeeperConfigMapName: - type: string required: + - clusterConfig - image - - zookeeperConfigMapName type: object required: - spec diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index 66994544..c5d34dce 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -525,44 +525,68 @@ spec: required: - roleGroups type: object - config: - default: - tls: - secretClass: tls - internalTls: - secretClass: tls + clusterConfig: properties: - clientAuthentication: - description: 'Only affects client connections. This setting controls: - If clients need to authenticate themselves against the server via TLS - Which ca.crt to use when validating the provided client certs Defaults to `None`' - nullable: true - properties: - authenticationClass: - type: string - required: - - authenticationClass - type: object - internalTls: + authentication: + default: [] + description: Authentication class settings for Kafka like mTLS authentication. + items: + properties: + authenticationClass: + description: |- + The AuthenticationClass to use. + + ## TLS provider + + Only affects client connections. This setting controls: - If clients need to authenticate themselves against the broker via TLS - Which ca.crt to use when validating the provided client certs This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`. + type: string + required: + - authenticationClass + type: object + type: array + authorization: default: - secretClass: tls - description: 'Only affects internal communication. Use mutual verification between Kafka nodes This setting controls: - Which cert the servers should use to authenticate themselves against other servers - Which ca.crt to use when validating the other server' - nullable: true + opa: null + description: Authorization settings for Kafka like OPA. 
properties: - secretClass: - type: string - required: - - secretClass + opa: + nullable: true + properties: + configMapName: + type: string + package: + nullable: true + type: string + required: + - configMapName + type: object type: object + log4j: + description: Log4j configuration + nullable: true + type: string tls: default: - secretClass: tls - description: 'Only affects client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client Defaults to `TlsSecretClass` { secret_class: "tls".to_string() }.' + internalSecretClass: tls + serverSecretClass: tls + description: TLS encryption settings for Kafka (server, internal). nullable: true properties: - secretClass: + internalSecretClass: + default: tls + description: 'The to use for internal broker communication. Use mutual verification between brokers (mandatory). This setting controls: - Which cert the brokers should use to authenticate themselves against other brokers - Which ca.crt to use when validating the other brokers Defaults to `tls`' + type: string + serverSecretClass: + default: tls + description: 'The to use for client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client Defaults to `tls`.' + nullable: true type: string - required: - - secretClass type: object + zookeeperConfigMapName: + description: ZooKeeper discovery config map name. + type: string + required: + - zookeeperConfigMapName type: object image: anyOf: @@ -606,28 +630,12 @@ spec: description: Stackable version of the product, e.g. 2.1.0 type: string type: object - log4j: - nullable: true - type: string - opa: - nullable: true - properties: - configMapName: - type: string - package: - nullable: true - type: string - required: - - configMapName - type: object stopped: nullable: true type: boolean - zookeeperConfigMapName: - type: string required: + - clusterConfig - image - - zookeeperConfigMapName type: object required: - spec diff --git a/deploy/manifests/crds.yaml b/deploy/manifests/crds.yaml index 386aeefc..be2fa01a 100644 --- a/deploy/manifests/crds.yaml +++ b/deploy/manifests/crds.yaml @@ -526,44 +526,68 @@ spec: required: - roleGroups type: object - config: - default: - tls: - secretClass: tls - internalTls: - secretClass: tls + clusterConfig: properties: - clientAuthentication: - description: 'Only affects client connections. This setting controls: - If clients need to authenticate themselves against the server via TLS - Which ca.crt to use when validating the provided client certs Defaults to `None`' - nullable: true - properties: - authenticationClass: - type: string - required: - - authenticationClass - type: object - internalTls: + authentication: + default: [] + description: Authentication class settings for Kafka like mTLS authentication. + items: + properties: + authenticationClass: + description: |- + The AuthenticationClass to use. + + ## TLS provider + + Only affects client connections. This setting controls: - If clients need to authenticate themselves against the broker via TLS - Which ca.crt to use when validating the provided client certs This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`. + type: string + required: + - authenticationClass + type: object + type: array + authorization: default: - secretClass: tls - description: 'Only affects internal communication. 
Use mutual verification between Kafka nodes This setting controls: - Which cert the servers should use to authenticate themselves against other servers - Which ca.crt to use when validating the other server' - nullable: true + opa: null + description: Authorization settings for Kafka like OPA. properties: - secretClass: - type: string - required: - - secretClass + opa: + nullable: true + properties: + configMapName: + type: string + package: + nullable: true + type: string + required: + - configMapName + type: object type: object + log4j: + description: Log4j configuration + nullable: true + type: string tls: default: - secretClass: tls - description: 'Only affects client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client Defaults to `TlsSecretClass` { secret_class: "tls".to_string() }.' + internalSecretClass: tls + serverSecretClass: tls + description: TLS encryption settings for Kafka (server, internal). nullable: true properties: - secretClass: + internalSecretClass: + default: tls + description: 'The to use for internal broker communication. Use mutual verification between brokers (mandatory). This setting controls: - Which cert the brokers should use to authenticate themselves against other brokers - Which ca.crt to use when validating the other brokers Defaults to `tls`' + type: string + serverSecretClass: + default: tls + description: 'The to use for client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client Defaults to `tls`.' + nullable: true type: string - required: - - secretClass type: object + zookeeperConfigMapName: + description: ZooKeeper discovery config map name. + type: string + required: + - zookeeperConfigMapName type: object image: anyOf: @@ -607,28 +631,12 @@ spec: description: Stackable version of the product, e.g. 
2.1.0 type: string type: object - log4j: - nullable: true - type: string - opa: - nullable: true - properties: - configMapName: - type: string - package: - nullable: true - type: string - required: - - configMapName - type: object stopped: nullable: true type: boolean - zookeeperConfigMapName: - type: string required: + - clusterConfig - image - - zookeeperConfigMapName type: object required: - spec diff --git a/docs/modules/ROOT/pages/usage.adoc b/docs/modules/ROOT/pages/usage.adoc index 983b7004..16ce1092 100644 --- a/docs/modules/ROOT/pages/usage.adoc +++ b/docs/modules/ROOT/pages/usage.adoc @@ -26,7 +26,8 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode + clusterConfig: + zookeeperConfigMapName: simple-kafka-znode brokers: roleGroups: default: @@ -45,10 +46,12 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - opa: - configMapName: simple-opa - package: kafka + clusterConfig: + authorization: + opa: + configMapName: simple-opa + package: kafka + zookeeperConfigMapName: simple-kafka-znode brokers: roleGroups: default: @@ -68,10 +71,12 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - opa: - configMapName: simple-opa - package: kafka + clusterConfig: + authorization: + opa: + configMapName: simple-opa + package: kafka + zookeeperConfigMapName: simple-kafka-znode brokers: configOverrides: server.properties: @@ -105,19 +110,20 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - log4j: |- - log4j.rootLogger=INFO, stdout, kafkaAppender - - log4j.appender.stdout=org.apache.log4j.ConsoleAppender - log4j.appender.stdout.layout=org.apache.log4j.PatternLayout - log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n - - log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender - log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH - log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log - log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout - log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + clusterConfig: + zookeeperConfigMapName: simple-kafka-znode + log4j: |- + log4j.rootLogger=INFO, stdout, kafkaAppender + + log4j.appender.stdout=org.apache.log4j.ConsoleAppender + log4j.appender.stdout.layout=org.apache.log4j.PatternLayout + log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + + log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender + log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH + log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log + log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout + log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n brokers: roleGroups: default: @@ -139,19 +145,18 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - config: + clusterConfig: + zookeeperConfigMapName: simple-kafka-znode tls: - secretClass: tls # <1> - internalTls: - secretClass: kafka-internal-tls # <2> + serverSecretClass: tls # <1> + internalSecretClass: kafka-internal-tls # <2> brokers: roleGroups: default: replicas: 3 ---- -<1> The `tls.secretClass` refers to the client-to-server encryption. Defaults to the `tls` secret. It can be deactivated by setting `config.tls` to `null`. 
-<2> The `internalTls.secretClass` refers to the broker-to-broker internal encryption. This must be explicitly set or defaults to `tls`. Can be disabled by setting `config.internalTls` to `null`.
+<1> The `spec.clusterConfig.tls.serverSecretClass` refers to the client-to-server encryption. Defaults to the `tls` SecretClass. Can be deactivated by setting `serverSecretClass` to `null`.
+<2> The `spec.clusterConfig.tls.internalSecretClass` refers to the broker-to-broker internal encryption. Defaults to the `tls` SecretClass if not set explicitly. Internal TLS is mandatory and cannot be disabled.
 
 The `tls` secret is deployed from the xref:secret-operator::index.adoc[Secret Operator] and looks like this:
 
@@ -172,7 +177,7 @@ spec:
     autoGenerate: true
 ----
 
-You can create your own secrets and reference them e.g. in the `tls.secretClass` or `internalTls.secretClass` to use different certificates.
+You can create your own SecretClasses and reference them, e.g. in `spec.clusterConfig.tls.serverSecretClass` or `spec.clusterConfig.tls.internalSecretClass`, to use different certificates.
 
 == Authentication
 
@@ -211,18 +216,16 @@ spec:
   image:
     productVersion: 3.3.1
     stackableVersion: 0.3.0
-  zookeeperConfigMapName: simple-kafka-znode
-  config:
-    tls:
-      secretClass: tls
-    clientAuthentication:
-      authenticationClass: kafka-client-tls # <1>
+  clusterConfig:
+    authentication:
+      - authenticationClass: kafka-client-tls # <1>
+    zookeeperConfigMapName: simple-kafka-znode
   brokers:
     roleGroups:
       default:
         replicas: 3
 ----
-<1> The `config.clientAuthentication.authenticationClass` can be set to use TLS for authentication. This is optional.
+<1> An `authenticationClass` entry in the `spec.clusterConfig.authentication` list can be set to use TLS for authentication. This is optional.
 <2> The referenced `AuthenticationClass` that references a `SecretClass` to provide certificates.
 <3> The reference to a `SecretClass`.
 <4> The `SecretClass` that is referenced by the `AuthenticationClass` in order to provide certificates.
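A minimal sketch of the manifests that callouts <2>-<4> refer to (the `autoTls` backend mirrors the secret-operator example above; the CA secret name is illustrative and not part of this patch):

```yaml
---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
  name: kafka-client-tls # <2>
spec:
  provider:
    tls:
      clientCertSecretClass: kafka-client-tls-secret # <3>
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: kafka-client-tls-secret # <4>
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-kafka-client-ca # illustrative
          namespace: default
        autoGenerate: true
```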
diff --git a/docs/modules/getting_started/examples/code/kafka.yaml b/docs/modules/getting_started/examples/code/kafka.yaml index d0f087e4..47d01e89 100644 --- a/docs/modules/getting_started/examples/code/kafka.yaml +++ b/docs/modules/getting_started/examples/code/kafka.yaml @@ -7,9 +7,10 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - config: - tls: null + clusterConfig: + tls: + serverSecretClass: null + zookeeperConfigMapName: simple-kafka-znode brokers: roleGroups: default: diff --git a/docs/modules/getting_started/examples/code/zookeeper.yaml b/docs/modules/getting_started/examples/code/zookeeper.yaml index b2fab1c8..71d2eab2 100644 --- a/docs/modules/getting_started/examples/code/zookeeper.yaml +++ b/docs/modules/getting_started/examples/code/zookeeper.yaml @@ -6,8 +6,8 @@ metadata: spec: image: productVersion: 3.8.0 - stackableVersion: 0.8.0 + stackableVersion: 0.9.0 servers: roleGroups: default: - replicas: 3 + replicas: 1 diff --git a/examples/logging/simple-kafka-cluster-opa-log4j.yaml b/examples/logging/simple-kafka-cluster-opa-log4j.yaml index 57a6f9af..712f0275 100644 --- a/examples/logging/simple-kafka-cluster-opa-log4j.yaml +++ b/examples/logging/simple-kafka-cluster-opa-log4j.yaml @@ -6,7 +6,7 @@ metadata: spec: image: productVersion: 3.8.0 - stackableVersion: 0.8.0 + stackableVersion: 0.9.0 servers: roleGroups: default: @@ -30,7 +30,7 @@ metadata: spec: image: productVersion: 0.45.0 - stackableVersion: 0.2.0 + stackableVersion: 0.3.0 servers: roleGroups: default: @@ -60,90 +60,92 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - opa: - configMapName: simple-opa - package: kafka/authz - log4j: |- - log4j.rootLogger=INFO, stdout, kafkaAppender - - log4j.appender.stdout=org.apache.log4j.ConsoleAppender - log4j.appender.stdout.layout=org.apache.log4j.PatternLayout - log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n - - log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender - log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH - log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log - log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout - log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n - - log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender - log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH - log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log - log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout - log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n - - log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender - log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH - log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log - log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout - log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n - - log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender - log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH - log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log - log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout - log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n - - log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender - 
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH - log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log - log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout - log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n - - log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender - log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH - log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log - log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout - log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n - - # Change the line below to adjust ZK client logging - log4j.logger.org.apache.zookeeper=INFO - - # Change the two lines below to adjust the general broker logging level (output to server.log and stdout) - log4j.logger.kafka=INFO - log4j.logger.org.apache.kafka=INFO - - # Change to DEBUG or TRACE to enable request logging - log4j.logger.kafka.request.logger=WARN, requestAppender - log4j.additivity.kafka.request.logger=false - - # Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output - # related to the handling of requests - #log4j.logger.kafka.network.Processor=TRACE, requestAppender - #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender - #log4j.additivity.kafka.server.KafkaApis=false - log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender - log4j.additivity.kafka.network.RequestChannel$=false - - log4j.logger.kafka.controller=TRACE, controllerAppender - log4j.additivity.kafka.controller=false - - log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender - log4j.additivity.kafka.log.LogCleaner=false - - log4j.logger.state.change.logger=INFO, stateChangeAppender - log4j.additivity.state.change.logger=false - - # Access denials are logged at INFO level, change to DEBUG to also log allowed accesses - log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender - log4j.additivity.kafka.authorizer.logger=false - - log4j.logger.org.openpolicyagent=DEBUG, authorizerAppender + clusterConfig: + authorization: + opa: + configMapName: simple-opa + package: kafka/authz + zookeeperConfigMapName: simple-kafka-znode + log4j: |- + log4j.rootLogger=INFO, stdout, kafkaAppender + + log4j.appender.stdout=org.apache.log4j.ConsoleAppender + log4j.appender.stdout.layout=org.apache.log4j.PatternLayout + log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + + log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender + log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH + log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log + log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout + log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + + log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender + log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH + log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log + log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout + log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + + log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender + log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH + log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log + log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout + 
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + + log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender + log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH + log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log + log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout + log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + + log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender + log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH + log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log + log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout + log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + + log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender + log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH + log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log + log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout + log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + + # Change the line below to adjust ZK client logging + log4j.logger.org.apache.zookeeper=INFO + + # Change the two lines below to adjust the general broker logging level (output to server.log and stdout) + log4j.logger.kafka=INFO + log4j.logger.org.apache.kafka=INFO + + # Change to DEBUG or TRACE to enable request logging + log4j.logger.kafka.request.logger=WARN, requestAppender + log4j.additivity.kafka.request.logger=false + + # Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output + # related to the handling of requests + #log4j.logger.kafka.network.Processor=TRACE, requestAppender + #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender + #log4j.additivity.kafka.server.KafkaApis=false + log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender + log4j.additivity.kafka.network.RequestChannel$=false + + log4j.logger.kafka.controller=TRACE, controllerAppender + log4j.additivity.kafka.controller=false + + log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender + log4j.additivity.kafka.log.LogCleaner=false + + log4j.logger.state.change.logger=INFO, stateChangeAppender + log4j.additivity.state.change.logger=false + + # Access denials are logged at INFO level, change to DEBUG to also log allowed accesses + log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender + log4j.additivity.kafka.authorizer.logger=false + + log4j.logger.org.openpolicyagent=DEBUG, authorizerAppender brokers: configOverrides: server.properties: - opa.authorizer.cache.expire.after.seconds: "10" + opa.authorizer.cache.expire.after.seconds: "10" roleGroups: default: replicas: 3 diff --git a/examples/opa/simple-kafka-cluster-opa-allow-all.yaml b/examples/opa/simple-kafka-cluster-opa-allow-all.yaml index 25c71dc6..93accf20 100644 --- a/examples/opa/simple-kafka-cluster-opa-allow-all.yaml +++ b/examples/opa/simple-kafka-cluster-opa-allow-all.yaml @@ -6,7 +6,7 @@ metadata: spec: image: productVersion: 3.8.0 - stackableVersion: 0.8.0 + stackableVersion: 0.9.0 servers: roleGroups: default: @@ -30,7 +30,7 @@ metadata: spec: image: productVersion: 0.45.0 - stackableVersion: 0.2.0 + stackableVersion: 0.3.0 servers: roleGroups: default: @@ -60,10 +60,12 @@ spec: image: productVersion: 3.3.1 stackableVersion: 0.3.0 - zookeeperConfigMapName: simple-kafka-znode - opa: - configMapName: simple-opa - 
package: kafka/authz
+  clusterConfig:
+    authorization:
+      opa:
+        configMapName: simple-opa
+        package: kafka/authz
+    zookeeperConfigMapName: simple-kafka-znode
   brokers:
     configOverrides:
       server.properties:
diff --git a/examples/tls/simple-kafka-cluster-tls.yaml b/examples/tls/simple-kafka-cluster-tls.yaml
index ee16c304..9b35b3be 100644
--- a/examples/tls/simple-kafka-cluster-tls.yaml
+++ b/examples/tls/simple-kafka-cluster-tls.yaml
@@ -6,15 +6,11 @@ metadata:
 spec:
   image:
     productVersion: 3.8.0
-    stackableVersion: 0.8.0
+    stackableVersion: 0.9.0
   servers:
     roleGroups:
       default:
-        selector:
-          matchLabels:
-            kubernetes.io/os: linux
         replicas: 3
-        config: {}
 ---
 apiVersion: zookeeper.stackable.tech/v1alpha1
 kind: ZookeeperZnode
@@ -67,14 +63,13 @@ spec:
   image:
     productVersion: 3.3.1
     stackableVersion: 0.3.0
-  zookeeperConfigMapName: simple-kafka-znode
-  config:
+  clusterConfig:
+    authentication:
+      - authenticationClass: kafka-client-auth-tls
     tls:
-      secretClass: tls
-    clientAuthentication:
-      authenticationClass: kafka-client-auth-tls
-    internalTls:
-      secretClass: kafka-internal-tls
+      internalSecretClass: kafka-internal-tls
+      serverSecretClass: tls
+    zookeeperConfigMapName: simple-kafka-znode
   brokers:
     roleGroups:
       default:
diff --git a/rust/crd/src/authentication.rs b/rust/crd/src/authentication.rs
new file mode 100644
index 00000000..21943603
--- /dev/null
+++ b/rust/crd/src/authentication.rs
@@ -0,0 +1,115 @@
+use crate::ObjectRef;
+
+use serde::{Deserialize, Serialize};
+use snafu::{ResultExt, Snafu};
+use stackable_operator::commons::authentication::AuthenticationClassProvider;
+use stackable_operator::{
+    client::Client,
+    commons::authentication::AuthenticationClass,
+    schemars::{self, JsonSchema},
+};
+
+const SUPPORTED_AUTHENTICATION_CLASS_PROVIDERS: [&str; 1] = ["TLS"];
+
+#[derive(Snafu, Debug)]
+pub enum Error {
+    #[snafu(display("failed to retrieve AuthenticationClass [{}]", authentication_class))]
+    AuthenticationClassRetrieval {
+        source: stackable_operator::error::Error,
+        authentication_class: ObjectRef<AuthenticationClass>,
+    },
+    // TODO: Adapt message if multiple authentication classes are supported
+    #[snafu(display("only one authentication class is currently supported. Possible Authentication class providers are {SUPPORTED_AUTHENTICATION_CLASS_PROVIDERS:?}"))]
+    MultipleAuthenticationClassesProvided,
+    #[snafu(display(
+        "failed to use authentication provider [{provider}] for authentication class [{authentication_class}] - supported providers: {SUPPORTED_AUTHENTICATION_CLASS_PROVIDERS:?}",
+    ))]
+    AuthenticationProviderNotSupported {
+        authentication_class: ObjectRef<AuthenticationClass>,
+        provider: String,
+    },
+}
+
+#[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct KafkaAuthentication {
+    /// The AuthenticationClass to use.
+    ///
+    /// ## TLS provider
+    ///
+    /// Only affects client connections. This setting controls:
+    /// - If clients need to authenticate themselves against the broker via TLS
+    /// - Which ca.crt to use when validating the provided client certs
+    /// This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`.
+    pub authentication_class: String,
+}
+
+#[derive(Clone, Debug)]
+/// Helper struct that contains resolved AuthenticationClasses to reduce network API calls.
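+/// Built via `from_references`, which resolves each referenced class through the
+/// Kubernetes API; `validate` enforces the single-class, TLS-provider-only rule.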
+pub struct ResolvedAuthenticationClasses {
+    resolved_authentication_classes: Vec<AuthenticationClass>,
+}
+
+impl ResolvedAuthenticationClasses {
+    pub fn new(resolved_authentication_classes: Vec<AuthenticationClass>) -> Self {
+        Self {
+            resolved_authentication_classes,
+        }
+    }
+
+    /// Resolve provided AuthenticationClasses via API calls and validate the contents.
+    /// Currently errors out if:
+    /// - AuthenticationClass could not be resolved
+    /// - Validation failed
+    pub async fn from_references(
+        client: &Client,
+        auth_classes: &Vec<KafkaAuthentication>,
+    ) -> Result<ResolvedAuthenticationClasses, Error> {
+        let mut resolved_authentication_classes: Vec<AuthenticationClass> = vec![];
+
+        for auth_class in auth_classes {
+            resolved_authentication_classes.push(
+                AuthenticationClass::resolve(client, &auth_class.authentication_class)
+                    .await
+                    .context(AuthenticationClassRetrievalSnafu {
+                        authentication_class: ObjectRef::<AuthenticationClass>::new(
+                            &auth_class.authentication_class,
+                        ),
+                    })?,
+            );
+        }
+
+        ResolvedAuthenticationClasses::new(resolved_authentication_classes).validate()
+    }
+
+    /// Return the (first) TLS `AuthenticationClass` if available
+    pub fn get_tls_authentication_class(&self) -> Option<&AuthenticationClass> {
+        self.resolved_authentication_classes
+            .iter()
+            .find(|auth| matches!(auth.spec.provider, AuthenticationClassProvider::Tls(_)))
+    }
+
+    /// Validates the resolved AuthenticationClasses.
+    /// Currently errors out if:
+    /// - More than one AuthenticationClass was provided
+    /// - AuthenticationClass provider was not supported
+    pub fn validate(&self) -> Result<ResolvedAuthenticationClasses, Error> {
+        if self.resolved_authentication_classes.len() > 1 {
+            return Err(Error::MultipleAuthenticationClassesProvided);
+        }
+
+        for auth_class in &self.resolved_authentication_classes {
+            match &auth_class.spec.provider {
+                AuthenticationClassProvider::Tls(_) => {}
+                _ => {
+                    return Err(Error::AuthenticationProviderNotSupported {
+                        authentication_class: ObjectRef::from_obj(auth_class),
+                        provider: auth_class.spec.provider.to_string(),
+                    })
+                }
+            }
+        }
+
+        Ok(self.clone())
+    }
+}
diff --git a/rust/crd/src/authorization.rs b/rust/crd/src/authorization.rs
new file mode 100644
index 00000000..fb084c46
--- /dev/null
+++ b/rust/crd/src/authorization.rs
@@ -0,0 +1,11 @@
+use serde::{Deserialize, Serialize};
+use stackable_operator::{
+    commons::opa::OpaConfig,
+    schemars::{self, JsonSchema},
+};
+
+#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct KafkaAuthorization {
+    pub opa: Option<OpaConfig>,
+}
diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs
index 04443518..bac96ad0 100644
--- a/rust/crd/src/lib.rs
+++ b/rust/crd/src/lib.rs
@@ -1,22 +1,24 @@
+pub mod authentication;
+pub mod authorization;
 pub mod listener;
+pub mod security;
+pub mod tls;
+
+use crate::authentication::KafkaAuthentication;
+use crate::authorization::KafkaAuthorization;
+use crate::tls::KafkaTls;
 
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, Snafu};
-use stackable_operator::commons::{
-    product_image_selection::ProductImage,
-    resources::{
-        CpuLimitsFragment, MemoryLimitsFragment, NoRuntimeLimitsFragment, PvcConfigFragment,
-        ResourcesFragment,
-    },
-};
-use stackable_operator::config::fragment::{Fragment, ValidationError};
-use stackable_operator::memory::to_java_heap;
-use stackable_operator::role_utils::RoleGroup;
 use stackable_operator::{
     commons::{
-        opa::OpaConfig,
-        resources::{NoRuntimeLimits, PvcConfig, Resources},
+        product_image_selection::ProductImage,
+        resources::{
+            CpuLimitsFragment, MemoryLimitsFragment, NoRuntimeLimits, NoRuntimeLimitsFragment,
+            PvcConfig,
PvcConfigFragment, Resources, ResourcesFragment, + }, }, + config::fragment::{Fragment, ValidationError}, config::merge::Merge, error::OperatorResult, k8s_openapi::{ @@ -24,8 +26,9 @@ use stackable_operator::{ apimachinery::pkg::api::resource::Quantity, }, kube::{runtime::reflector::ObjectRef, CustomResource}, + memory::to_java_heap, product_config_utils::{ConfigError, Configuration}, - role_utils::{Role, RoleGroupRef}, + role_utils::{Role, RoleGroup, RoleGroupRef}, schemars::{self, JsonSchema}, }; use std::collections::BTreeMap; @@ -34,13 +37,7 @@ use strum::{Display, EnumIter, EnumString}; pub const DOCKER_IMAGE_BASE_NAME: &str = "kafka"; pub const APP_NAME: &str = "kafka"; pub const OPERATOR_NAME: &str = "kafka.stackable.tech"; -// ports -pub const CLIENT_PORT_NAME: &str = "kafka"; -pub const CLIENT_PORT: u16 = 9092; -pub const SECURE_CLIENT_PORT_NAME: &str = "kafka-tls"; -pub const SECURE_CLIENT_PORT: u16 = 9093; -pub const INTERNAL_PORT: u16 = 19092; -pub const SECURE_INTERNAL_PORT: u16 = 19093; +// metrics pub const METRICS_PORT_NAME: &str = "metrics"; pub const METRICS_PORT: u16 = 9606; // config files @@ -49,52 +46,10 @@ pub const SERVER_PROPERTIES_FILE: &str = "server.properties"; pub const KAFKA_HEAP_OPTS: &str = "KAFKA_HEAP_OPTS"; // server_properties pub const LOG_DIRS_VOLUME_NAME: &str = "log-dirs"; -// - TLS global -pub const TLS_DEFAULT_SECRET_CLASS: &str = "tls"; -pub const SSL_KEYSTORE_LOCATION: &str = "ssl.keystore.location"; -pub const SSL_KEYSTORE_PASSWORD: &str = "ssl.keystore.password"; -pub const SSL_KEYSTORE_TYPE: &str = "ssl.keystore.type"; -pub const SSL_TRUSTSTORE_LOCATION: &str = "ssl.truststore.location"; -pub const SSL_TRUSTSTORE_PASSWORD: &str = "ssl.truststore.password"; -pub const SSL_TRUSTSTORE_TYPE: &str = "ssl.truststore.type"; -pub const SSL_STORE_PASSWORD: &str = "changeit"; -// - TLS client -pub const CLIENT_SSL_KEYSTORE_LOCATION: &str = "listener.name.client.ssl.keystore.location"; -pub const CLIENT_SSL_KEYSTORE_PASSWORD: &str = "listener.name.client.ssl.keystore.password"; -pub const CLIENT_SSL_KEYSTORE_TYPE: &str = "listener.name.client.ssl.keystore.type"; -pub const CLIENT_SSL_TRUSTSTORE_LOCATION: &str = "listener.name.client.ssl.truststore.location"; -pub const CLIENT_SSL_TRUSTSTORE_PASSWORD: &str = "listener.name.client.ssl.truststore.password"; -pub const CLIENT_SSL_TRUSTSTORE_TYPE: &str = "listener.name.client.ssl.truststore.type"; -// - TLS client authentication -pub const CLIENT_AUTH_SSL_KEYSTORE_LOCATION: &str = - "listener.name.client_auth.ssl.keystore.location"; -pub const CLIENT_AUTH_SSL_KEYSTORE_PASSWORD: &str = - "listener.name.client_auth.ssl.keystore.password"; -pub const CLIENT_AUTH_SSL_KEYSTORE_TYPE: &str = "listener.name.client_auth.ssl.keystore.type"; -pub const CLIENT_AUTH_SSL_TRUSTSTORE_LOCATION: &str = - "listener.name.client_auth.ssl.truststore.location"; -pub const CLIENT_AUTH_SSL_TRUSTSTORE_PASSWORD: &str = - "listener.name.client_auth.ssl.truststore.password"; -pub const CLIENT_AUTH_SSL_TRUSTSTORE_TYPE: &str = "listener.name.client_auth.ssl.truststore.type"; -pub const CLIENT_AUTH_SSL_CLIENT_AUTH: &str = "listener.name.client_auth.ssl.client.auth"; -// - TLS internal -pub const SECURITY_INTER_BROKER_PROTOCOL: &str = "security.inter.broker.protocol"; -pub const INTER_BROKER_LISTENER_NAME: &str = "inter.broker.listener.name"; -pub const INTER_SSL_KEYSTORE_LOCATION: &str = "listener.name.internal.ssl.keystore.location"; -pub const INTER_SSL_KEYSTORE_PASSWORD: &str = "listener.name.internal.ssl.keystore.password"; -pub const 
INTER_SSL_KEYSTORE_TYPE: &str = "listener.name.internal.ssl.keystore.type";
-pub const INTER_SSL_TRUSTSTORE_LOCATION: &str = "listener.name.internal.ssl.truststore.location";
-pub const INTER_SSL_TRUSTSTORE_PASSWORD: &str = "listener.name.internal.ssl.truststore.password";
-pub const INTER_SSL_TRUSTSTORE_TYPE: &str = "listener.name.internal.ssl.truststore.type";
-pub const INTER_SSL_CLIENT_AUTH: &str = "listener.name.internal.ssl.client.auth";
 // directories
 pub const STACKABLE_TMP_DIR: &str = "/stackable/tmp";
 pub const STACKABLE_DATA_DIR: &str = "/stackable/data";
 pub const STACKABLE_CONFIG_DIR: &str = "/stackable/config";
-pub const STACKABLE_TLS_CLIENT_DIR: &str = "/stackable/tls_client";
-pub const STACKABLE_TLS_CLIENT_AUTH_DIR: &str = "/stackable/tls_client_auth";
-pub const STACKABLE_TLS_INTERNAL_DIR: &str = "/stackable/tls_internal";
-pub const SYSTEM_TRUST_STORE_DIR: &str = "/etc/pki/java/cacerts";
 
 const JVM_HEAP_FACTOR: f32 = 0.8;
 
@@ -131,69 +86,29 @@ pub enum Error {
 pub struct KafkaClusterSpec {
     pub image: ProductImage,
     pub brokers: Option<Role<KafkaConfigFragment>>,
-    pub zookeeper_config_map_name: String,
-    pub opa: Option<OpaConfig>,
-    pub log4j: Option<String>,
-    #[serde(default)]
-    pub config: GlobalKafkaConfig,
+    pub cluster_config: KafkaClusterConfig,
     pub stopped: Option<bool>,
 }
 
-#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
+#[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)]
 #[serde(rename_all = "camelCase")]
-pub struct GlobalKafkaConfig {
-    /// Only affects client connections. This setting controls:
-    /// - If TLS encryption is used at all
-    /// - Which cert the servers should use to authenticate themselves against the client
-    /// Defaults to `TlsSecretClass` { secret_class: "tls".to_string() }.
-    #[serde(
-        default = "tls_secret_class_default",
-        skip_serializing_if = "Option::is_none"
-    )]
-    pub tls: Option<TlsSecretClass>,
-    /// Only affects client connections. This setting controls:
-    /// - If clients need to authenticate themselves against the server via TLS
-    /// - Which ca.crt to use when validating the provided client certs
-    /// Defaults to `None`
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub client_authentication: Option<ClientAuthenticationClass>,
-    /// Only affects internal communication. Use mutual verification between Kafka nodes
-    /// This setting controls:
-    /// - Which cert the servers should use to authenticate themselves against other servers
-    /// - Which ca.crt to use when validating the other server
+pub struct KafkaClusterConfig {
+    /// Authentication class settings for Kafka like mTLS authentication.
+    #[serde(default)]
+    pub authentication: Vec<KafkaAuthentication>,
+    /// Authorization settings for Kafka like OPA.
+    #[serde(default)]
+    pub authorization: KafkaAuthorization,
+    /// Log4j configuration
+    pub log4j: Option<String>,
+    /// TLS encryption settings for Kafka (server, internal).
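+    /// Defaults to the `tls` SecretClass for both `serverSecretClass` and
+    /// `internalSecretClass`; internal TLS stays mandatory even if unset.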
    #[serde(
-        default = "tls_secret_class_default",
+        default = "tls::default_kafka_tls",
         skip_serializing_if = "Option::is_none"
     )]
-    pub internal_tls: Option<TlsSecretClass>,
-}
-
-impl Default for GlobalKafkaConfig {
-    fn default() -> Self {
-        GlobalKafkaConfig {
-            tls: tls_secret_class_default(),
-            client_authentication: None,
-            internal_tls: tls_secret_class_default(),
-        }
-    }
-}
-
-#[derive(Clone, Default, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ClientAuthenticationClass {
-    pub authentication_class: String,
-}
-
-#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct TlsSecretClass {
-    pub secret_class: String,
-}
-
-fn tls_secret_class_default() -> Option<TlsSecretClass> {
-    Some(TlsSecretClass {
-        secret_class: TLS_DEFAULT_SECRET_CLASS.to_string(),
-    })
+    pub tls: Option<KafkaTls>,
+    /// ZooKeeper discovery config map name.
+    pub zookeeper_config_map_name: String,
 }
 
 impl KafkaCluster {
@@ -282,47 +197,6 @@ impl KafkaCluster {
             .map(|memory_limit| to_java_heap(memory_limit, JVM_HEAP_FACTOR))
             .transpose()
     }
-
-    /// Returns the secret class for client connection encryption. Defaults to `tls`.
-    pub fn client_tls_secret_class(&self) -> Option<&TlsSecretClass> {
-        let spec: &KafkaClusterSpec = &self.spec;
-        spec.config.tls.as_ref()
-    }
-
-    /// Returns the authentication class used for client authentication
-    pub fn client_authentication_class(&self) -> Option<&str> {
-        let spec: &KafkaClusterSpec = &self.spec;
-        spec.config
-            .client_authentication
-            .as_ref()
-            .map(|tls| tls.authentication_class.as_ref())
-    }
-
-    /// Returns the secret class for internal server encryption.
-    pub fn internal_tls_secret_class(&self) -> Option<&TlsSecretClass> {
-        let spec: &KafkaClusterSpec = &self.spec;
-        spec.config.internal_tls.as_ref()
-    }
-
-    /// Returns the client port based on the security (tls) settings.
-    pub fn client_port(&self) -> u16 {
-        if self.client_tls_secret_class().is_some() || self.client_authentication_class().is_some()
-        {
-            SECURE_CLIENT_PORT
-        } else {
-            CLIENT_PORT
-        }
-    }
-
-    /// Returns the client port name based on the security (tls) settings.
-    pub fn client_port_name(&self) -> &str {
-        if self.client_tls_secret_class().is_some() || self.client_authentication_class().is_some()
-        {
-            SECURE_CLIENT_PORT_NAME
-        } else {
-            CLIENT_PORT_NAME
-        }
-    }
 }
 
 /// Reference to a single `Pod` that is a component of a [`KafkaCluster`]
@@ -438,7 +312,7 @@ impl Configuration for KafkaConfigFragment {
         if file == SERVER_PROPERTIES_FILE {
             // OPA
-            if resource.spec.opa.is_some() {
+            if resource.spec.cluster_config.authorization.opa.is_some() {
                 config.insert(
                     "authorizer.class.name".to_string(),
                     Some("org.openpolicyagent.kafka.OpaAuthorizer".to_string()),
                 );
                 config.insert(
                     "opa.authorizer.metrics.enabled".to_string(),
                     Some("true".to_string()),
                 );
             }
-
-            // We set either client tls with authentication or client tls without authentication
-            // If authentication is explicitly required we do not want to have any other CAs to
-            // be trusted.
- if resource.client_authentication_class().is_some() { - config.insert( - CLIENT_AUTH_SSL_KEYSTORE_LOCATION.to_string(), - Some(format!("{}/keystore.p12", STACKABLE_TLS_CLIENT_AUTH_DIR)), - ); - config.insert( - CLIENT_AUTH_SSL_KEYSTORE_PASSWORD.to_string(), - Some(SSL_STORE_PASSWORD.to_string()), - ); - config.insert( - CLIENT_AUTH_SSL_KEYSTORE_TYPE.to_string(), - Some("PKCS12".to_string()), - ); - config.insert( - CLIENT_AUTH_SSL_TRUSTSTORE_LOCATION.to_string(), - Some(format!("{}/truststore.p12", STACKABLE_TLS_CLIENT_AUTH_DIR)), - ); - config.insert( - CLIENT_AUTH_SSL_TRUSTSTORE_PASSWORD.to_string(), - Some(SSL_STORE_PASSWORD.to_string()), - ); - config.insert( - CLIENT_AUTH_SSL_TRUSTSTORE_TYPE.to_string(), - Some("PKCS12".to_string()), - ); - // client auth required - config.insert( - CLIENT_AUTH_SSL_CLIENT_AUTH.to_string(), - Some("required".to_string()), - ); - } else if resource.client_tls_secret_class().is_some() { - config.insert( - CLIENT_SSL_KEYSTORE_LOCATION.to_string(), - Some(format!("{}/keystore.p12", STACKABLE_TLS_CLIENT_DIR)), - ); - config.insert( - CLIENT_SSL_KEYSTORE_PASSWORD.to_string(), - Some(SSL_STORE_PASSWORD.to_string()), - ); - config.insert( - CLIENT_SSL_KEYSTORE_TYPE.to_string(), - Some("PKCS12".to_string()), - ); - config.insert( - CLIENT_SSL_TRUSTSTORE_LOCATION.to_string(), - Some(format!("{}/truststore.p12", STACKABLE_TLS_CLIENT_DIR)), - ); - config.insert( - CLIENT_SSL_TRUSTSTORE_PASSWORD.to_string(), - Some(SSL_STORE_PASSWORD.to_string()), - ); - config.insert( - CLIENT_SSL_TRUSTSTORE_TYPE.to_string(), - Some("PKCS12".to_string()), - ); - } - - // Internal TLS - if resource.internal_tls_secret_class().is_some() { - config.insert( - INTER_SSL_KEYSTORE_LOCATION.to_string(), - Some(format!("{}/keystore.p12", STACKABLE_TLS_INTERNAL_DIR)), - ); - config.insert( - INTER_SSL_KEYSTORE_PASSWORD.to_string(), - Some(SSL_STORE_PASSWORD.to_string()), - ); - config.insert( - INTER_SSL_KEYSTORE_TYPE.to_string(), - Some("PKCS12".to_string()), - ); - config.insert( - INTER_SSL_TRUSTSTORE_LOCATION.to_string(), - Some(format!("{}/truststore.p12", STACKABLE_TLS_INTERNAL_DIR)), - ); - config.insert( - INTER_SSL_TRUSTSTORE_PASSWORD.to_string(), - Some(SSL_STORE_PASSWORD.to_string()), - ); - config.insert( - INTER_SSL_TRUSTSTORE_TYPE.to_string(), - Some("PKCS12".to_string()), - ); - config.insert( - INTER_SSL_CLIENT_AUTH.to_string(), - Some("required".to_string()), - ); - } - - // common - config.insert( - INTER_BROKER_LISTENER_NAME.to_string(), - Some(listener::KafkaListenerName::Internal.to_string()), - ); } Ok(config) @@ -556,6 +332,26 @@ impl Configuration for KafkaConfigFragment { mod tests { use super::*; + fn get_server_secret_class(kafka: &KafkaCluster) -> Option { + kafka + .spec + .cluster_config + .tls + .as_ref() + .and_then(|tls| tls.server_secret_class.clone()) + } + + fn get_internal_secret_class(kafka: &KafkaCluster) -> String { + kafka + .spec + .cluster_config + .tls + .as_ref() + .unwrap() + .internal_secret_class + .clone() + } + #[test] fn test_client_tls() { let input = r#" @@ -567,16 +363,14 @@ mod tests { image: productVersion: 42.0.0 stackableVersion: 0.42.0 - zookeeperConfigMapName: xyz + clusterConfig: + zookeeperConfigMapName: xyz "#; let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default()); assert_eq!( - kafka.client_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS.to_string() - ); - assert_eq!( - 
kafka.internal_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS.to_string() + get_internal_secret_class(&kafka), + tls::internal_tls_default() ); let input = r#" @@ -588,19 +382,20 @@ mod tests { image: productVersion: 42.0.0 stackableVersion: 0.42.0 - zookeeperConfigMapName: xyz - config: + clusterConfig: tls: - secretClass: simple-kafka-client-tls + serverSecretClass: simple-kafka-server-tls + zookeeperConfigMapName: xyz + "#; let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); assert_eq!( - kafka.client_tls_secret_class().unwrap().secret_class, - "simple-kafka-client-tls".to_string() + get_server_secret_class(&kafka).unwrap(), + "simple-kafka-server-tls".to_string() ); assert_eq!( - kafka.internal_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS + get_internal_secret_class(&kafka), + tls::internal_tls_default() ); let input = r#" @@ -612,15 +407,16 @@ mod tests { image: productVersion: 42.0.0 stackableVersion: 0.42.0 - zookeeperConfigMapName: xyz - config: - tls: null + clusterConfig: + tls: + serverSecretClass: null + zookeeperConfigMapName: xyz "#; let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); - assert_eq!(kafka.client_tls_secret_class(), None); + assert_eq!(get_server_secret_class(&kafka), None); assert_eq!( - kafka.internal_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS.to_string() + get_internal_secret_class(&kafka), + tls::internal_tls_default() ); let input = r#" @@ -633,18 +429,16 @@ mod tests { productVersion: 42.0.0 stackableVersion: 0.42.0 zookeeperConfigMapName: xyz - config: - internalTls: - secretClass: simple-kafka-internal-tls + clusterConfig: + tls: + internalSecretClass: simple-kafka-internal-tls + zookeeperConfigMapName: xyz "#; let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default()); assert_eq!( - kafka.client_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS.to_string() - ); - assert_eq!( - kafka.internal_tls_secret_class().unwrap().secret_class, - "simple-kafka-internal-tls" + get_internal_secret_class(&kafka), + "simple-kafka-internal-tls".to_string() ); } @@ -659,16 +453,14 @@ mod tests { image: productVersion: 42.0.0 stackableVersion: 0.42.0 - zookeeperConfigMapName: xyz + clusterConfig: + zookeeperConfigMapName: xyz "#; let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default()); assert_eq!( - kafka.internal_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS.to_string() - ); - assert_eq!( - kafka.client_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS + get_internal_secret_class(&kafka), + tls::internal_tls_default() ); let input = r#" @@ -680,20 +472,17 @@ mod tests { image: productVersion: 42.0.0 stackableVersion: 0.42.0 - zookeeperConfigMapName: xyz - config: - internalTls: - secretClass: simple-kafka-internal-tls + clusterConfig: + tls: + internalSecretClass: simple-kafka-internal-tls + zookeeperConfigMapName: xyz "#; let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default()); assert_eq!( - kafka.internal_tls_secret_class().unwrap().secret_class, + get_internal_secret_class(&kafka), "simple-kafka-internal-tls".to_string() ); - assert_eq!( - 
kafka.client_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS - ); let input = r#" apiVersion: kafka.stackable.tech/v1alpha1 @@ -704,19 +493,19 @@ mod tests { image: productVersion: 42.0.0 stackableVersion: 0.42.0 - zookeeperConfigMapName: xyz - config: + clusterConfig: tls: - secretClass: simple-kafka-client-tls + serverSecretClass: simple-kafka-server-tls + zookeeperConfigMapName: xyz "#; let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); assert_eq!( - kafka.internal_tls_secret_class().unwrap().secret_class, - TLS_DEFAULT_SECRET_CLASS.to_string() + get_server_secret_class(&kafka), + Some("simple-kafka-server-tls".to_string()) ); assert_eq!( - kafka.client_tls_secret_class().unwrap().secret_class, - "simple-kafka-client-tls" + get_internal_secret_class(&kafka), + tls::internal_tls_default() ); } } diff --git a/rust/crd/src/listener.rs b/rust/crd/src/listener.rs index d1d2be8e..ab9a4606 100644 --- a/rust/crd/src/listener.rs +++ b/rust/crd/src/listener.rs @@ -1,7 +1,6 @@ -use crate::{ - KafkaCluster, CLIENT_PORT, CLIENT_PORT_NAME, INTERNAL_PORT, SECURE_CLIENT_PORT, - SECURE_CLIENT_PORT_NAME, SECURE_INTERNAL_PORT, STACKABLE_TMP_DIR, -}; +use crate::{KafkaCluster, STACKABLE_TMP_DIR}; + +use crate::security::KafkaTlsSecurity; use snafu::{OptionExt, Snafu}; use stackable_operator::kube::ResourceExt; use std::collections::BTreeMap; @@ -90,6 +89,7 @@ impl Display for KafkaListener { pub fn get_kafka_listener_config( kafka: &KafkaCluster, + kafka_security: &KafkaTlsSecurity, object_name: &str, ) -> Result { let pod_fqdn = pod_fqdn(kafka, object_name)?; @@ -97,31 +97,31 @@ pub fn get_kafka_listener_config( let mut advertised_listeners = vec![]; let mut listener_security_protocol_map = BTreeMap::new(); - if kafka.client_authentication_class().is_some() { + if kafka_security.tls_client_authentication_class().is_some() { // 1) If client authentication required, we expose only CLIENT_AUTH connection with SSL listeners.push(KafkaListener { name: KafkaListenerName::ClientAuth, host: LISTENER_LOCAL_ADDRESS.to_string(), - port: SECURE_CLIENT_PORT.to_string(), + port: kafka_security.client_port().to_string(), }); advertised_listeners.push(KafkaListener { name: KafkaListenerName::ClientAuth, host: LISTENER_NODE_ADDRESS.to_string(), - port: node_port_cmd(STACKABLE_TMP_DIR, SECURE_CLIENT_PORT_NAME), + port: node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()), }); listener_security_protocol_map .insert(KafkaListenerName::ClientAuth, KafkaListenerProtocol::Ssl); - } else if kafka.client_tls_secret_class().is_some() { + } else if kafka_security.tls_server_secret_class().is_some() { // 2) If no client authentication but tls is required we expose CLIENT with SSL listeners.push(KafkaListener { name: KafkaListenerName::Client, host: LISTENER_LOCAL_ADDRESS.to_string(), - port: SECURE_CLIENT_PORT.to_string(), + port: kafka_security.client_port().to_string(), }); advertised_listeners.push(KafkaListener { name: KafkaListenerName::Client, host: LISTENER_NODE_ADDRESS.to_string(), - port: node_port_cmd(STACKABLE_TMP_DIR, SECURE_CLIENT_PORT_NAME), + port: node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()), }); listener_security_protocol_map .insert(KafkaListenerName::Client, KafkaListenerProtocol::Ssl); @@ -130,28 +130,28 @@ pub fn get_kafka_listener_config( listeners.push(KafkaListener { name: KafkaListenerName::Client, host: LISTENER_LOCAL_ADDRESS.to_string(), - port: CLIENT_PORT.to_string(), + port: 
KafkaTlsSecurity::CLIENT_PORT.to_string(),
         });
         advertised_listeners.push(KafkaListener {
             name: KafkaListenerName::Client,
             host: LISTENER_NODE_ADDRESS.to_string(),
-            port: node_port_cmd(STACKABLE_TMP_DIR, CLIENT_PORT_NAME),
+            port: node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
         });
         listener_security_protocol_map
             .insert(KafkaListenerName::Client, KafkaListenerProtocol::Plaintext);
     }

-    if kafka.internal_tls_secret_class().is_some() {
+    if kafka_security.tls_internal_secret_class().is_some() {
         // 4) If internal tls is required we expose INTERNAL as SSL
         listeners.push(KafkaListener {
             name: KafkaListenerName::Internal,
             host: LISTENER_LOCAL_ADDRESS.to_string(),
-            port: SECURE_INTERNAL_PORT.to_string(),
+            port: kafka_security.internal_port().to_string(),
         });
         advertised_listeners.push(KafkaListener {
             name: KafkaListenerName::Internal,
             host: pod_fqdn,
-            port: SECURE_INTERNAL_PORT.to_string(),
+            port: kafka_security.internal_port().to_string(),
         });
         listener_security_protocol_map
             .insert(KafkaListenerName::Internal, KafkaListenerProtocol::Ssl);
@@ -160,12 +160,12 @@ pub fn get_kafka_listener_config(
         listeners.push(KafkaListener {
             name: KafkaListenerName::Internal,
             host: LISTENER_LOCAL_ADDRESS.to_string(),
-            port: INTERNAL_PORT.to_string(),
+            port: kafka_security.internal_port().to_string(),
         });
         advertised_listeners.push(KafkaListener {
             name: KafkaListenerName::Internal,
             host: pod_fqdn,
-            port: INTERNAL_PORT.to_string(),
+            port: kafka_security.internal_port().to_string(),
         });
         listener_security_protocol_map.insert(
             KafkaListenerName::Internal,
@@ -195,12 +195,19 @@ fn pod_fqdn(kafka: &KafkaCluster, object_name: &str) -> Result<String, Error>
+}
+
+impl KafkaTlsSecurity {
+    // ports
+    pub const CLIENT_PORT_NAME: &'static str = "kafka";
+    pub const CLIENT_PORT: u16 = 9092;
+    pub const SECURE_CLIENT_PORT_NAME: &'static str = "kafka-tls";
+    pub const SECURE_CLIENT_PORT: u16 = 9093;
+    pub const INTERNAL_PORT: u16 = 19092;
+    pub const SECURE_INTERNAL_PORT: u16 = 19093;
+    // - TLS global
+    const SSL_STORE_PASSWORD: &'static str = "changeit";
+    // - TLS client
+    const CLIENT_SSL_KEYSTORE_LOCATION: &'static str = "listener.name.client.ssl.keystore.location";
+    const CLIENT_SSL_KEYSTORE_PASSWORD: &'static str = "listener.name.client.ssl.keystore.password";
+    const CLIENT_SSL_KEYSTORE_TYPE: &'static str = "listener.name.client.ssl.keystore.type";
+    const CLIENT_SSL_TRUSTSTORE_LOCATION: &'static str =
+        "listener.name.client.ssl.truststore.location";
+    const CLIENT_SSL_TRUSTSTORE_PASSWORD: &'static str =
+        "listener.name.client.ssl.truststore.password";
+    const CLIENT_SSL_TRUSTSTORE_TYPE: &'static str = "listener.name.client.ssl.truststore.type";
+    // - TLS client authentication
+    const CLIENT_AUTH_SSL_KEYSTORE_LOCATION: &'static str =
+        "listener.name.client_auth.ssl.keystore.location";
+    const CLIENT_AUTH_SSL_KEYSTORE_PASSWORD: &'static str =
+        "listener.name.client_auth.ssl.keystore.password";
+    const CLIENT_AUTH_SSL_KEYSTORE_TYPE: &'static str =
+        "listener.name.client_auth.ssl.keystore.type";
+    const CLIENT_AUTH_SSL_TRUSTSTORE_LOCATION: &'static str =
+        "listener.name.client_auth.ssl.truststore.location";
+    const CLIENT_AUTH_SSL_TRUSTSTORE_PASSWORD: &'static str =
+        "listener.name.client_auth.ssl.truststore.password";
+    const CLIENT_AUTH_SSL_TRUSTSTORE_TYPE: &'static str =
+        "listener.name.client_auth.ssl.truststore.type";
+    const CLIENT_AUTH_SSL_CLIENT_AUTH: &'static str = "listener.name.client_auth.ssl.client.auth";
+    // - TLS internal
+    const INTER_BROKER_LISTENER_NAME: &'static str = "inter.broker.listener.name";
"inter.broker.listener.name"; + const INTER_SSL_KEYSTORE_LOCATION: &'static str = + "listener.name.internal.ssl.keystore.location"; + const INTER_SSL_KEYSTORE_PASSWORD: &'static str = + "listener.name.internal.ssl.keystore.password"; + const INTER_SSL_KEYSTORE_TYPE: &'static str = "listener.name.internal.ssl.keystore.type"; + const INTER_SSL_TRUSTSTORE_LOCATION: &'static str = + "listener.name.internal.ssl.truststore.location"; + const INTER_SSL_TRUSTSTORE_PASSWORD: &'static str = + "listener.name.internal.ssl.truststore.password"; + const INTER_SSL_TRUSTSTORE_TYPE: &'static str = "listener.name.internal.ssl.truststore.type"; + const INTER_SSL_CLIENT_AUTH: &'static str = "listener.name.internal.ssl.client.auth"; + // directories + const STACKABLE_TLS_SERVER_MOUNT_DIR: &'static str = "/stackable/tls_server_mount"; + const STACKABLE_TLS_SERVER_DIR: &'static str = "/stackable/tls_server"; + const STACKABLE_TLS_INTERNAL_MOUNT_DIR: &'static str = "/stackable/tls_internal_mount"; + const STACKABLE_TLS_INTERNAL_DIR: &'static str = "/stackable/tls_internal"; + const SYSTEM_TRUST_STORE_DIR: &'static str = "/etc/pki/java/cacerts"; + + pub fn new( + resolved_authentication_classes: ResolvedAuthenticationClasses, + internal_secret_class: String, + server_secret_class: Option, + ) -> Self { + Self { + resolved_authentication_classes, + internal_secret_class, + server_secret_class, + } + } + + /// Create a `KafkaSecurity` struct from the Kafka custom resource and resolve + /// all provided `AuthenticationClass` references. + pub async fn new_from_kafka_cluster( + client: &Client, + kafka: &KafkaCluster, + ) -> Result { + Ok(KafkaTlsSecurity { + resolved_authentication_classes: + authentication::ResolvedAuthenticationClasses::from_references( + client, + &kafka.spec.cluster_config.authentication, + ) + .await + .context(InvalidAuthenticationClassConfigurationSnafu)?, + internal_secret_class: kafka + .spec + .cluster_config + .tls + .as_ref() + .map(|tls| tls.internal_secret_class.clone()) + .unwrap_or_else(tls::internal_tls_default), + server_secret_class: kafka + .spec + .cluster_config + .tls + .as_ref() + .and_then(|tls| tls.server_secret_class.clone()), + }) + } + + /// Check if TLS encryption is enabled. This could be due to: + /// - A provided server `SecretClass` + /// - A provided client `AuthenticationClass` + /// This affects init container commands, Kafka configuration, volume mounts and + /// the Kafka client port + pub fn tls_enabled(&self) -> bool { + // TODO: This must be adapted if other authentication methods are supported and require TLS + self.tls_client_authentication_class().is_some() || self.tls_server_secret_class().is_some() + } + + /// Retrieve an optional TLS secret class for external client -> server communications. + pub fn tls_server_secret_class(&self) -> Option<&str> { + self.server_secret_class.as_deref() + } + + /// Retrieve an optional TLS `AuthenticationClass`. + pub fn tls_client_authentication_class(&self) -> Option<&AuthenticationClass> { + self.resolved_authentication_classes + .get_tls_authentication_class() + } + + /// Retrieve the mandatory internal `SecretClass`. + pub fn tls_internal_secret_class(&self) -> Option<&str> { + if !self.internal_secret_class.is_empty() { + Some(self.internal_secret_class.as_str()) + } else { + None + } + } + + /// Return the Kafka (secure) client port depending on tls or authentication settings. 
+    /// Return the Kafka (secure) client port depending on tls or authentication settings.
+    pub fn client_port(&self) -> u16 {
+        if self.tls_enabled() {
+            Self::SECURE_CLIENT_PORT
+        } else {
+            Self::CLIENT_PORT
+        }
+    }
+
+    /// Return the Kafka (secure) client port name depending on tls or authentication settings.
+    pub fn client_port_name(&self) -> &str {
+        if self.tls_enabled() {
+            Self::SECURE_CLIENT_PORT_NAME
+        } else {
+            Self::CLIENT_PORT_NAME
+        }
+    }
+
+    /// Return the Kafka (secure) internal port depending on tls settings.
+    pub fn internal_port(&self) -> u16 {
+        if self.tls_internal_secret_class().is_some() {
+            Self::SECURE_INTERNAL_PORT
+        } else {
+            Self::INTERNAL_PORT
+        }
+    }
+
+    /// Returns required (init) container commands to generate keystores and truststores
+    /// depending on the tls and authentication settings.
+    pub fn prepare_container_command_args(&self) -> Vec<String> {
+        let mut args = vec![];
+
+        if self.tls_client_authentication_class().is_some() {
+            args.extend(Self::create_key_and_trust_store(
+                Self::STACKABLE_TLS_SERVER_MOUNT_DIR,
+                Self::STACKABLE_TLS_SERVER_DIR,
+                "stackable-tls-client-auth-ca-cert",
+                Self::SSL_STORE_PASSWORD,
+            ));
+        } else if self.tls_server_secret_class().is_some() {
+            // Copy system truststore to stackable truststore
+            args.push(format!("keytool -importkeystore -srckeystore {system_trust_store_dir} -srcstoretype jks -srcstorepass {ssl_store_password} -destkeystore {stackable_tls_server_dir}/truststore.p12 -deststoretype pkcs12 -deststorepass {ssl_store_password} -noprompt",
+                system_trust_store_dir = Self::SYSTEM_TRUST_STORE_DIR,
+                ssl_store_password = Self::SSL_STORE_PASSWORD,
+                stackable_tls_server_dir = Self::STACKABLE_TLS_SERVER_DIR,
+            ));
+            args.extend(Self::create_key_and_trust_store(
+                Self::STACKABLE_TLS_SERVER_MOUNT_DIR,
+                Self::STACKABLE_TLS_SERVER_DIR,
+                "stackable-tls-server-ca-cert",
+                Self::SSL_STORE_PASSWORD,
+            ));
+        }
+
+        if self.tls_internal_secret_class().is_some() {
+            args.extend(Self::create_key_and_trust_store(
+                Self::STACKABLE_TLS_INTERNAL_MOUNT_DIR,
+                Self::STACKABLE_TLS_INTERNAL_DIR,
+                "stackable-tls-internal-ca-cert",
+                Self::SSL_STORE_PASSWORD,
+            ));
+        }
+
+        args
+    }
+
+    /// Returns SVC container command to retrieve the node port service port.
+    pub fn svc_container_commands(&self) -> String {
+        let port_name = self.client_port_name();
+        // Extract the nodeport from the nodeport service
+        format!("kubectl get service \"$POD_NAME\" -o jsonpath='{{.spec.ports[?(@.name==\"{port_name}\")].nodePort}}' | tee {STACKABLE_TMP_DIR}/{port_name}_nodeport")
+    }
+
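The prepare init container joins the arguments above with `" && "` into a single shell invocation. As a standalone sketch, this is the command sequence `create_key_and_trust_store` (defined further down in security.rs) emits for one mount/store directory pair, using the directory constants and the `changeit` store password from above; `main` here is purely illustrative:

```
// Mirrors the format strings of create_key_and_trust_store in security.rs.
fn create_key_and_trust_store(mount: &str, store: &str, alias: &str, pw: &str) -> Vec<String> {
    vec![
        format!("echo [{store}] Cleaning up truststore - just in case"),
        format!("rm -f {store}/truststore.p12"),
        format!("echo [{store}] Creating truststore"),
        format!("keytool -importcert -file {mount}/ca.crt -keystore {store}/truststore.p12 -storetype pkcs12 -noprompt -alias {alias} -storepass {pw}"),
        format!("echo [{store}] Creating certificate chain"),
        format!("cat {mount}/ca.crt {mount}/tls.crt > {store}/chain.crt"),
        format!("echo [{store}] Creating keystore"),
        format!("openssl pkcs12 -export -in {store}/chain.crt -inkey {mount}/tls.key -out {store}/keystore.p12 --passout pass:{pw}"),
    ]
}

fn main() {
    // The prepare init container joins these into a single "a && b && ..." line.
    println!(
        "{}",
        create_key_and_trust_store(
            "/stackable/tls_server_mount",
            "/stackable/tls_server",
            "stackable-tls-server-ca-cert",
            "changeit",
        )
        .join(" && ")
    );
}
```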
+    /// Returns the commands for the kcat readiness probe.
+    pub fn kcat_prober_container_commands(&self) -> Vec<String> {
+        let mut args = vec!["/stackable/kcat".to_string()];
+        let port = self.client_port();
+
+        if self.tls_client_authentication_class().is_some() {
+            args.push("-b".to_string());
+            args.push(format!("localhost:{}", port));
+            args.extend(Self::kcat_client_auth_ssl(
+                Self::STACKABLE_TLS_SERVER_MOUNT_DIR,
+            ));
+        } else if self.tls_server_secret_class().is_some() {
+            args.push("-b".to_string());
+            args.push(format!("localhost:{}", port));
+            args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_SERVER_MOUNT_DIR));
+        } else {
+            args.push("-b".to_string());
+            args.push(format!("localhost:{}", port));
+        }
+
+        args.push("-L".to_string());
+        args
+    }
+
+    /// Returns the commands to start the main Kafka container
+    pub fn kafka_container_commands(
+        &self,
+        kafka_listeners: &KafkaListenerConfig,
+        opa_connect_string: Option<&str>,
+    ) -> Vec<String> {
+        vec![
+            "bin/kafka-server-start.sh".to_string(),
+            format!("/stackable/config/{}", SERVER_PROPERTIES_FILE),
+            "--override \"zookeeper.connect=$ZOOKEEPER\"".to_string(),
+            format!("--override \"listeners={}\"", kafka_listeners.listeners()),
+            format!(
+                "--override \"advertised.listeners={}\"",
+                kafka_listeners.advertised_listeners()
+            ),
+            format!(
+                "--override \"listener.security.protocol.map={}\"",
+                kafka_listeners.listener_security_protocol_map()
+            ),
+            opa_connect_string.map_or("".to_string(), |opa| {
+                format!("--override \"opa.authorizer.url={}\"", opa)
+            }),
+        ]
+    }
+
+    /// Adds required volumes and volume mounts to the pod and container builders
+    /// depending on the tls and authentication settings.
+    pub fn add_volume_and_volume_mounts(
+        &self,
+        pod_builder: &mut PodBuilder,
+        cb_prepare: &mut ContainerBuilder,
+        cb_kcat_prober: &mut ContainerBuilder,
+        cb_kafka: &mut ContainerBuilder,
+    ) {
+        // add tls (server or client authentication volumes) if required
+        if let Some(tls_server_secret_class) = self.get_tls_secret_class() {
+            cb_prepare.add_volume_mount("server-tls-mount", Self::STACKABLE_TLS_SERVER_MOUNT_DIR);
+            // kcat requires pem files and not keystores
+            cb_kcat_prober
+                .add_volume_mount("server-tls-mount", Self::STACKABLE_TLS_SERVER_MOUNT_DIR);
+            cb_kafka.add_volume_mount("server-tls-mount", Self::STACKABLE_TLS_SERVER_MOUNT_DIR);
+            pod_builder.add_volume(Self::create_tls_volume(
+                "server-tls-mount",
+                tls_server_secret_class,
+            ));
+
+            // empty mount for trust and keystore
+            cb_prepare.add_volume_mount("server-tls", Self::STACKABLE_TLS_SERVER_DIR);
+            cb_kafka.add_volume_mount("server-tls", Self::STACKABLE_TLS_SERVER_DIR);
+            pod_builder.add_volume(
+                VolumeBuilder::new("server-tls")
+                    .with_empty_dir(Some(""), None)
+                    .build(),
+            );
+        }
+
+        if let Some(tls_internal_secret_class) = self.tls_internal_secret_class() {
+            cb_prepare
+                .add_volume_mount("internal-tls-mount", Self::STACKABLE_TLS_INTERNAL_MOUNT_DIR);
+            cb_kafka.add_volume_mount("internal-tls-mount", Self::STACKABLE_TLS_INTERNAL_MOUNT_DIR);
+            pod_builder.add_volume(Self::create_tls_volume(
+                "internal-tls-mount",
+                tls_internal_secret_class,
+            ));
+
+            // empty mount for trust and keystore
+            cb_prepare.add_volume_mount("internal-tls", Self::STACKABLE_TLS_INTERNAL_DIR);
+            cb_kafka.add_volume_mount("internal-tls", Self::STACKABLE_TLS_INTERNAL_DIR);
+            pod_builder.add_volume(
+                VolumeBuilder::new("internal-tls")
+                    .with_empty_dir(Some(""), None)
+                    .build(),
+            );
+        }
+    }
+
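For orientation, this is roughly the broker command line the `kafka_container_commands` vector above joins into; the listener values are illustrative placeholders (they come from `get_kafka_listener_config` at runtime and are not computed here):

```
// Illustrative only: a possible joined command for a cluster with TLS client
// authentication. Hostnames and nodeport paths are placeholders.
fn main() {
    let cmd = [
        "bin/kafka-server-start.sh",
        "/stackable/config/server.properties",
        "--override \"zookeeper.connect=$ZOOKEEPER\"",
        "--override \"listeners=CLIENT_AUTH://<local-address>:9093,INTERNAL://<local-address>:19093\"",
        "--override \"advertised.listeners=CLIENT_AUTH://<node-address>:<nodeport>,INTERNAL://<pod-fqdn>:19093\"",
        "--override \"listener.security.protocol.map=CLIENT_AUTH:SSL,INTERNAL:SSL\"",
    ]
    .join(" ");
    println!("{cmd}");
}
```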
+    /// Returns required Kafka configuration settings for the `server.properties` file
+    /// depending on the tls and authentication settings.
+    pub fn config_settings(&self) -> BTreeMap<String, String> {
+        let mut config = BTreeMap::new();
+
+        // We set either client tls with authentication or client tls without authentication
+        // If authentication is explicitly required we do not want to have any other CAs to
+        // be trusted.
+        if self.tls_client_authentication_class().is_some() {
+            config.insert(
+                Self::CLIENT_AUTH_SSL_KEYSTORE_LOCATION.to_string(),
+                format!("{}/keystore.p12", Self::STACKABLE_TLS_SERVER_DIR),
+            );
+            config.insert(
+                Self::CLIENT_AUTH_SSL_KEYSTORE_PASSWORD.to_string(),
+                Self::SSL_STORE_PASSWORD.to_string(),
+            );
+            config.insert(
+                Self::CLIENT_AUTH_SSL_KEYSTORE_TYPE.to_string(),
+                "PKCS12".to_string(),
+            );
+            config.insert(
+                Self::CLIENT_AUTH_SSL_TRUSTSTORE_LOCATION.to_string(),
+                format!("{}/truststore.p12", Self::STACKABLE_TLS_SERVER_DIR),
+            );
+            config.insert(
+                Self::CLIENT_AUTH_SSL_TRUSTSTORE_PASSWORD.to_string(),
+                Self::SSL_STORE_PASSWORD.to_string(),
+            );
+            config.insert(
+                Self::CLIENT_AUTH_SSL_TRUSTSTORE_TYPE.to_string(),
+                "PKCS12".to_string(),
+            );
+            // client auth required
+            config.insert(
+                Self::CLIENT_AUTH_SSL_CLIENT_AUTH.to_string(),
+                "required".to_string(),
+            );
+        } else if self.tls_server_secret_class().is_some() {
+            config.insert(
+                Self::CLIENT_SSL_KEYSTORE_LOCATION.to_string(),
+                format!("{}/keystore.p12", Self::STACKABLE_TLS_SERVER_DIR),
+            );
+            config.insert(
+                Self::CLIENT_SSL_KEYSTORE_PASSWORD.to_string(),
+                Self::SSL_STORE_PASSWORD.to_string(),
+            );
+            config.insert(
+                Self::CLIENT_SSL_KEYSTORE_TYPE.to_string(),
+                "PKCS12".to_string(),
+            );
+            config.insert(
+                Self::CLIENT_SSL_TRUSTSTORE_LOCATION.to_string(),
+                format!("{}/truststore.p12", Self::STACKABLE_TLS_SERVER_DIR),
+            );
+            config.insert(
+                Self::CLIENT_SSL_TRUSTSTORE_PASSWORD.to_string(),
+                Self::SSL_STORE_PASSWORD.to_string(),
+            );
+            config.insert(
+                Self::CLIENT_SSL_TRUSTSTORE_TYPE.to_string(),
+                "PKCS12".to_string(),
+            );
+        }
+
+        // Internal TLS
+        if self.tls_internal_secret_class().is_some() {
+            config.insert(
+                Self::INTER_SSL_KEYSTORE_LOCATION.to_string(),
+                format!("{}/keystore.p12", Self::STACKABLE_TLS_INTERNAL_DIR),
+            );
+            config.insert(
+                Self::INTER_SSL_KEYSTORE_PASSWORD.to_string(),
+                Self::SSL_STORE_PASSWORD.to_string(),
+            );
+            config.insert(
+                Self::INTER_SSL_KEYSTORE_TYPE.to_string(),
+                "PKCS12".to_string(),
+            );
+            config.insert(
+                Self::INTER_SSL_TRUSTSTORE_LOCATION.to_string(),
+                format!("{}/truststore.p12", Self::STACKABLE_TLS_INTERNAL_DIR),
+            );
+            config.insert(
+                Self::INTER_SSL_TRUSTSTORE_PASSWORD.to_string(),
+                Self::SSL_STORE_PASSWORD.to_string(),
+            );
+            config.insert(
+                Self::INTER_SSL_TRUSTSTORE_TYPE.to_string(),
+                "PKCS12".to_string(),
+            );
+            config.insert(
+                Self::INTER_SSL_CLIENT_AUTH.to_string(),
+                "required".to_string(),
+            );
+        }
+
+        // common
+        config.insert(
+            Self::INTER_BROKER_LISTENER_NAME.to_string(),
+            listener::KafkaListenerName::Internal.to_string(),
+        );
+
+        config
+    }
+
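As an illustration, the keys `config_settings` produces for the TLS-client-authentication case render to the following `server.properties` fragment (keys in alphabetical `BTreeMap` order; the internal-TLS keys are omitted for brevity):

```
// Illustrative rendering only; values mirror the constants defined above.
fn main() {
    let expected = r#"inter.broker.listener.name=INTERNAL
listener.name.client_auth.ssl.client.auth=required
listener.name.client_auth.ssl.keystore.location=/stackable/tls_server/keystore.p12
listener.name.client_auth.ssl.keystore.password=changeit
listener.name.client_auth.ssl.keystore.type=PKCS12
listener.name.client_auth.ssl.truststore.location=/stackable/tls_server/truststore.p12
listener.name.client_auth.ssl.truststore.password=changeit
listener.name.client_auth.ssl.truststore.type=PKCS12"#;
    println!("{expected}");
}
```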
+    /// Returns the `SecretClass` provided in an `AuthenticationClass` for TLS.
+    fn get_tls_secret_class(&self) -> Option<&String> {
+        self.resolved_authentication_classes
+            .get_tls_authentication_class()
+            .and_then(|auth_class| match &auth_class.spec.provider {
+                AuthenticationClassProvider::Tls(tls) => tls.client_cert_secret_class.as_ref(),
+                _ => None,
+            })
+            .or(self.server_secret_class.as_ref())
+    }
+
+    /// Creates ephemeral volumes to mount the `SecretClass` into the Pods
+    fn create_tls_volume(volume_name: &str, secret_class_name: &str) -> Volume {
+        VolumeBuilder::new(volume_name)
+            .ephemeral(
+                SecretOperatorVolumeSourceBuilder::new(secret_class_name)
+                    .with_pod_scope()
+                    .with_node_scope()
+                    .build(),
+            )
+            .build()
+    }
+
+    /// Generates the shell script to create key and trust stores from the certificates provided
+    /// by the secret operator.
+    fn create_key_and_trust_store(
+        mount_directory: &str,
+        store_directory: &str,
+        alias_name: &str,
+        store_password: &str,
+    ) -> Vec<String> {
+        vec![
+            format!("echo [{store_directory}] Cleaning up truststore - just in case"),
+            format!("rm -f {store_directory}/truststore.p12"),
+            format!("echo [{store_directory}] Creating truststore"),
+            format!("keytool -importcert -file {mount_directory}/ca.crt -keystore {store_directory}/truststore.p12 -storetype pkcs12 -noprompt -alias {alias_name} -storepass {store_password}"),
+            format!("echo [{store_directory}] Creating certificate chain"),
+            format!("cat {mount_directory}/ca.crt {mount_directory}/tls.crt > {store_directory}/chain.crt"),
+            format!("echo [{store_directory}] Creating keystore"),
+            format!("openssl pkcs12 -export -in {store_directory}/chain.crt -inkey {mount_directory}/tls.key -out {store_directory}/keystore.p12 --passout pass:{store_password}"),
+        ]
+    }
+
+    fn kcat_client_auth_ssl(cert_directory: &str) -> Vec<String> {
+        vec![
+            "-X".to_string(),
+            "security.protocol=SSL".to_string(),
+            "-X".to_string(),
+            format!("ssl.key.location={cert_directory}/tls.key"),
+            "-X".to_string(),
+            format!("ssl.certificate.location={cert_directory}/tls.crt"),
+            "-X".to_string(),
+            format!("ssl.ca.location={cert_directory}/ca.crt"),
+        ]
+    }
+
+    fn kcat_client_ssl(cert_directory: &str) -> Vec<String> {
+        vec![
+            "-X".to_string(),
+            "security.protocol=SSL".to_string(),
+            "-X".to_string(),
+            format!("ssl.ca.location={cert_directory}/ca.crt"),
+        ]
+    }
+}
diff --git a/rust/crd/src/tls.rs b/rust/crd/src/tls.rs
new file mode 100644
index 00000000..c140d081
--- /dev/null
+++ b/rust/crd/src/tls.rs
@@ -0,0 +1,45 @@
+use serde::{Deserialize, Serialize};
+use stackable_operator::schemars::{self, JsonSchema};
+
+const TLS_DEFAULT_SECRET_CLASS: &str = "tls";
+
+#[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct KafkaTls {
+    /// The SecretClass to use for
+    /// internal broker communication. Use mutual verification between brokers (mandatory).
+    /// This setting controls:
+    /// - Which cert the brokers should use to authenticate themselves against other brokers
+    /// - Which ca.crt to use when validating the other brokers
+    /// Defaults to `tls`
+    #[serde(default = "internal_tls_default")]
+    pub internal_secret_class: String,
+    /// The SecretClass to use for
+    /// client connections. This setting controls:
+    /// - If TLS encryption is used at all
+    /// - Which cert the servers should use to authenticate themselves against the client
+    /// Defaults to `tls`.
+    #[serde(
+        default = "server_tls_default",
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub server_secret_class: Option<String>,
+}
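A quick sketch of how these serde attributes behave at the YAML boundary, using a plain mirror of the struct rather than the crate's type (requires `serde` with the derive feature and `serde_yaml`): an explicit `serverSecretClass: null` disables server TLS while the internal secret class keeps its default.

```
use serde::Deserialize;

// Illustrative mirror of KafkaTls for use outside the crate.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct KafkaTlsMirror {
    #[serde(default = "internal_default")]
    internal_secret_class: String,
    #[serde(default = "server_default")]
    server_secret_class: Option<String>,
}

fn internal_default() -> String {
    "tls".into()
}

fn server_default() -> Option<String> {
    Some("tls".into())
}

fn main() {
    // Explicit null disables server TLS; the internal secret class defaults to "tls".
    let tls: KafkaTlsMirror = serde_yaml::from_str("serverSecretClass: null").unwrap();
    assert_eq!(tls.internal_secret_class, "tls");
    assert_eq!(tls.server_secret_class, None);
}
```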
+/// Default TLS settings. Internal and server communication default to "tls" secret class.
+pub fn default_kafka_tls() -> Option<KafkaTls> {
+    Some(KafkaTls {
+        internal_secret_class: internal_tls_default(),
+        server_secret_class: server_tls_default(),
+    })
+}
+
+/// Helper methods to provide defaults in the CRDs and tests
+pub fn internal_tls_default() -> String {
+    TLS_DEFAULT_SECRET_CLASS.into()
+}
+
+/// Helper methods to provide defaults in the CRDs and tests
+pub fn server_tls_default() -> Option<String> {
+    Some(TLS_DEFAULT_SECRET_CLASS.into())
+}
diff --git a/rust/operator/src/command.rs b/rust/operator/src/command.rs
deleted file mode 100644
index 4f70a134..00000000
--- a/rust/operator/src/command.rs
+++ /dev/null
@@ -1,98 +0,0 @@
-use stackable_kafka_crd::{
-    KafkaCluster, CLIENT_PORT, SECURE_CLIENT_PORT, SSL_STORE_PASSWORD,
-    STACKABLE_TLS_CLIENT_AUTH_DIR, STACKABLE_TLS_CLIENT_DIR, STACKABLE_TLS_INTERNAL_DIR,
-    STACKABLE_TMP_DIR, SYSTEM_TRUST_STORE_DIR,
-};
-
-pub fn prepare_container_cmd_args(kafka: &KafkaCluster) -> String {
-    let mut args = vec![];
-
-    if kafka.client_authentication_class().is_some() {
-        args.extend(create_key_and_trust_store(
-            STACKABLE_TLS_CLIENT_AUTH_DIR,
-            "stackable-tls-client-auth-ca-cert",
-        ));
-    } else if kafka.client_tls_secret_class().is_some() {
-        // Copy system truststore to stackable truststore
-        args.push(format!("keytool -importkeystore -srckeystore {SYSTEM_TRUST_STORE_DIR} -srcstoretype jks -srcstorepass {SSL_STORE_PASSWORD} -destkeystore {STACKABLE_TLS_CLIENT_DIR}/truststore.p12 -deststoretype pkcs12 -deststorepass {SSL_STORE_PASSWORD} -noprompt"));
-        args.extend(create_key_and_trust_store(
-            STACKABLE_TLS_CLIENT_DIR,
-            "stackable-tls-client-ca-cert",
-        ));
-    }
-
-    if kafka.internal_tls_secret_class().is_some() {
-        args.extend(create_key_and_trust_store(
-            STACKABLE_TLS_INTERNAL_DIR,
-            "stackable-tls-internal-ca-cert",
-        ));
-    }
-
-    args.join(" && ")
-}
-
-pub fn get_svc_container_cmd_args(kafka: &KafkaCluster) -> String {
-    get_node_port(STACKABLE_TMP_DIR, kafka.client_port_name())
-}
-
-pub fn kcat_container_cmd_args(kafka: &KafkaCluster) -> Vec<String> {
-    let mut args = vec!["/stackable/kcat".to_string()];
-
-    if kafka.client_authentication_class().is_some() {
-        args.push("-b".to_string());
-        args.push(format!("localhost:{}", SECURE_CLIENT_PORT));
-        args.extend(kcat_client_auth_ssl(STACKABLE_TLS_CLIENT_AUTH_DIR));
-    } else if kafka.client_tls_secret_class().is_some() {
-        args.push("-b".to_string());
-        args.push(format!("localhost:{}", SECURE_CLIENT_PORT));
-        args.extend(kcat_client_ssl(STACKABLE_TLS_CLIENT_DIR));
-    } else {
-        args.push("-b".to_string());
-        args.push(format!("localhost:{}", CLIENT_PORT));
-    }
-
-    args.push("-L".to_string());
-    args
-}
-
-fn kcat_client_auth_ssl(cert_directory: &str) -> Vec<String> {
-    vec![
-        "-X".to_string(),
-        "security.protocol=SSL".to_string(),
-        "-X".to_string(),
-        format!("ssl.key.location={cert_directory}/tls.key"),
-        "-X".to_string(),
-        format!("ssl.certificate.location={cert_directory}/tls.crt"),
-        "-X".to_string(),
-        format!("ssl.ca.location={cert_directory}/ca.crt"),
-    ]
-}
-
-fn kcat_client_ssl(cert_directory: &str) -> Vec<String> {
-    vec![
-        "-X".to_string(),
-        "security.protocol=SSL".to_string(),
-        "-X".to_string(),
-        format!("ssl.ca.location={cert_directory}/ca.crt"),
-    ]
-}
-
-/// Generates the shell script to create key and truststores from the certificates provided
-/// by the secret operator.
-fn create_key_and_trust_store(directory: &str, alias_name: &str) -> Vec<String> {
-    vec![
-        format!("echo [{dir}] Creating truststore", dir = directory),
-        format!("keytool -importcert -file {dir}/ca.crt -keystore {dir}/truststore.p12 -storetype pkcs12 -noprompt -alias {alias} -storepass {password}",
-            dir = directory, alias = alias_name, password = SSL_STORE_PASSWORD),
-        format!("echo [{dir}] Creating certificate chain", dir = directory),
-        format!("cat {dir}/ca.crt {dir}/tls.crt > {dir}/chain.crt", dir = directory),
-        format!("echo [{dir}] Creating keystore", dir = directory),
-        format!("openssl pkcs12 -export -in {dir}/chain.crt -inkey {dir}/tls.key -out {dir}/keystore.p12 --passout pass:{password}",
-            dir = directory, password = SSL_STORE_PASSWORD),
-    ]
-}
-
-/// Extract the nodeport from the nodeport service
-fn get_node_port(directory: &str, port_name: &str) -> String {
-    format!("kubectl get service \"$POD_NAME\" -o jsonpath='{{.spec.ports[?(@.name==\"{name}\")].nodePort}}' | tee {dir}/{name}_nodeport", dir = directory, name = port_name)
-}
diff --git a/rust/operator/src/discovery.rs b/rust/operator/src/discovery.rs
index c2d27132..9ff17cbc 100644
--- a/rust/operator/src/discovery.rs
+++ b/rust/operator/src/discovery.rs
@@ -2,7 +2,7 @@ use crate::utils::build_recommended_labels;
 use crate::KAFKA_CONTROLLER_NAME;

 use snafu::{OptionExt, ResultExt, Snafu};
-use stackable_kafka_crd::{KafkaCluster, KafkaRole};
+use stackable_kafka_crd::{security::KafkaTlsSecurity, KafkaCluster, KafkaRole};
 use stackable_operator::{
     builder::{ConfigMapBuilder, ObjectMetaBuilder},
     commons::product_image_selection::ResolvedProductImage,
@@ -45,10 +45,11 @@ pub async fn build_discovery_configmaps(
     owner: &impl Resource,
     resolved_product_image: &ResolvedProductImage,
     client: &stackable_operator::client::Client,
+    kafka_security: &KafkaTlsSecurity,
     svc: &Service,
 ) -> Result<Vec<ConfigMap>, Error> {
     let name = owner.name_unchecked();
-    let port_name = kafka.client_port_name();
+    let port_name = kafka_security.client_port_name();
     Ok(vec![
         build_discovery_configmap(
             kafka,
diff --git a/rust/operator/src/kafka_controller.rs b/rust/operator/src/kafka_controller.rs
index 5267dbfb..53101509 100644
--- a/rust/operator/src/kafka_controller.rs
+++ b/rust/operator/src/kafka_controller.rs
@@ -1,25 +1,19 @@
 //! Ensures that `Pod`s are configured and running for each [`KafkaCluster`]
 use snafu::{OptionExt, ResultExt, Snafu};
+use stackable_kafka_crd::security::KafkaTlsSecurity;
 use stackable_kafka_crd::{
-    listener::get_kafka_listener_config, KafkaCluster, KafkaConfig, KafkaRole, TlsSecretClass,
-    APP_NAME, CLIENT_PORT, CLIENT_PORT_NAME, DOCKER_IMAGE_BASE_NAME, KAFKA_HEAP_OPTS,
-    LOG_DIRS_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME, OPERATOR_NAME, SECURE_CLIENT_PORT,
-    SECURE_CLIENT_PORT_NAME, SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR,
-    STACKABLE_TLS_CLIENT_AUTH_DIR, STACKABLE_TLS_CLIENT_DIR, STACKABLE_TLS_INTERNAL_DIR,
-    STACKABLE_TMP_DIR, TLS_DEFAULT_SECRET_CLASS,
+    listener::get_kafka_listener_config, KafkaCluster, KafkaConfig, KafkaRole, APP_NAME,
+    DOCKER_IMAGE_BASE_NAME, KAFKA_HEAP_OPTS, LOG_DIRS_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME,
+    OPERATOR_NAME, SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR,
+    STACKABLE_TMP_DIR,
 };
 use stackable_operator::{
-    builder::{
-        ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder,
-        SecretOperatorVolumeSourceBuilder, VolumeBuilder,
-    },
+    builder::{ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder},
     cluster_resources::ClusterResources,
     commons::{
-        authentication::{AuthenticationClass, AuthenticationClassProvider},
-        opa::OpaApiVersion,
+        authentication::AuthenticationClass, opa::OpaApiVersion,
         product_image_selection::ResolvedProductImage,
-        tls::TlsAuthenticationProvider,
     },
     config::fragment::ValidationError,
     k8s_openapi::{
@@ -57,7 +51,6 @@ use std::{
 use strum::{EnumDiscriminants, IntoStaticStr};

 use crate::{
-    command::{self, get_svc_container_cmd_args, kcat_container_cmd_args},
     discovery::{self, build_discovery_configmaps},
     pod_svc_controller,
     utils::{self, build_recommended_labels, ObjectRefExt},
@@ -166,14 +159,14 @@ pub enum Error {
         authentication_class: ObjectRef<AuthenticationClass>,
     },
     #[snafu(display(
-        "failed to use authentication mechanism {} - supported methods: {:?}",
-        method,
+        "failed to use authentication provider {} - supported methods: {:?}",
+        provider,
         supported
     ))]
-    AuthenticationMethodNotSupported {
+    AuthenticationProviderNotSupported {
         authentication_class: ObjectRef<AuthenticationClass>,
         supported: Vec<String>,
-        method: String,
+        provider: String,
     },
     #[snafu(display("invalid kafka listeners"))]
     InvalidKafkaListeners {
@@ -188,6 +181,10 @@ pub enum Error {
     DeleteOrphans {
         source: stackable_operator::error::Error,
     },
+    #[snafu(display("failed to initialize security context"))]
+    FailedToInitializeSecurityContext {
+        source: stackable_kafka_crd::security::Error,
+    },
 }
 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -224,13 +221,14 @@ impl ReconcilerError for Error {
                 authentication_class,
                 ..
             } => Some(authentication_class.clone().erase()),
-            Error::AuthenticationMethodNotSupported {
+            Error::AuthenticationProviderNotSupported {
                 authentication_class,
                 ..
            } => Some(authentication_class.clone().erase()),
             Error::InvalidKafkaListeners { .. } => None,
             Error::InvalidContainerName { .. } => None,
             Error::DeleteOrphans { .. } => None,
+            Error::FailedToInitializeSecurityContext { .. } => None,
         }
     }
 }
@@ -276,22 +274,13 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         .map(Cow::Borrowed)
         .unwrap_or_default();

-    let client_authentication_class = if let Some(auth_class) = kafka.client_authentication_class()
-    {
-        Some(
-            AuthenticationClass::resolve(client, auth_class)
-                .await
-                .context(AuthenticationClassRetrievalSnafu {
-                    authentication_class: ObjectRef::<AuthenticationClass>::new(auth_class),
-                })?,
-        )
-    } else {
-        None
-    };
+    let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, &kafka)
+        .await
+        .context(FailedToInitializeSecurityContextSnafu)?;

     // Assemble the OPA connection string from the discovery and the given path if provided
     // Will be passed as --override parameter in the cli in the stateful set
-    let opa_connect = if let Some(opa_spec) = &kafka.spec.opa {
+    let opa_connect = if let Some(opa_spec) = &kafka.spec.cluster_config.authorization.opa {
         Some(
             opa_spec
                 .full_document_url_from_config_map(
@@ -307,7 +296,8 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         None
     };

-    let broker_role_service = build_broker_role_service(&kafka, &resolved_product_image)?;
+    let broker_role_service =
+        build_broker_role_service(&kafka, &resolved_product_image, &kafka_security)?;
     let (broker_role_serviceaccount, broker_role_rolebinding) =
         build_broker_role_serviceaccount(&kafka, &resolved_product_image, &ctx.controller_config)?;
     let broker_role_serviceaccount_ref = ObjectRef::from_obj(&broker_role_serviceaccount);
@@ -338,11 +328,16 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
             rolegroup: rolegroup_ref.clone(),
         })?;

-        let rg_service =
-            build_broker_rolegroup_service(&kafka, &resolved_product_image, &rolegroup_ref)?;
+        let rg_service = build_broker_rolegroup_service(
+            &kafka,
+            &resolved_product_image,
+            &kafka_security,
+            &rolegroup_ref,
+        )?;
         let rg_configmap = build_broker_rolegroup_config_map(
             &kafka,
             &resolved_product_image,
+            &kafka_security,
             &rolegroup_ref,
             rolegroup_config,
         )?;
@@ -353,7 +348,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
             rolegroup_config,
             &broker_role_serviceaccount_ref,
             opa_connect.as_deref(),
-            client_authentication_class.as_ref(),
+            &kafka_security,
             &rolegroup_typed_config,
         )?;
         cluster_resources
@@ -381,6 +376,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         &*kafka,
         &resolved_product_image,
         client,
+        &kafka_security,
         &broker_role_service,
     )
     .await
@@ -405,6 +401,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
 pub fn build_broker_role_service(
     kafka: &KafkaCluster,
     resolved_product_image: &ResolvedProductImage,
+    kafka_security: &KafkaTlsSecurity,
 ) -> Result<Service> {
     let role_name = KafkaRole::Broker.to_string();
     let role_svc_name = kafka
@@ -425,7 +422,7 @@ pub fn build_broker_role_service(
         ))
         .build(),
         spec: Some(ServiceSpec {
-            ports: Some(service_ports(kafka)),
+            ports: Some(service_ports(kafka_security)),
             selector: Some(role_selector_labels(kafka, APP_NAME, &role_name)),
             type_: Some("NodePort".to_string()),
             ..ServiceSpec::default()
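The next hunk extends the security-derived settings into the user-supplied `server.properties` map. Since `BTreeMap::extend` overwrites existing keys, the security settings win on conflicts; a self-contained sketch of that merge behavior:

```
use std::collections::BTreeMap;

fn main() {
    // User-supplied role-group config.
    let mut server_cfg = BTreeMap::from([
        ("compression.type".to_string(), "producer".to_string()),
        ("inter.broker.listener.name".to_string(), "user-value".to_string()),
    ]);
    // Security-derived settings (here just one key for illustration).
    let security_cfg = BTreeMap::from([
        ("inter.broker.listener.name".to_string(), "INTERNAL".to_string()),
    ]);
    // extend() inserts every pair, overwriting duplicates.
    server_cfg.extend(security_cfg);
    assert_eq!(server_cfg["inter.broker.listener.name"], "INTERNAL");
    assert_eq!(server_cfg["compression.type"], "producer");
}
```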
@@ -491,13 +488,17 @@ fn build_broker_role_serviceaccount(
 fn build_broker_rolegroup_config_map(
     kafka: &KafkaCluster,
     resolved_product_image: &ResolvedProductImage,
+    kafka_security: &KafkaTlsSecurity,
     rolegroup: &RoleGroupRef<KafkaCluster>,
     broker_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
 ) -> Result<ConfigMap> {
-    let server_cfg = broker_config
+    let mut server_cfg = broker_config
         .get(&PropertyNameKind::File(SERVER_PROPERTIES_FILE.to_string()))
         .cloned()
         .unwrap_or_default();
+
+    server_cfg.extend(kafka_security.config_settings());
+
+    let server_cfg = server_cfg
         .into_iter()
         .map(|(k, v)| (k, Some(v)))
@@ -528,7 +529,12 @@ fn build_broker_rolegroup_config_map(
         )
         .add_data(
             "log4j.properties",
-            kafka.spec.log4j.as_ref().unwrap_or(&"".to_string()),
+            kafka
+                .spec
+                .cluster_config
+                .log4j
+                .as_deref()
+                .unwrap_or_default(),
         )
         .build()
         .with_context(|_| BuildRoleGroupConfigSnafu {
@@ -542,6 +548,7 @@ fn build_broker_rolegroup_config_map(
 fn build_broker_rolegroup_service(
     kafka: &KafkaCluster,
     resolved_product_image: &ResolvedProductImage,
+    kafka_security: &KafkaTlsSecurity,
     rolegroup: &RoleGroupRef<KafkaCluster>,
 ) -> Result<Service> {
     Ok(Service {
@@ -561,7 +568,7 @@ fn build_broker_rolegroup_service(
             .build(),
         spec: Some(ServiceSpec {
             cluster_ip: Some("None".to_string()),
-            ports: Some(service_ports(kafka)),
+            ports: Some(service_ports(kafka_security)),
             selector: Some(role_group_selector_labels(
                 kafka,
                 APP_NAME,
@@ -586,7 +593,7 @@ fn build_broker_rolegroup_statefulset(
     broker_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
     serviceaccount: &ObjectRef<ServiceAccount>,
     opa_connect_string: Option<&str>,
-    client_authentication_class: Option<&AuthenticationClass>,
+    kafka_security: &KafkaTlsSecurity,
     rolegroup_typed_config: &KafkaConfig,
 ) -> Result<StatefulSet> {
     let mut cb_kafka =
@@ -608,56 +615,13 @@ fn build_broker_rolegroup_statefulset(
         rolegroup: rolegroup_ref.clone(),
     })?;

-    let get_svc_args = get_svc_container_cmd_args(kafka);
-
-    // add client authentication volumes if required
-    if let Some(auth_class) = client_authentication_class {
-        match &auth_class.spec.provider {
-            AuthenticationClassProvider::Tls(TlsAuthenticationProvider {
-                client_cert_secret_class: Some(secret_class),
-            }) => {
-                cb_prepare.add_volume_mount(
-                    "client-tls-authentication-certificate",
-                    STACKABLE_TLS_CLIENT_AUTH_DIR,
-                );
-                cb_kafka.add_volume_mount(
-                    "client-tls-authentication-certificate",
-                    STACKABLE_TLS_CLIENT_AUTH_DIR,
-                );
-                cb_kcat_prober.add_volume_mount(
-                    "client-tls-authentication-certificate",
-                    STACKABLE_TLS_CLIENT_AUTH_DIR,
-                );
-                pod_builder.add_volume(create_tls_volume(
-                    "client-tls-authentication-certificate",
-                    Some(&TlsSecretClass {
-                        secret_class: secret_class.clone(),
-                    }),
-                ));
-            }
-            _ => {
-                return Err(Error::AuthenticationMethodNotSupported {
-                    authentication_class: ObjectRef::from_obj(auth_class),
-                    supported: vec!["tls".to_string()],
-                    method: auth_class.spec.provider.to_string(),
-                })
-            }
-        }
-    } else if let Some(tls) = kafka.client_tls_secret_class() {
-        cb_prepare.add_volume_mount("client-tls-certificate", STACKABLE_TLS_CLIENT_DIR);
-        cb_kafka.add_volume_mount("client-tls-certificate", STACKABLE_TLS_CLIENT_DIR);
-        cb_kcat_prober.add_volume_mount("client-tls-certificate", STACKABLE_TLS_CLIENT_DIR);
-        pod_builder.add_volume(create_tls_volume("client-tls-certificate", Some(tls)));
-    }
-
-    if let Some(tls_internal) = kafka.internal_tls_secret_class() {
-        cb_prepare.add_volume_mount("internal-tls-certificate", STACKABLE_TLS_INTERNAL_DIR);
-        cb_kafka.add_volume_mount("internal-tls-certificate", STACKABLE_TLS_INTERNAL_DIR);
-        pod_builder.add_volume(create_tls_volume(
-            "internal-tls-certificate",
-            Some(tls_internal),
-        ));
-    }
+    // Add TLS related volumes and volume mounts
+    kafka_security.add_volume_and_volume_mounts(
+        &mut pod_builder,
+        &mut cb_prepare,
+        &mut cb_kcat_prober,
+        &mut cb_kafka,
+    );

     let container_get_svc = ContainerBuilder::new("get-svc")
         .context(InvalidContainerNameSnafu {
@@ -669,7 +633,7 @@ fn build_broker_rolegroup_statefulset(
             "-euo".to_string(),
             "pipefail".to_string(),
             "-c".to_string(),
-            get_svc_args,
+            kafka_security.svc_container_commands(),
         ])
         .add_env_vars(vec![EnvVar {
name: "POD_NAME".to_string(), @@ -693,7 +657,9 @@ fn build_broker_rolegroup_statefulset( "pipefail".to_string(), "-c".to_string(), ]) - .args(vec![command::prepare_container_cmd_args(kafka)]) + .args(vec![kafka_security + .prepare_container_command_args() + .join(" && ")]) .add_volume_mount(LOG_DIRS_VOLUME_NAME, STACKABLE_DATA_DIR) .add_volume_mount("tmp", STACKABLE_TMP_DIR); @@ -727,7 +693,7 @@ fn build_broker_rolegroup_statefulset( name: "ZOOKEEPER".to_string(), value_from: Some(EnvVarSource { config_map_key_ref: Some(ConfigMapKeySelector { - name: Some(kafka.spec.zookeeper_config_map_name.clone()), + name: Some(kafka.spec.cluster_config.zookeeper_config_map_name.clone()), key: "ZOOKEEPER".to_string(), ..ConfigMapKeySelector::default() }), @@ -760,7 +726,7 @@ fn build_broker_rolegroup_statefulset( }); // add env var for log4j if set - if kafka.spec.log4j.is_some() { + if kafka.spec.cluster_config.log4j.is_some() { env.push(EnvVar { name: "KAFKA_LOG4J_OPTS".to_string(), value: Some( @@ -771,42 +737,19 @@ fn build_broker_rolegroup_statefulset( } let jvm_args = format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent-0.16.1.jar={}:/stackable/jmx/broker.yaml", METRICS_PORT); - let zookeeper_override = "--override \"zookeeper.connect=$ZOOKEEPER\""; - - let kafka_listeners = get_kafka_listener_config(kafka, &rolegroup_ref.object_name()) - .context(InvalidKafkaListenersSnafu)?; - let listeners_override = format!("--override \"listeners={}\"", kafka_listeners.listeners()); - let advertised_listeners_override = format!( - "--override \"advertised.listeners={}\"", - kafka_listeners.advertised_listeners() - ); - let listener_security_protocol_map_override = format!( - "--override \"listener.security.protocol.map={}\"", - kafka_listeners.listener_security_protocol_map() - ); - let opa_url_override = opa_connect_string.map_or("".to_string(), |opa| { - format!("--override \"opa.authorizer.url={}\"", opa) - }); + let kafka_listeners = + get_kafka_listener_config(kafka, kafka_security, &rolegroup_ref.object_name()) + .context(InvalidKafkaListenersSnafu)?; cb_kafka .image_from_product_image(resolved_product_image) - .args(vec![ - "sh".to_string(), - "-c".to_string(), - [ - "bin/kafka-server-start.sh", - &format!("/stackable/config/{}", SERVER_PROPERTIES_FILE), - zookeeper_override, - &listeners_override, - &advertised_listeners_override, - &listener_security_protocol_map_override, - &opa_url_override, - ] - .join(" "), - ]) + .command(vec!["sh".to_string(), "-c".to_string()]) + .args(vec![kafka_security + .kafka_container_commands(&kafka_listeners, opa_connect_string) + .join(" ")]) .add_env_vars(env) .add_env_var("EXTRA_ARGS", jvm_args) - .add_container_ports(container_ports(kafka)) + .add_container_ports(container_ports(kafka_security)) .add_volume_mount(LOG_DIRS_VOLUME_NAME, STACKABLE_DATA_DIR) .add_volume_mount("config", STACKABLE_CONFIG_DIR) .add_volume_mount("tmp", STACKABLE_TMP_DIR) @@ -822,7 +765,7 @@ fn build_broker_rolegroup_statefulset( .readiness_probe(Probe { exec: Some(ExecAction { // If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point - command: Some(kcat_container_cmd_args(kafka)), + command: Some(kafka_security.kcat_prober_container_commands()), }), timeout_seconds: Some(5), period_seconds: Some(2), @@ -919,72 +862,37 @@ pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> } /// We only expose client HTTP / HTTPS and Metrics ports. 
 /// We only expose client HTTP / HTTPS and Metrics ports.
-fn service_ports(kafka: &KafkaCluster) -> Vec<ServicePort> {
-    let mut ports = vec![ServicePort {
-        name: Some(METRICS_PORT_NAME.to_string()),
-        port: METRICS_PORT.into(),
-        protocol: Some("TCP".to_string()),
-        ..ServicePort::default()
-    }];
-
-    if kafka.client_tls_secret_class().is_some() || kafka.client_authentication_class().is_some() {
-        ports.push(ServicePort {
-            name: Some(SECURE_CLIENT_PORT_NAME.to_string()),
-            port: SECURE_CLIENT_PORT.into(),
+fn service_ports(kafka_security: &KafkaTlsSecurity) -> Vec<ServicePort> {
+    vec![
+        ServicePort {
+            name: Some(METRICS_PORT_NAME.to_string()),
+            port: METRICS_PORT.into(),
             protocol: Some("TCP".to_string()),
             ..ServicePort::default()
-        });
-    } else {
-        ports.push(ServicePort {
-            name: Some(CLIENT_PORT_NAME.to_string()),
-            port: CLIENT_PORT.into(),
+        },
+        ServicePort {
+            name: Some(kafka_security.client_port_name().to_string()),
+            port: kafka_security.client_port().into(),
             protocol: Some("TCP".to_string()),
             ..ServicePort::default()
-        });
-    }
-
-    ports
+        },
+    ]
 }

 /// We only expose client HTTP / HTTPS and Metrics ports.
-fn container_ports(kafka: &KafkaCluster) -> Vec<ContainerPort> {
-    let mut ports = vec![ContainerPort {
-        name: Some(METRICS_PORT_NAME.to_string()),
-        container_port: METRICS_PORT.into(),
-        protocol: Some("TCP".to_string()),
-        ..ContainerPort::default()
-    }];
-
-    if kafka.client_tls_secret_class().is_some() || kafka.client_authentication_class().is_some() {
-        ports.push(ContainerPort {
-            name: Some(SECURE_CLIENT_PORT_NAME.to_string()),
-            container_port: SECURE_CLIENT_PORT.into(),
+fn container_ports(kafka_security: &KafkaTlsSecurity) -> Vec<ContainerPort> {
+    vec![
+        ContainerPort {
+            name: Some(METRICS_PORT_NAME.to_string()),
+            container_port: METRICS_PORT.into(),
             protocol: Some("TCP".to_string()),
             ..ContainerPort::default()
-        });
-    } else {
-        ports.push(ContainerPort {
-            name: Some(CLIENT_PORT_NAME.to_string()),
-            container_port: CLIENT_PORT.into(),
+        },
+        ContainerPort {
+            name: Some(kafka_security.client_port_name().to_string()),
+            container_port: kafka_security.client_port().into(),
             protocol: Some("TCP".to_string()),
             ..ContainerPort::default()
-        });
-    }
-
-    ports
-}
-
-fn create_tls_volume(volume_name: &str, tls_secret_class: Option<&TlsSecretClass>) -> Volume {
-    let secret_class_name = tls_secret_class
-        .map(|t| t.secret_class.as_ref())
-        .unwrap_or(TLS_DEFAULT_SECRET_CLASS);
-
-    VolumeBuilder::new(volume_name)
-        .ephemeral(
-            SecretOperatorVolumeSourceBuilder::new(secret_class_name)
-                .with_pod_scope()
-                .with_node_scope()
-                .build(),
-        )
-        .build()
+        },
+    ]
 }
diff --git a/rust/operator/src/lib.rs b/rust/operator/src/lib.rs
index f7e9a680..10b49a65 100644
--- a/rust/operator/src/lib.rs
+++ b/rust/operator/src/lib.rs
@@ -1,4 +1,3 @@
-mod command;
 mod discovery;
 mod kafka_controller;
 mod pod_svc_controller;
diff --git a/tests/templates/kuttl/configuration/00-install-zk.yaml.j2 b/tests/templates/kuttl/configuration/00-install-zk.yaml.j2
index 97d9f72a..9343ee4c 100644
--- a/tests/templates/kuttl/configuration/00-install-zk.yaml.j2
+++ b/tests/templates/kuttl/configuration/00-install-zk.yaml.j2
@@ -17,5 +17,3 @@ spec:
     roleGroups:
       default:
         replicas: 1
-    config:
-      myidOffset: 10
diff --git a/tests/templates/kuttl/configuration/01-install-kafka.yaml.j2 b/tests/templates/kuttl/configuration/01-install-kafka.yaml.j2
index 4757104d..1ee26f90 100644
--- a/tests/templates/kuttl/configuration/01-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/configuration/01-install-kafka.yaml.j2
@@ -13,7 +13,8 @@ spec:
   image:
    productVersion: "{{ test_scenario['values']['kafka-latest'].split('-stackable')[0] }}"
     stackableVersion: "{{ test_scenario['values']['kafka-latest'].split('-stackable')[1] }}"
-  zookeeperConfigMapName: kafka-zk
+  clusterConfig:
+    zookeeperConfigMapName: kafka-zk
   brokers:
     config:
       resources:
diff --git a/tests/templates/kuttl/delete-rolegroup/01-install-kafka.yaml.j2 b/tests/templates/kuttl/delete-rolegroup/01-install-kafka.yaml.j2
index b2e38708..a9e4b00d 100644
--- a/tests/templates/kuttl/delete-rolegroup/01-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/delete-rolegroup/01-install-kafka.yaml.j2
@@ -13,7 +13,8 @@ spec:
   image:
     productVersion: "{{ test_scenario['values']['kafka'].split('-stackable')[0] }}"
     stackableVersion: "{{ test_scenario['values']['kafka'].split('-stackable')[1] }}"
-  zookeeperConfigMapName: kafka-zk
+  clusterConfig:
+    zookeeperConfigMapName: kafka-zk
   brokers:
     roleGroups:
       default:
diff --git a/tests/templates/kuttl/delete-rolegroup/02-delete-secondary.yaml.j2 b/tests/templates/kuttl/delete-rolegroup/02-delete-secondary.yaml.j2
index af04c30f..fe33ab0d 100644
--- a/tests/templates/kuttl/delete-rolegroup/02-delete-secondary.yaml.j2
+++ b/tests/templates/kuttl/delete-rolegroup/02-delete-secondary.yaml.j2
@@ -13,7 +13,8 @@ spec:
   image:
     productVersion: "{{ test_scenario['values']['kafka'].split('-stackable')[0] }}"
     stackableVersion: "{{ test_scenario['values']['kafka'].split('-stackable')[1] }}"
-  zookeeperConfigMapName: kafka-zk
+  clusterConfig:
+    zookeeperConfigMapName: kafka-zk
   brokers:
     roleGroups:
       default:
diff --git a/tests/templates/kuttl/smoke/01-install-kafka.yaml.j2 b/tests/templates/kuttl/smoke/01-install-kafka.yaml.j2
index b988fa23..9fdab5a0 100644
--- a/tests/templates/kuttl/smoke/01-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/smoke/01-install-kafka.yaml.j2
@@ -13,13 +13,13 @@ spec:
   image:
     productVersion: "{{ test_scenario['values']['kafka'].split('-stackable')[0] }}"
     stackableVersion: "{{ test_scenario['values']['kafka'].split('-stackable')[1] }}"
-  zookeeperConfigMapName: kafka-zk
-  config:
-{% if test_scenario['values']['use-client-tls'] == 'true' %}
+  clusterConfig:
+    zookeeperConfigMapName: kafka-zk
     tls:
-      secretClass: tls
+{% if test_scenario['values']['use-client-tls'] == 'true' %}
+      serverSecretClass: tls
 {% else %}
-    tls: null
+      serverSecretClass: null
 {% endif %}
   brokers:
     config:
diff --git a/tests/templates/kuttl/tls/20-install-kafka.yaml.j2 b/tests/templates/kuttl/tls/20-install-kafka.yaml.j2
index 79c2de68..519d6130 100644
--- a/tests/templates/kuttl/tls/20-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/tls/20-install-kafka.yaml.j2
@@ -30,21 +30,6 @@ spec:
         namespace: default
       autoGenerate: true
 {% endif %}
-{% if test_scenario['values']['use-internal-tls'] == 'true' %}
----
-apiVersion: secrets.stackable.tech/v1alpha1
-kind: SecretClass
-metadata:
-  name: test-kafka-internal-tls
-spec:
-  backend:
-    autoTls:
-      ca:
-        secret:
-          name: secret-provisioner-test-kafka-internal-tls-ca
-          namespace: default
-        autoGenerate: true
-{% endif %}
 ---
 apiVersion: kafka.stackable.tech/v1alpha1
 kind: KafkaCluster
@@ -54,25 +39,17 @@ spec:
   image:
     productVersion: "{{ test_scenario['values']['kafka'].split('-stackable')[0] }}"
     stackableVersion: "{{ test_scenario['values']['kafka'].split('-stackable')[1] }}"
-  zookeeperConfigMapName: test-kafka-znode
-  config:
-{% if test_scenario['values']['use-client-tls'] == 'true' %}
+  clusterConfig:
+    zookeeperConfigMapName: test-kafka-znode
     tls:
-      secretClass: tls
+{% if test_scenario['values']['use-client-tls'] == 'true' %}
+      serverSecretClass: tls
 {% else %}
-    tls: null
+      serverSecretClass: null
 {% endif %}
 {% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
-    clientAuthentication:
-      authenticationClass: test-kafka-client-auth-tls
-{% else %}
-    clientAuthentication: null
-{% endif %}
-{% if test_scenario['values']['use-internal-tls'] == 'true' %}
-    internalTls:
-      secretClass: test-kafka-internal-tls
-{% else %}
-    internalTls: null
+    authentication:
+      - authenticationClass: test-kafka-client-auth-tls
 {% endif %}
   brokers:
     roleGroups:
diff --git a/tests/templates/kuttl/tls/test_client_auth_tls.sh b/tests/templates/kuttl/tls/test_client_auth_tls.sh
index eece2a5e..c09f0cfe 100755
--- a/tests/templates/kuttl/tls/test_client_auth_tls.sh
+++ b/tests/templates/kuttl/tls/test_client_auth_tls.sh
@@ -18,7 +18,7 @@ TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 10)
+echo $'security.protocol=SSL\nssl.keystore.location=/stackable/tls_server/keystore.p12\nssl.keystore.password=changeit\nssl.truststore.location=/stackable/tls_server/truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config
 if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config
 then
diff --git a/tests/templates/kuttl/tls/test_client_tls.sh b/tests/templates/kuttl/tls/test_client_tls.sh
index cc675b0d..4acf34e9 100755
--- a/tests/templates/kuttl/tls/test_client_tls.sh
+++ b/tests/templates/kuttl/tls/test_client_tls.sh
@@ -18,7 +18,7 @@ TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 10)
+echo $'security.protocol=SSL\nssl.truststore.location=/stackable/tls_server/truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config
 if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config
 then
diff --git a/tests/templates/kuttl/upgrade/00-install-zk.yaml.j2 b/tests/templates/kuttl/upgrade/00-install-zk.yaml.j2
index a66c17e2..0b478989 100644
--- a/tests/templates/kuttl/upgrade/00-install-zk.yaml.j2
+++ b/tests/templates/kuttl/upgrade/00-install-zk.yaml.j2
@@ -15,5 +15,3 @@ spec:
     roleGroups:
       default:
         replicas: 1
-    config:
-      myidOffset: 10
diff --git a/tests/templates/kuttl/upgrade/01-install-kafka.yaml.j2 b/tests/templates/kuttl/upgrade/01-install-kafka.yaml.j2
index b86e588d..ed26875d 100644
--- a/tests/templates/kuttl/upgrade/01-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/upgrade/01-install-kafka.yaml.j2
@@ -35,17 +35,17 @@ spec:
   image:
     productVersion: "{{ test_scenario['values']['upgrade_old'].split('-stackable')[0] }}"
     stackableVersion: "{{ test_scenario['values']['upgrade_old'].split('-stackable')[1] }}"
-  zookeeperConfigMapName: kafka-zk
-  config:
-{% if test_scenario['values']['use-client-tls'] == 'true' %}
+  clusterConfig:
+    zookeeperConfigMapName: kafka-zk
     tls:
-      secretClass: tls
+{% if test_scenario['values']['use-client-tls'] == 'true' %}
+      serverSecretClass: tls
 {% else %}
-    tls: null
+      serverSecretClass: null
 {% endif %}
 {% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
-    clientAuthentication:
-      authenticationClass: test-kafka-client-auth-tls
+    authentication:
+      - authenticationClass: test-kafka-client-auth-tls
 {% endif %}
   brokers:
     roleGroups:
diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml
index a88fe069..c1002252 100644
--- a/tests/test-definition.yaml
+++ b/tests/test-definition.yaml
@@ -11,12 +11,12 @@ dimensions:
       - 3.3.1-stackable0.3.0
   - name: zookeeper
     values:
-      - 3.6.3-stackable0.8.0
-      - 3.7.0-stackable0.8.0
-      - 3.8.0-stackable0.8.0
+      - 3.6.3-stackable0.9.0
+      - 3.7.0-stackable0.9.0
+      - 3.8.0-stackable0.9.0
   - name: zookeeper-latest
     values:
-      - 3.8.0-stackable0.8.0
+      - 3.8.0-stackable0.9.0
   - name: upgrade_old
     values:
       - 2.8.1-stackable0.7.0
@@ -33,10 +33,6 @@ dimensions:
     values:
       - "true"
      - "false"
-  - name: use-internal-tls
-    values:
-      - "true"
-      - "false"
 tests:
   - name: smoke
     dimensions:
@@ -60,7 +56,6 @@ tests:
       - zookeeper-latest
       - use-client-tls
       - use-client-auth-tls
-      - use-internal-tls
  - name: delete-rolegroup
    dimensions:
      - kafka