Skip to content

Commit

Permalink
feat(kafka_quota): added support for kafka quota
Browse files Browse the repository at this point in the history
  • Loading branch information
vmyroslav committed Dec 24, 2024
1 parent 81e23a1 commit 8287c87
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 75 deletions.
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ lint: lint-go lint-test lint-docs
lint-go: $(GOLANGCILINT)
$(GOLANGCILINT) run --build-tags all --timeout=30m ./...

# Exclude files that use templates from linting
TERRAFMT_EXCLUDE = -not -path "./internal/acctest/*" \
-not -path "./internal/sdkprovider/service/kafka/kafka_quota_test.go"

lint-test: $(TERRAFMT)
$(TERRAFMT) diff ./internal -cfq

find ./internal -type f $(TERRAFMT_EXCLUDE) -exec $(TERRAFMT) diff {} -cfq \;

lint-docs: $(TFPLUGINDOCS)
PROVIDER_AIVEN_ENABLE_BETA=1 $(TFPLUGINDOCS) generate --rendered-website-dir tmp
Expand All @@ -132,7 +134,6 @@ lint-docs: $(TFPLUGINDOCS)

fmt: fmt-test fmt-imports


fmt-test: $(TERRAFMT)
$(TERRAFMT) fmt ./internal -fv

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23

