diff --git a/pkg/controller/kafka/cluster/setup.go b/pkg/controller/kafka/cluster/setup.go index 760155d762..7d4d6f8a6e 100644 --- a/pkg/controller/kafka/cluster/setup.go +++ b/pkg/controller/kafka/cluster/setup.go @@ -224,7 +224,7 @@ func isUpToDate(wanted *svcapitypes.Cluster, current *svcsdk.DescribeClusterOutp !isOpenMonitoringInfoUpToDate(forProvider.OpenMonitoring, clusterInfo.OpenMonitoring), !isCustomConfigurationInfoUpToDate(forProvider.CustomConfigurationInfo, clusterInfo.CurrentBrokerSoftwareInfo), !isKafkaVersionUpToDate(forProvider.KafkaVersion, clusterInfo.CurrentBrokerSoftwareInfo), - !isEncryptionInfoUpToDate(forProvider.EncryptionInfo, clusterInfo.EncryptionInfo), + !isEncryptionInfoInTransitUpToDate(forProvider.EncryptionInfo, clusterInfo.EncryptionInfo), !isClientAuthenticationUpToDate(forProvider.ClientAuthentication, clusterInfo.ClientAuthentication), !isTagsUpToDate(forProvider.Tags, clusterInfo.Tags): return false, nil @@ -297,11 +297,32 @@ func isEnhancedMonitoringUpToDate(wanted svcapitypes.ClusterParameters, current return true } +func isSomeLoggingEnabled(wanted *svcapitypes.LoggingInfo) bool { + if wanted.BrokerLogs != nil { + if wanted.BrokerLogs.CloudWatchLogs != nil { + if aws.BoolValue(wanted.BrokerLogs.CloudWatchLogs.Enabled) { + return true + } + } + if wanted.BrokerLogs.Firehose != nil { + if aws.BoolValue(wanted.BrokerLogs.Firehose.Enabled) { + return true + } + } + if wanted.BrokerLogs.S3 != nil { + if aws.BoolValue(wanted.BrokerLogs.S3.Enabled) { + return true + } + } + } + return false +} + func isLoggingInfoUpToDate(wanted *svcapitypes.LoggingInfo, current *svcsdk.LoggingInfo) bool { // nolint:gocyclo if wanted != nil { if current == nil { - return false + return !isSomeLoggingEnabled(wanted) } if wanted.BrokerLogs != nil { @@ -316,8 +337,10 @@ func isLoggingInfoUpToDate(wanted *svcapitypes.LoggingInfo, current *svcsdk.Logg if aws.BoolValue(wanted.BrokerLogs.CloudWatchLogs.Enabled) != aws.BoolValue(current.BrokerLogs.CloudWatchLogs.Enabled) { return false } - if aws.StringValue(wanted.BrokerLogs.CloudWatchLogs.LogGroup) != aws.StringValue(current.BrokerLogs.CloudWatchLogs.LogGroup) { - return false + if aws.BoolValue(wanted.BrokerLogs.CloudWatchLogs.Enabled) { + if aws.StringValue(wanted.BrokerLogs.CloudWatchLogs.LogGroup) != aws.StringValue(current.BrokerLogs.CloudWatchLogs.LogGroup) { + return false + } } } else if current.BrokerLogs.CloudWatchLogs != nil { return false @@ -327,12 +350,15 @@ func isLoggingInfoUpToDate(wanted *svcapitypes.LoggingInfo, current *svcsdk.Logg if current.BrokerLogs.Firehose == nil { return false } - if aws.StringValue(wanted.BrokerLogs.Firehose.DeliveryStream) != aws.StringValue(current.BrokerLogs.Firehose.DeliveryStream) { - return false - } if aws.BoolValue(wanted.BrokerLogs.Firehose.Enabled) != aws.BoolValue(current.BrokerLogs.Firehose.Enabled) { return false } + if aws.BoolValue(wanted.BrokerLogs.Firehose.Enabled) { + if aws.StringValue(wanted.BrokerLogs.Firehose.DeliveryStream) != aws.StringValue(current.BrokerLogs.Firehose.DeliveryStream) { + return false + } + } + } else if current.BrokerLogs.Firehose != nil { if aws.BoolValue(current.BrokerLogs.Firehose.Enabled) { return false @@ -344,16 +370,17 @@ func isLoggingInfoUpToDate(wanted *svcapitypes.LoggingInfo, current *svcsdk.Logg return false } - if aws.StringValue(wanted.BrokerLogs.S3.Bucket) != aws.StringValue(current.BrokerLogs.S3.Bucket) { - return false - } - if aws.BoolValue(wanted.BrokerLogs.S3.Enabled) != aws.BoolValue(current.BrokerLogs.S3.Enabled) { return false } + if aws.BoolValue(wanted.BrokerLogs.S3.Enabled) { + if aws.StringValue(wanted.BrokerLogs.S3.Bucket) != aws.StringValue(current.BrokerLogs.S3.Bucket) { + return false + } - if aws.StringValue(wanted.BrokerLogs.S3.Prefix) != aws.StringValue(current.BrokerLogs.S3.Prefix) { - return false + if aws.StringValue(wanted.BrokerLogs.S3.Prefix) != aws.StringValue(current.BrokerLogs.S3.Prefix) { + return false + } } } else if current.BrokerLogs.S3 != nil { if aws.BoolValue(current.BrokerLogs.S3.Enabled) { @@ -455,21 +482,11 @@ func isTagsUpToDate(wanted map[string]*string, current map[string]*string) bool return true } -func isEncryptionInfoUpToDate(wanted *svcapitypes.EncryptionInfo, current *svcsdk.EncryptionInfo) bool { // nolint:gocyclo +func isEncryptionInfoInTransitUpToDate(wanted *svcapitypes.EncryptionInfo, current *svcsdk.EncryptionInfo) bool { // nolint:gocyclo if wanted != nil { if current == nil { return false } - if wanted.EncryptionAtRest != nil { - if current.EncryptionAtRest == nil { - return false - } - if aws.StringValue(wanted.EncryptionAtRest.DataVolumeKMSKeyID) != aws.StringValue(current.EncryptionAtRest.DataVolumeKMSKeyId) { - return false - } - } else if current.EncryptionAtRest != nil { - return false - } if wanted.EncryptionInTransit != nil { if current.EncryptionInTransit == nil { return false @@ -477,9 +494,6 @@ func isEncryptionInfoUpToDate(wanted *svcapitypes.EncryptionInfo, current *svcsd if aws.StringValue(wanted.EncryptionInTransit.ClientBroker) != aws.StringValue(current.EncryptionInTransit.ClientBroker) { return false } - if aws.BoolValue(wanted.EncryptionInTransit.InCluster) != aws.BoolValue(current.EncryptionInTransit.InCluster) { - return false - } } else if current.EncryptionInTransit != nil { return false } @@ -732,7 +746,7 @@ func (u *updater) update(ctx context.Context, mg cpresource.Managed) (managed.Ex } } - encryptionUpToDate := isEncryptionInfoUpToDate(wanted.EncryptionInfo, obj.ClusterInfo.EncryptionInfo) + encryptionUpToDate := isEncryptionInfoInTransitUpToDate(wanted.EncryptionInfo, obj.ClusterInfo.EncryptionInfo) clientAuthenticationUpToDate := isClientAuthenticationUpToDate(wanted.ClientAuthentication, obj.ClusterInfo.ClientAuthentication) if !encryptionUpToDate || !clientAuthenticationUpToDate { obj, err = u.client.DescribeClusterWithContext(ctx, input) @@ -754,6 +768,11 @@ func (u *updater) update(ctx context.Context, mg cpresource.Managed) (managed.Ex if !encryptionUpToDate { input.EncryptionInfo = generateEncryptionInfo(wanted.EncryptionInfo) + + input.EncryptionInfo.EncryptionAtRest = nil // "Updating encryption-at-rest settings on your cluster is not currently supported." + if input.EncryptionInfo.EncryptionInTransit != nil { + input.EncryptionInfo.EncryptionInTransit.InCluster = nil // "Updating the inter-broker encryption setting on your cluster is not currently supported." + } } _, err := u.client.UpdateSecurityWithContext(ctx, input) @@ -800,6 +819,8 @@ func generateEncryptionInfo(wanted *svcapitypes.EncryptionInfo) *svcsdk.Encrypti } if wanted.EncryptionInTransit.InCluster != nil { encryptionInTransit.InCluster = wanted.EncryptionInTransit.InCluster + } else { + encryptionInTransit.InCluster = aws.Bool(true) // true is default value on aws side } output.EncryptionInTransit = encryptionInTransit } @@ -891,32 +912,43 @@ func generateLoggingInfoInput(wanted *svcapitypes.LoggingInfo) *svcsdk.LoggingIn cloudWatchLogs := &svcsdk.CloudWatchLogs{} if wanted.BrokerLogs.CloudWatchLogs.Enabled != nil { cloudWatchLogs.SetEnabled(*wanted.BrokerLogs.CloudWatchLogs.Enabled) - } - if wanted.BrokerLogs.CloudWatchLogs.LogGroup != nil { - cloudWatchLogs.SetLogGroup(*wanted.BrokerLogs.CloudWatchLogs.LogGroup) + + if aws.BoolValue(wanted.BrokerLogs.CloudWatchLogs.Enabled) { + if wanted.BrokerLogs.CloudWatchLogs.LogGroup != nil { + cloudWatchLogs.SetLogGroup(*wanted.BrokerLogs.CloudWatchLogs.LogGroup) + } + } } brokerLogs.SetCloudWatchLogs(cloudWatchLogs) } if wanted.BrokerLogs.Firehose != nil { firehose := &svcsdk.Firehose{} - if wanted.BrokerLogs.Firehose.DeliveryStream != nil { - firehose.SetDeliveryStream(*wanted.BrokerLogs.Firehose.DeliveryStream) - } if wanted.BrokerLogs.Firehose.Enabled != nil { firehose.SetEnabled(*wanted.BrokerLogs.Firehose.Enabled) + + if aws.BoolValue(wanted.BrokerLogs.Firehose.Enabled) { + if wanted.BrokerLogs.Firehose.DeliveryStream != nil { + firehose.SetDeliveryStream(*wanted.BrokerLogs.Firehose.DeliveryStream) + } + } } + brokerLogs.SetFirehose(firehose) } if wanted.BrokerLogs.S3 != nil { s3 := &svcsdk.S3{} - if wanted.BrokerLogs.S3.Bucket != nil { - s3.SetBucket(*wanted.BrokerLogs.S3.Bucket) - } if wanted.BrokerLogs.S3.Enabled != nil { s3.SetEnabled(*wanted.BrokerLogs.S3.Enabled) - } - if wanted.BrokerLogs.S3.Prefix != nil { - s3.SetPrefix(*wanted.BrokerLogs.S3.Prefix) + + if aws.BoolValue(wanted.BrokerLogs.S3.Enabled) { + if wanted.BrokerLogs.S3.Bucket != nil { + s3.SetBucket(*wanted.BrokerLogs.S3.Bucket) + } + + if wanted.BrokerLogs.S3.Prefix != nil { + s3.SetPrefix(*wanted.BrokerLogs.S3.Prefix) + } + } } brokerLogs.SetS3(s3) }