diff --git a/modules/msk-cluster/cluster.tf b/modules/msk-cluster/cluster.tf index aa173c5..0dc5455 100644 --- a/modules/msk-cluster/cluster.tf +++ b/modules/msk-cluster/cluster.tf @@ -44,7 +44,6 @@ resource "aws_msk_configuration" "this" { # MSK Cluster ################################################### -# TODO: public access cidrs resource "aws_msk_cluster" "this" { cluster_name = var.name kafka_version = var.kafka_version @@ -55,10 +54,12 @@ resource "aws_msk_cluster" "this" { az_distribution = "DEFAULT" client_subnets = var.broker_subnets security_groups = concat( - module.security_group[*].id, + [module.security_group.id], var.broker_additional_security_groups ) + # TODO: `vpc_connectivity` + # TODO: public access cidrs connectivity_info { public_access { type = var.broker_public_access_enabled ? "SERVICE_PROVIDED_EIPS" : "DISABLED" @@ -67,19 +68,20 @@ resource "aws_msk_cluster" "this" { storage_info { ebs_storage_info { - volume_size = var.broker_volume_size + volume_size = var.broker_storage.volume_size dynamic "provisioned_throughput" { - for_each = var.broker_volume_provisioned_throughput_enabled ? ["go"] : [] + for_each = var.broker_storage.provisioned_throughput.enabled ? [var.broker_storage.provisioned_throughput] : [] content { - enabled = true - volume_throughput = var.broker_volume_provisioned_throughput + enabled = provisioned_throughput.value.enabled + volume_throughput = provisioned_throughput.value.throughput } } } } } + storage_mode = var.cluster_storage_mode configuration_info { arn = aws_msk_configuration.this.arn @@ -87,20 +89,20 @@ resource "aws_msk_cluster" "this" { } - ## Auth + ## Authentiation client_authentication { - unauthenticated = var.auth_unauthenticated_access_enabled + unauthenticated = var.authentication.unauthenticated_access.enabled sasl { - iam = var.auth_sasl_iam_enabled - scram = var.auth_sasl_scram_enabled + iam = var.authentication.sasl_iam.enabled + scram = var.authentication.sasl_scram.enabled } dynamic "tls" { - for_each = var.auth_tls_enabled ? ["go"] : [] + for_each = var.authentication.tls.enabled ? [var.authentication.tls] : [] content { - certificate_authority_arns = var.auth_tls_acm_ca_arns + certificate_authority_arns = tls.value.acm_private_certificate_authorities } } } @@ -108,11 +110,11 @@ resource "aws_msk_cluster" "this" { ## Encryption encryption_info { - encryption_at_rest_kms_key_arn = var.encryption_at_rest_kms_key + encryption_at_rest_kms_key_arn = var.encryption_at_rest.kms_key encryption_in_transit { - in_cluster = var.encryption_in_transit_in_cluster_enabled - client_broker = var.encryption_in_transit_client_mode + in_cluster = var.encryption_in_transit.in_cluster_enabled + client_broker = var.encryption_in_transit.client_mode } } @@ -121,33 +123,33 @@ resource "aws_msk_cluster" "this" { logging_info { broker_logs { cloudwatch_logs { - enabled = var.logging_cloudwatch_enabled - log_group = var.logging_cloudwatch_log_group + enabled = var.logging.cloudwatch_logs.enabled + log_group = var.logging.cloudwatch_logs.log_group } firehose { - enabled = var.logging_firehose_enabled - delivery_stream = var.logging_firehose_delivery_stream + enabled = var.logging.firehose.enabled + delivery_stream = var.logging.firehose.delivery_stream } s3 { - enabled = var.logging_s3_enabled - bucket = var.logging_s3_bucket - prefix = var.logging_s3_prefix + enabled = var.logging.s3.enabled + bucket = var.logging.s3.bucket + prefix = var.logging.s3.key_prefix } } } ## Monitoring - enhanced_monitoring = var.monitoring_cloudwatch_level + enhanced_monitoring = var.cloudwatch_metrics.monitoring_level open_monitoring { prometheus { jmx_exporter { - enabled_in_broker = var.monitoring_prometheus_jmx_exporter_enabled + enabled_in_broker = var.prometheus.jmx_exporter_enabled } node_exporter { - enabled_in_broker = var.monitoring_prometheus_node_exporter_enabled + enabled_in_broker = var.prometheus.node_exporter_enabled } } } diff --git a/modules/msk-cluster/outputs.tf b/modules/msk-cluster/outputs.tf index 844179e..251d78b 100644 --- a/modules/msk-cluster/outputs.tf +++ b/modules/msk-cluster/outputs.tf @@ -46,8 +46,6 @@ output "broker" { `public_access_enabled` - Whether public access to MSK brokers is enabled. `security_groups` - A list of the security groups associated with the MSK cluster. - - `volume` - A EBS volume information for MSK brokers. EOF value = { size = aws_msk_cluster.this.number_of_broker_nodes @@ -56,19 +54,27 @@ output "broker" { subnets = aws_msk_cluster.this.broker_node_group_info[0].client_subnets public_access_enabled = var.broker_public_access_enabled security_groups = aws_msk_cluster.this.broker_node_group_info[0].security_groups - default_security_group_id = try(module.security_group[*].id[0], null) + default_security_group_id = module.security_group.id + } +} - volume = { - size = aws_msk_cluster.this.broker_node_group_info[0].storage_info[0].ebs_storage_info[0].volume_size - provisioned_throughput = { - enabled = try(aws_msk_cluster.this.broker_node_group_info[0].storage_info[0].ebs_storage_info[0].provisioned_throughput[0].enabled, false) - throughput = try(aws_msk_cluster.this.broker_node_group_info[0].storage_info[0].ebs_storage_info[0].provisioned_throughput[0].volume_throughput, null) - } +output "broker_storage" { + description = "The configuration for broker storage of the MSK cluster." + value = { + volume_size = aws_msk_cluster.this.broker_node_group_info[0].storage_info[0].ebs_storage_info[0].volume_size + provisioned_throughput = { + enabled = try(aws_msk_cluster.this.broker_node_group_info[0].storage_info[0].ebs_storage_info[0].provisioned_throughput[0].enabled, false) + throughput = try(aws_msk_cluster.this.broker_node_group_info[0].storage_info[0].ebs_storage_info[0].provisioned_throughput[0].volume_throughput, null) } } } -output "auth" { +output "cluster_storage_mode" { + description = "The storage mode of the MSK cluster." + value = aws_msk_cluster.this.storage_mode +} + +output "authentication" { description = "A configuration for authentication of the Kafka cluster." value = { unauthenticated_access = { @@ -80,31 +86,33 @@ output "auth" { } scram = { enabled = aws_msk_cluster.this.client_authentication[0].sasl[0].scram - kms_key = var.auth_sasl_scram_kms_key - users = var.auth_sasl_scram_users + kms_key = var.authentication.sasl_scram.kms_key + users = var.authentication.sasl_scram.users } } tls = { - enabled = var.auth_tls_enabled - acm_ca_arns = try(aws_msk_cluster.this.client_authentication[0].tls[0].certificate_authority_arns, []) + enabled = var.authentication.tls.enabled + acm_private_certificate_authorities = try(aws_msk_cluster.this.client_authentication[0].tls[0].certificate_authority_arns, []) } } } -output "encryption" { +output "encryption_at_rest" { description = < 0 ? 1 : 0 - - name = var.name + name = "msk-${var.name}" description = "Security group for MSK Cluster." vpc_id = local.vpc_id - ingress_rules = [ - { - id = "broker-plaintext/cidrs" - description = "Allow CIDRs to communicate with Kafka brokers in plaintext." - protocol = "tcp" - from_port = 9092 - to_port = 9092 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "broker-tls/cidrs" - description = "Allow CIDRs to communicate with Kafka brokers in tls." - protocol = "tcp" - from_port = 9094 - to_port = 9094 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "broker-sasl-scram/cidrs" - description = "Allow CIDRs to communicate with Kafka brokers in SASL SCRAM." - protocol = "tcp" - from_port = 9096 - to_port = 9096 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "broker-sasl-iam/cidrs" - description = "Allow CIDRs to communicate with Kafka brokers in SASL IAM." - protocol = "tcp" - from_port = 9098 - to_port = 9098 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "broker-public-tls/cidrs" - description = "Allow CIDRs to communicate with Kafka brokers in tls (public)." - protocol = "tcp" - from_port = 9194 - to_port = 9194 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "broker-public-sasl-scram/cidrs" - description = "Allow CIDRs to communicate with Kafka brokers in SASL SCRAM (public)." - protocol = "tcp" - from_port = 9196 - to_port = 9196 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "broker-public-sasl-iam/cidrs" - description = "Allow CIDRs to communicate with Kafka brokers in SASL IAM (public)." - protocol = "tcp" - from_port = 9198 - to_port = 9198 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "zookeeper/cidrs" - description = "Allow CIDRs to communicate with Kafka zookeepers." - protocol = "tcp" - from_port = 2181 - to_port = 2181 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "prometheus-jmx-exporter/cidrs" - description = "Allow CIDRs to communicate with Prometheus JMX Exporter." - protocol = "tcp" - from_port = 11001 - to_port = 11001 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - { - id = "prometheus-node-exporter/cidrs" - description = "Allow CIDRs to communicate with Prometheus Node Exporter." - protocol = "tcp" - from_port = 11002 - to_port = 11002 - - cidr_blocks = var.broker_allowed_ingress_cidrs - }, - ] + ingress_rules = concat( + contains(["PLAINTEXT", "TLS_PLAINTEXT"], var.encryption_in_transit.client_mode) ? [ + { + id = "broker-plaintext/cidrs" + description = "Allow CIDRs to communicate with Kafka brokers in plaintext." + protocol = "tcp" + from_port = 9092 + to_port = 9092 + + ipv4_cidrs = var.broker_allowed_ingress_cidrs + }, + ] : [], + contains(["TLS", "TLS_PLAINTEXT"], var.encryption_in_transit.client_mode) ? [ + { + id = "broker-tls/cidrs" + description = "Allow CIDRs to communicate with Kafka brokers in tls." + protocol = "tcp" + from_port = 9094 + to_port = 9094 + + ipv4_cidrs = var.broker_allowed_ingress_cidrs + }, + ] : [], + var.authentication.sasl_scram.enabled ? [ + { + id = "broker-sasl-scram/cidrs" + description = "Allow CIDRs to communicate with Kafka brokers in SASL SCRAM." + protocol = "tcp" + from_port = 9096 + to_port = 9096 + + ipv4_cidrs = var.broker_allowed_ingress_cidrs + }, + ] : [], + var.authentication.sasl_iam.enabled ? [ + { + id = "broker-sasl-iam/cidrs" + description = "Allow CIDRs to communicate with Kafka brokers in SASL IAM." + protocol = "tcp" + from_port = 9098 + to_port = 9098 + + ipv4_cidrs = var.broker_allowed_ingress_cidrs + }, + ] : [], + [ + { + id = "broker-public-tls/cidrs" + description = "Allow CIDRs to communicate with Kafka brokers in tls (public)." + protocol = "tcp" + from_port = 9194 + to_port = 9194 + + ipv4_cidrs = var.broker_allowed_ingress_cidrs + }, + { + id = "broker-public-sasl-scram/cidrs" + description = "Allow CIDRs to communicate with Kafka brokers in SASL SCRAM (public)." + protocol = "tcp" + from_port = 9196 + to_port = 9196 + + ipv4_cidrs = var.broker_allowed_ingress_cidrs + }, + { + id = "broker-public-sasl-iam/cidrs" + description = "Allow CIDRs to communicate with Kafka brokers in SASL IAM (public)." + protocol = "tcp" + from_port = 9198 + to_port = 9198 + + ipv4_cidrs = var.broker_allowed_ingress_cidrs + }, + { + id = "zookeeper/cidrs" + description = "Allow CIDRs to communicate with Kafka zookeepers." + protocol = "tcp" + from_port = 2181 + to_port = 2181 + + ipv4_cidrs = var.broker_allowed_ingress_cidrs + }, + ], + var.prometheus.jmx_exporter_enabled ? [ + { + id = "prometheus-jmx-exporter/cidrs" + description = "Allow CIDRs to communicate with Prometheus JMX Exporter." + protocol = "tcp" + from_port = 11001 + to_port = 11001 + + ipv4_cidrs = var.prometheus.allowed_ingress_ipv4_cidrs + }, + ] : [], + var.prometheus.node_exporter_enabled ? [ + { + id = "prometheus-node-exporter/cidrs" + description = "Allow CIDRs to communicate with Prometheus Node Exporter." + protocol = "tcp" + from_port = 11002 + to_port = 11002 + + ipv4_cidrs = var.prometheus.allowed_ingress_ipv4_cidrs + }, + ] : [], + ) + revoke_rules_on_delete = true resource_group_enabled = false module_tags_enabled = false diff --git a/modules/msk-cluster/variables.tf b/modules/msk-cluster/variables.tf index 634de59..31c0506 100644 --- a/modules/msk-cluster/variables.tf +++ b/modules/msk-cluster/variables.tf @@ -1,12 +1,13 @@ variable "name" { description = "(Required) Name of the MSK cluster." type = string + nullable = false } variable "kafka_version" { description = "(Optional) Kafka version to use for the MSK cluster." type = string - default = "2.8.0" + default = "3.6.0" nullable = false } @@ -17,9 +18,12 @@ variable "kafka_server_properties" { nullable = false } +# TODO: naming +# TODO: validation variable "broker_size" { description = "(Required) The desired total number of broker nodes in the kafka cluster. It must be a multiple of the number of specified client subnets." type = number + nullable = false } variable "broker_instance_type" { @@ -29,37 +33,10 @@ variable "broker_instance_type" { nullable = false } -variable "broker_volume_size" { - description = "(Optional) The size in GiB of the EBS volume for the data drive on each broker node. Minimum value of `1` and maximum value of `16384`. Defaults to `1000`." - type = number - default = 1000 - nullable = false - - validation { - condition = alltrue([ - var.broker_volume_size >= 1, - var.broker_volume_size <= 16384, - ]) - error_message = "Valid value for `broker_volume_size` is between `1` and `16384`." - } -} - -variable "broker_volume_provisioned_throughput_enabled" { - description = "(Optional) Whether provisioned throughput is enabled or not. You can specify the provisioned throughput rate in MiB per second for clusters whose brokers are of type `kafka.m5.4xlarge` or larger and if the storage volume is 10 GiB or greater. Defaults to `false`." - type = bool - default = false - nullable = false -} - -variable "broker_volume_provisioned_throughput" { - description = "(Optional) Throughput value of the EBS volumes for the data drive on each kafka broker node in MiB per second. The minimum value is `250`. The maximum value varies between broker type." - type = number - default = null -} - variable "broker_subnets" { description = "(Required) A list of subnet IDs to place ENIs of the MSK cluster broker nodes within." type = list(string) + nullable = false } variable "broker_public_access_enabled" { @@ -72,161 +49,196 @@ variable "broker_public_access_enabled" { variable "broker_allowed_ingress_cidrs" { description = "(Optional) A list of CIDR for MSK ingress access." type = list(string) - default = [] + default = ["10.0.0.0/8"] + nullable = false } variable "broker_additional_security_groups" { description = "(Optional) A list of security group IDs to associate with ENIs to control who can communicate with the cluster." type = list(string) default = [] -} - -variable "auth_unauthenticated_access_enabled" { - description = "(Optional) Enables unauthenticated access. Defaults to `true`." - type = bool - default = true nullable = false } -variable "auth_sasl_iam_enabled" { - description = "(Optional) Enables IAM client authentication." - type = bool - default = false - nullable = false -} - -variable "auth_sasl_scram_enabled" { - description = "(Optional) Enables SCRAM client authentication via AWS Secrets Manager." - type = bool - default = false - nullable = false -} - -variable "auth_sasl_scram_kms_key" { - description = "(Optional) The ARN of a KMS key to encrypt AWS SeecretsManager Secret resources for storing SASL/SCRAM authentication data. Only required when the MSK cluster has SASL/SCRAM authentication enabled. The Username/Password Authentication based on SASL/SCRAM needs to create a Secret resource in AWS SecretsManager with a custom AWS KMS Key. A secret created with the default AWS KMS key cannot be used with an Amazon MSK cluster." - type = string - default = null -} - -variable "auth_sasl_scram_users" { - description = "(Optional) A list of usernames to be allowed for SASL/SCRAM authentication to the MSK cluster. The password for each username is randomly generated and stored in AWS SecretsManager secret." - type = set(string) - default = [] - nullable = false -} - -variable "auth_tls_enabled" { - description = "(Optional) Enables TLS client authentication." - type = bool - default = false - nullable = false -} - -variable "auth_tls_acm_ca_arns" { - description = "(Optional) List of ACM Certificate Authority Amazon Resource Names (ARNs)." - type = list(string) - default = [] - nullable = false -} - -variable "encryption_at_rest_kms_key" { - description = "(Optional) Specify a KMS key short ID or ARN (it will always output an ARN) to use for encrypting your data at rest. If no key is specified, an AWS managed KMS ('aws/msk' managed service) key will be used for encrypting the data at rest." - type = string - default = "" -} +variable "broker_storage" { + description = <= 1, + var.broker_storage.volume_size <= 16384, + ]) + error_message = "Valid value for `volume_size` is between `1` and `16384`." + } + validation { + condition = anytrue([ + var.broker_storage.provisioned_throughput.enabled == false, + var.broker_storage.provisioned_throughput.enabled == true && var.broker_storage.provisioned_throughput.throughput >= 250, + ]) + error_message = "Valid value for `throughput` is greater than `250`." + } } -variable "encryption_in_transit_client_mode" { - description = "(Optional) Encryption setting for data in transit between clients and brokers. `TLS`, `TLS_PLAINTEXT`, `PLAINTEXT` are available." +variable "cluster_storage_mode" { + description = <