require (
github.com/aiven/aiven-go-client/v2 v2.33.0
github.com/aiven/go-client-codegen v0.73.0
github.com/aiven/go-client-codegen v0.74.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/dave/jennifer v1.7.1
github.com/docker/go-units v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/aiven/go-client-codegen v0.71.0 h1:SGiHrfbU8RiqVegQGV3BStnbIdFke+15lx
github.com/aiven/go-client-codegen v0.71.0/go.mod h1:QKN/GgLMGWd6+gPEucXlZPi5vC3C6RpD3UeBRQOLI1Y=
github.com/aiven/go-client-codegen v0.73.0 h1:1xk7zmAqKxQYHWE4ARWFlKHZg8FB4VTDGxVua7iruRg=
github.com/aiven/go-client-codegen v0.73.0/go.mod h1:QKN/GgLMGWd6+gPEucXlZPi5vC3C6RpD3UeBRQOLI1Y=
github.com/aiven/go-client-codegen v0.74.0 h1:CqZUq8aGdhgcKJuM+YbC8RVUVgnCw9aA6yk0hMfvXWg=
github.com/aiven/go-client-codegen v0.74.0/go.mod h1:QKN/GgLMGWd6+gPEucXlZPi5vC3C6RpD3UeBRQOLI1Y=
github.com/apparentlymart/go-textseg/v12 v12.0.0/go.mod h1:S/4uRK2UtaQttw1GenVJEynmyUenKwP++x/+DdGV/Ec=
github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY=
github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4=
Expand Down
98 changes: 98 additions & 0 deletions internal/acctest/plancheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package acctest

import (
"context"
"fmt"

tfjson "github.com/hashicorp/terraform-json"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
)

type attributeChangeCheck struct {
resourceAddr string
attrs []string
}

func ExpectOnlyAttributesChanged(resourceAddr string, attrs ...string) plancheck.PlanCheck {
return &attributeChangeCheck{
resourceAddr: resourceAddr,
attrs: attrs,
}
}

func (c *attributeChangeCheck) CheckPlan(_ context.Context, req plancheck.CheckPlanRequest, resp *plancheck.CheckPlanResponse) {
var targetResource *tfjson.ResourceChange

// Find our resource in the changes
for _, rc := range req.Plan.ResourceChanges {
if rc.Address == c.resourceAddr {
targetResource = rc
break
}
}

if targetResource == nil {
resp.Error = fmt.Errorf("resource %s not found in plan", c.resourceAddr)
return
}

if targetResource.Change == nil {
resp.Error = fmt.Errorf("no changes found for resource %s", c.resourceAddr)
return
}

// Convert Before and After to maps
before, ok := targetResource.Change.Before.(map[string]interface{})
if !ok {
resp.Error = fmt.Errorf("before state for resource %s is not a map", c.resourceAddr)
return
}

after, ok := targetResource.Change.After.(map[string]interface{})
if !ok {
resp.Error = fmt.Errorf("after state for resource %s is not a map", c.resourceAddr)

return
}

// Create a set of expected changes
expectedChanges := make(map[string]struct{})
for _, attr := range c.attrs {
expectedChanges[attr] = struct{}{}
}

// Check all attributes in the after state
for key, afterValue := range after {
beforeValue, existsInBefore := before[key]

// If value changed
if !existsInBefore || beforeValue != afterValue {
// Check if this change was expected
if _, expected := expectedChanges[key]; !expected {
resp.Error = fmt.Errorf(
"unexpected change in attribute %q for resource %s: before=%v, after=%v",
key,
c.resourceAddr,
beforeValue,
afterValue,
)
return
}
// Remove from expected changes as we found it
delete(expectedChanges, key)
}
}

// Check if all expected changes were found
if len(expectedChanges) > 0 {
remaining := make([]string, 0, len(expectedChanges))
for attr := range expectedChanges {
remaining = append(remaining, attr)
}
resp.Error = fmt.Errorf(
"expected changes in attributes %v for resource %s were not found in plan",
remaining,
c.resourceAddr,
)
}
}
112 changes: 102 additions & 10 deletions internal/acctest/template_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package acctest

import (
"regexp"
"fmt"
"strings"
"testing"

Expand Down Expand Up @@ -182,6 +182,65 @@ resource "aiven_project" "example_project" {
assert.Equal(t, normalizeHCL(expected), normalizeHCL(result))
}

func TestTemplateNilAndMissingValues(t *testing.T) {
registry := NewTemplateRegistry("test")

templateStr := `resource "aiven_service" "example" {
project = "{{ .project }}"
service_name = "{{ .service_name }}"
{{- if .maintenance_window_dow }}
maintenance_window_dow = "{{ .maintenance_window_dow }}"
{{- end }}
{{- if .maintenance_window_time }}
maintenance_window_time = "{{ .maintenance_window_time }}"
{{- end }}
}`

registry.MustAddTemplate(t, "service", templateStr)

tests := []struct {
name string
config map[string]any
expected string
}{
{
name: "explicit_nil_value",
config: map[string]any{
"project": "test-project",
"service_name": "test-service",
"maintenance_window_dow": nil,
"maintenance_window_time": "10:00:00",
},
expected: `resource "aiven_service" "example" {
project = "test-project"
service_name = "test-service"
maintenance_window_time = "10:00:00"
}`,
},
{
name: "missing_key",
config: map[string]any{
"project": "test-project",
"service_name": "test-service",
// maintenance_window_dow is not in config at all
"maintenance_window_time": "10:00:00",
},
expected: `resource "aiven_service" "example" {
project = "test-project"
service_name = "test-service"
maintenance_window_time = "10:00:00"
}`,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := registry.MustRender(t, "service", tt.config)
assert.Equal(t, normalizeHCL(tt.expected), normalizeHCL(result))
})
}
}

func TestCompositionBuilder(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -368,18 +427,51 @@ resource "aiven_kafka_user" "example_service_user" {

// normalizeHCL function remains the same
func normalizeHCL(s string) string {
// Remove all whitespace between blocks
s = regexp.MustCompile(`}\s+resource`).ReplaceAllString(s, "}\nresource")
s = regexp.MustCompile(`}\s+data`).ReplaceAllString(s, "}\ndata")
// Split into lines for processing
lines := strings.Split(s, "\n")
var normalized = make([]string, 0, len(lines))

// Remove all empty lines
s = regexp.MustCompile(`(?m)^\s*$`).ReplaceAllString(s, "")
for _, line := range lines {
// Trim spaces from both ends
line = strings.TrimSpace(line)

// Skip empty lines
if line == "" {
continue
}

// Handle lines with just closing braces
if line == "}" {
normalized = append(normalized, line)
continue
}

// For lines with content, normalize internal spacing
if strings.Contains(line, "=") {
// Split by = and trim spaces
parts := strings.Split(line, "=")
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
// Reconstruct with consistent spacing
line = fmt.Sprintf(" %s = %s", key, value)
}
} else if strings.HasPrefix(line, "resource") || strings.HasPrefix(line, "data") {
// Handle resource and data block declarations
line = strings.TrimSpace(line)
} else if !strings.HasPrefix(line, "}") {
// Add consistent indentation for other non-empty lines
line = " " + strings.TrimSpace(line)
}

normalized = append(normalized, line)
}

// Remove leading/trailing whitespace
s = strings.TrimSpace(s)
// Join lines with newlines
result := strings.Join(normalized, "\n")

// Normalize line endings
s = strings.ReplaceAll(s, "\r\n", "\n")
result = strings.ReplaceAll(result, "\r\n", "\n")

return s
return result
}
31 changes: 27 additions & 4 deletions internal/sdkprovider/service/kafka/kafka_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ It is possible to set default quotas for each (user, client-id), user or client-
"consumer_byte_rate": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
Description: `
Defines the bandwidth limit in bytes/sec for each group of clients sharing a quota.
Every distinct client group is allocated a specific quota, as defined by the cluster, on a per-broker basis.
Expand All @@ -53,7 +52,6 @@ Exceeding this limit results in client throttling.`,
"producer_byte_rate": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
Description: `
Defines the bandwidth limit in bytes/sec for each group of clients sharing a quota.
Every distinct client group is allocated a specific quota, as defined by the cluster, on a per-broker basis.
Expand All @@ -63,7 +61,6 @@ Exceeding this limit results in client throttling.`,
"request_percentage": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
Description: `
Sets the maximum percentage of CPU time that a client group can use on request handler I/O and network threads per broker within a quota window.
Exceeding this limit triggers throttling.
Expand All @@ -75,8 +72,9 @@ The quota, expressed as a percentage, also indicates the total allowable CPU usa
func ResourceKafkaQuota() *schema.Resource {
return &schema.Resource{
Description: "Creates and manages quotas for an Aiven for Apache Kafka® service user.",
CreateContext: common.WithGenClient(resourceKafkaQuotaCreate),
ReadContext: common.WithGenClient(resourceKafkaQuotaRead),
CreateContext: common.WithGenClient(resourceKafkaQuotaCreate),
UpdateContext: common.WithGenClient(resourceKafkaQuotaUpdate),
DeleteContext: common.WithGenClient(resourceKafkaQuotaDelete),
Timeouts: schemautil.DefaultResourceTimeouts(),

Expand Down Expand Up @@ -112,6 +110,31 @@ func resourceKafkaQuotaCreate(ctx context.Context, d *schema.ResourceData, clien
return resourceKafkaQuotaRead(ctx, d, client)
}

func resourceKafkaQuotaUpdate(ctx context.Context, d *schema.ResourceData, client avngen.Client) error {
var (
req kafka.ServiceKafkaQuotaCreateIn
)

project, service, _, _, err := schemautil.SplitResourceID4(d.Id())
if err != nil {
return err
}

if err := schemautil.ResourceDataGet(
d,
&req,
schemautil.RenameAlias("client_id", "client-id"),
); err != nil {
return err
}

if err := client.ServiceKafkaQuotaCreate(ctx, project, service, &req); err != nil {
return err
}

return resourceKafkaQuotaRead(ctx, d, client)
}

func resourceKafkaQuotaRead(ctx context.Context, d *schema.ResourceData, client avngen.Client) error {
project, serviceName, clientID, user, err := schemautil.SplitResourceID4(d.Id())
if err != nil {
Expand Down
Loading

0 comments on commit 8287c87

Please sign in to comment.