From 8ee0a6a264e929a3050d66016c232c7542cb701d Mon Sep 17 00:00:00 2001 From: Myroslav Vivcharyk Date: Tue, 17 Dec 2024 13:55:45 +0100 Subject: [PATCH] feat(kafka_quota): added support for kafka quota --- .gitignore | 4 + CHANGELOG.md | 5 + docs/resources/kafka_quota.md | 79 ++++ .../resources/aiven_kafka_quota/import.sh | 1 + .../resources/aiven_kafka_quota/resource.tf | 9 + go.mod | 2 +- go.sum | 2 + internal/acctest/template.go | 223 ++++++++++ internal/acctest/template_test.go | 395 ++++++++++++++++++ internal/sdkprovider/provider/provider.go | 1 + .../sdkprovider/service/kafka/kafka_quota.go | 206 +++++++++ .../service/kafka/kafka_quota_test.go | 233 +++++++++++ 12 files changed, 1159 insertions(+), 1 deletion(-) create mode 100644 docs/resources/kafka_quota.md create mode 100644 examples/resources/aiven_kafka_quota/import.sh create mode 100644 examples/resources/aiven_kafka_quota/resource.tf create mode 100644 internal/acctest/template.go create mode 100644 internal/acctest/template_test.go create mode 100644 internal/sdkprovider/service/kafka/kafka_quota.go create mode 100644 internal/sdkprovider/service/kafka/kafka_quota_test.go diff --git a/.gitignore b/.gitignore index 53a8785d3..1d330b55a 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,10 @@ __debug_bin # Output of the go coverage tool, specifically when used with LiteIDE *.out +# Go workspace file +go.work +go.work.sum + # Dependency directories vendor/ packrd/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e69c1432..e66ac8f60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ nav_order: 1 + +## [MAJOR.MINOR.PATCH] - YYYY-MM-DD + +- Add `aiven_kafka_quota` resource + ## [4.31.0] - 2024-12-18 - Add `alloydbomni` BETA resource and datasource diff --git a/docs/resources/kafka_quota.md b/docs/resources/kafka_quota.md new file mode 100644 index 000000000..df532e215 --- /dev/null +++ b/docs/resources/kafka_quota.md @@ -0,0 +1,79 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "aiven_kafka_quota Resource - terraform-provider-aiven" +subcategory: "" +description: |- + Creates and manages quotas for an Aiven for Apache Kafka® service user. +--- + +# aiven_kafka_quota (Resource) + +Creates and manages quotas for an Aiven for Apache Kafka® service user. + +## Example Usage + +```terraform +resource "aiven_kafka_quota" "example_quota" { + project = data.aiven_project.foo.project + service_name = aiven_kafka.example_kafka.service_name + user = "example-kafka-user" + client_id = "example_client" + consumer_byte_rate = 1000 + producer_byte_rate = 1000 + request_percentage = 50 +} +``` + + +## Schema + +### Required + +- `project` (String) The name of the project this resource belongs to. To set up proper dependencies please refer to this variable as a reference. Changing this property forces recreation of the resource. +- `service_name` (String) The name of the service that this resource belongs to. To set up proper dependencies please refer to this variable as a reference. Changing this property forces recreation of the resource. + +### Optional + +- `client_id` (String) Represents a logical group of clients, assigned a unique name by the client application. +Quotas can be applied based on user, client-id, or both. +The most relevant quota is chosen for each connection. +All connections within a quota group share the same quota. +It is possible to set default quotas for each (user, client-id), user or client-id group by specifying 'default' +- `consumer_byte_rate` (Number) Defines the bandwidth limit in bytes/sec for each group of clients sharing a quota. +Every distinct client group is allocated a specific quota, as defined by the cluster, on a per-broker basis. +Exceeding this limit results in client throttling. +- `producer_byte_rate` (Number) Defines the bandwidth limit in bytes/sec for each group of clients sharing a quota. +Every distinct client group is allocated a specific quota, as defined by the cluster, on a per-broker basis. +Exceeding this limit results in client throttling. +- `request_percentage` (Number) Sets the maximum percentage of CPU time that a client group can use on request handler I/O and network threads per broker within a quota window. +Exceeding this limit triggers throttling. +The quota, expressed as a percentage, also indicates the total allowable CPU usage for the client groups sharing the quota. +- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts)) +- `user` (String) Represents a logical group of clients, assigned a unique name by the client application. +Quotas can be applied based on user, client-id, or both. +The most relevant quota is chosen for each connection. +All connections within a quota group share the same quota. +It is possible to set default quotas for each (user, client-id), user or client-id group by specifying 'default' + +### Read-Only + +- `id` (String) The ID of this resource. + + +### Nested Schema for `timeouts` + +Optional: + +- `create` (String) +- `default` (String) +- `delete` (String) +- `read` (String) +- `update` (String) + +## Import + +Import is supported using the following syntax: + +```shell +terraform import aiven_kafka_quota.example_quota PROJECT/SERVICE_NAME +``` diff --git a/examples/resources/aiven_kafka_quota/import.sh b/examples/resources/aiven_kafka_quota/import.sh new file mode 100644 index 000000000..6a915c78e --- /dev/null +++ b/examples/resources/aiven_kafka_quota/import.sh @@ -0,0 +1 @@ +terraform import aiven_kafka_quota.example_quota PROJECT/SERVICE_NAME diff --git a/examples/resources/aiven_kafka_quota/resource.tf b/examples/resources/aiven_kafka_quota/resource.tf new file mode 100644 index 000000000..7405045f7 --- /dev/null +++ b/examples/resources/aiven_kafka_quota/resource.tf @@ -0,0 +1,9 @@ +resource "aiven_kafka_quota" "example_quota" { + project = data.aiven_project.foo.project + service_name = aiven_kafka.example_kafka.service_name + user = "example-kafka-user" + client_id = "example_client" + consumer_byte_rate = 1000 + producer_byte_rate = 1000 + request_percentage = 50 +} \ No newline at end of file diff --git a/go.mod b/go.mod index 664cb4f8a..68ecc7b81 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23 require ( github.com/aiven/aiven-go-client/v2 v2.33.0 - github.com/aiven/go-client-codegen v0.71.0 + github.com/aiven/go-client-codegen v0.73.0 github.com/avast/retry-go v3.0.0+incompatible github.com/dave/jennifer v1.7.1 github.com/docker/go-units v0.5.0 diff --git a/go.sum b/go.sum index 2d3b16121..8a5c1aebf 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/aiven/go-api-schemas v1.106.0 h1:qncRsbiaGnU9JE9fmTFHclTCBem+t+6EPMXG github.com/aiven/go-api-schemas v1.106.0/go.mod h1:z7dGvufm6If4gOdVr7dWTuFZmll9FOZr5Z5CSxGpebA= github.com/aiven/go-client-codegen v0.71.0 h1:SGiHrfbU8RiqVegQGV3BStnbIdFke+15lxadlPORqfI= github.com/aiven/go-client-codegen v0.71.0/go.mod h1:QKN/GgLMGWd6+gPEucXlZPi5vC3C6RpD3UeBRQOLI1Y= +github.com/aiven/go-client-codegen v0.73.0 h1:1xk7zmAqKxQYHWE4ARWFlKHZg8FB4VTDGxVua7iruRg= +github.com/aiven/go-client-codegen v0.73.0/go.mod h1:QKN/GgLMGWd6+gPEucXlZPi5vC3C6RpD3UeBRQOLI1Y= github.com/apparentlymart/go-textseg/v12 v12.0.0/go.mod h1:S/4uRK2UtaQttw1GenVJEynmyUenKwP++x/+DdGV/Ec= github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY= github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4= diff --git a/internal/acctest/template.go b/internal/acctest/template.go new file mode 100644 index 000000000..5df659864 --- /dev/null +++ b/internal/acctest/template.go @@ -0,0 +1,223 @@ +package acctest + +import ( + "bytes" + "fmt" + "sort" + "strings" + "testing" + "text/template" +) + +// ResourceConfig is the interface that all resource configs must implement +type resourceConfig interface { + // ToMap converts the config to a map for template rendering + ToMap() map[string]any +} + +// Template represents a single Terraform configuration template +type Template struct { + Name string + Template string +} + +// TemplateRegistry holds templates for a specific resource type +type TemplateRegistry struct { + resourceName string + templates map[string]*template.Template + funcMap template.FuncMap +} + +// NewTemplateRegistry creates a new template registry for a resource +func NewTemplateRegistry(resourceName string) *TemplateRegistry { + return &TemplateRegistry{ + resourceName: resourceName, + templates: make(map[string]*template.Template), + funcMap: make(template.FuncMap), + } +} + +// AddTemplate adds a new template to the registry +func (r *TemplateRegistry) AddTemplate(t testing.TB, name, templateStr string) error { + t.Helper() + + tmpl := template.New(name) + if len(r.funcMap) > 0 { + tmpl = tmpl.Funcs(r.funcMap) + } + + parsed, err := tmpl.Parse(templateStr) + if err != nil { + return fmt.Errorf("failed to parse template: %w", err) + } + r.templates[name] = parsed + + return nil +} + +// MustAddTemplate is like AddTemplate but panics on error +func (r *TemplateRegistry) MustAddTemplate(t testing.TB, name, templateStr string) { + t.Helper() + + if err := r.AddTemplate(t, name, templateStr); err != nil { + t.Fatal(err) + } +} + +// Render renders a template with the given config +func (r *TemplateRegistry) Render(t testing.TB, templateKey string, cfg map[string]any) (string, error) { + t.Helper() + + tmpl, exists := r.templates[templateKey] + if !exists { + availableTemplates := r.getAvailableTemplates() + + return "", fmt.Errorf("template %q does not exist for resource %s. Available templates: %v", + templateKey, + r.resourceName, + availableTemplates, + ) + } + + var buf bytes.Buffer + if err := tmpl.Execute(&buf, cfg); err != nil { + return "", fmt.Errorf("failed to render template: %w", err) + } + + return buf.String(), nil +} + +// MustRender is like Render but fails the test on error +func (r *TemplateRegistry) MustRender(t testing.TB, templateKey string, cfg map[string]any) string { + t.Helper() + + result, err := r.Render(t, templateKey, cfg) + if err != nil { + t.Fatal(err) + } + + return result +} + +// AddFunction adds a custom function to the template registry +func (r *TemplateRegistry) AddFunction(name string, fn interface{}) { + if r.funcMap == nil { + r.funcMap = make(template.FuncMap) + } + r.funcMap[name] = fn +} + +// HasTemplate checks if a template exists in the registry +func (r *TemplateRegistry) HasTemplate(key string) bool { + _, exists := r.templates[key] + return exists +} + +// RemoveTemplate removes a template from the registry +func (r *TemplateRegistry) RemoveTemplate(key string) { + delete(r.templates, key) +} + +// getAvailableTemplates returns a sorted list of available template keys +func (r *TemplateRegistry) getAvailableTemplates() []string { + templates := make([]string, 0, len(r.templates)) + for k := range r.templates { + templates = append(templates, k) + } + sort.Strings(templates) + + return templates +} + +// compositionEntry represents a combination of template and its config +type compositionEntry struct { + TemplateKey string + Config map[string]any +} + +// CompositionBuilder helps build complex compositions of templates +type CompositionBuilder struct { + registry *TemplateRegistry + compositions []compositionEntry +} + +// NewCompositionBuilder creates a new composition builder +func (r *TemplateRegistry) NewCompositionBuilder() *CompositionBuilder { + return &CompositionBuilder{ + registry: r, + compositions: make([]compositionEntry, 0), + } +} + +// Add adds a new template and config to the composition +func (b *CompositionBuilder) Add(templateKey string, cfg map[string]any) *CompositionBuilder { + b.compositions = append(b.compositions, compositionEntry{ + TemplateKey: templateKey, + Config: cfg, + }) + return b +} + +// AddWithConfig adds a new template and config to the composition using a resourceConfig +func (b *CompositionBuilder) AddWithConfig(templateKey string, cfg resourceConfig) *CompositionBuilder { + b.compositions = append(b.compositions, compositionEntry{ + TemplateKey: templateKey, + Config: cfg.ToMap(), + }) + return b +} + +// AddIf conditional method to CompositionBuilder +func (b *CompositionBuilder) AddIf(condition bool, templateKey string, cfg map[string]any) *CompositionBuilder { + if condition { + return b.Add(templateKey, cfg) + } + + return b +} + +func (b *CompositionBuilder) Remove(templateKey string) *CompositionBuilder { + var newCompositions []compositionEntry + for _, comp := range b.compositions { + if comp.TemplateKey != templateKey { + newCompositions = append(newCompositions, comp) + } + } + b.compositions = newCompositions + + return b +} + +// Render renders all templates in the composition and combines them +func (b *CompositionBuilder) Render(t testing.TB) (string, error) { + t.Helper() + + var renderedParts []string + + // Render each template + for _, comp := range b.compositions { + rendered, err := b.registry.Render(t, comp.TemplateKey, comp.Config) + if err != nil { + return "", fmt.Errorf("failed to render template %s: %w", comp.TemplateKey, err) + } + renderedParts = append(renderedParts, rendered) + } + + // Combine all rendered parts + combined := strings.Join(renderedParts, "\n\n") + + //TODO: add HCL validation? + + return combined, nil +} + +// MustRender is like Render but fails the test on error +func (b *CompositionBuilder) MustRender(t testing.TB) string { + t.Helper() + + result, err := b.Render(t) + if err != nil { + t.Fatal(err) + } + return result +} diff --git a/internal/acctest/template_test.go b/internal/acctest/template_test.go new file mode 100644 index 000000000..32292cb2f --- /dev/null +++ b/internal/acctest/template_test.go @@ -0,0 +1,395 @@ +package acctest + +import ( + "regexp" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCompositionBuilder(t *testing.T) { + tests := []struct { + name string + templates map[string]string + compositions []struct { + templateKey string + config map[string]any + } + expectedOutput string + expectError bool + }{ + { + name: "kafka_with_user", + templates: map[string]string{ + "kafka": `resource "aiven_kafka" "example_kafka" { + project = data.aiven_project.example_project.project + cloud_name = "{{ .cloud_name }}" + plan = "{{ .plan }}" + service_name = "{{ .service_name }}" + maintenance_window_dow = "{{ .maintenance_window_dow }}" + maintenance_window_time = "{{ .maintenance_window_time }}" + kafka_user_config { + kafka_rest = {{ .kafka_rest }} + kafka_connect = {{ .kafka_connect }} + schema_registry = {{ .schema_registry }} + kafka_version = "{{ .kafka_version }}" + kafka { + group_max_session_timeout_ms = {{ .group_max_session_timeout_ms }} + log_retention_bytes = {{ .log_retention_bytes }} + } + public_access { + kafka_rest = {{ .kafka_rest_public }} + kafka_connect = {{ .kafka_connect_public }} + } + } +}`, + "kafka_user": `resource "aiven_kafka_user" "example_service_user" { + service_name = aiven_kafka.example_kafka.service_name + project = data.aiven_project.example_project.project + username = "{{ .username }}" + password = "{{ .password }}" +}`, + "project_data": `data "aiven_project" "example_project" { + project = "{{ .project }}" +}`, + }, + compositions: []struct { + templateKey string + config map[string]any + }{ + { + templateKey: "project_data", + config: map[string]any{ + "project": "example-project", + }, + }, + { + templateKey: "kafka", + config: map[string]any{ + "cloud_name": "google-europe-west1", + "plan": "business-4", + "service_name": "example-kafka", + "maintenance_window_dow": "monday", + "maintenance_window_time": "10:00:00", + "kafka_rest": true, + "kafka_connect": true, + "schema_registry": true, + "kafka_version": "3.5", + "group_max_session_timeout_ms": 70000, + "log_retention_bytes": 1000000000, + "kafka_rest_public": true, + "kafka_connect_public": true, + }, + }, + { + templateKey: "kafka_user", + config: map[string]any{ + "username": "example-kafka-user", + "password": "dummy-password", + }, + }, + }, + expectedOutput: `data "aiven_project" "example_project" { + project = "example-project" +} +resource "aiven_kafka" "example_kafka" { + project = data.aiven_project.example_project.project + cloud_name = "google-europe-west1" + plan = "business-4" + service_name = "example-kafka" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" + kafka_user_config { + kafka_rest = true + kafka_connect = true + schema_registry = true + kafka_version = "3.5" + kafka { + group_max_session_timeout_ms = 70000 + log_retention_bytes = 1000000000 + } + public_access { + kafka_rest = true + kafka_connect = true + } + } +} +resource "aiven_kafka_user" "example_service_user" { + service_name = aiven_kafka.example_kafka.service_name + project = data.aiven_project.example_project.project + username = "example-kafka-user" + password = "dummy-password" +}`, + }, + { + name: "conditional_kafka_config", + templates: map[string]string{ + "kafka_base": `resource "aiven_kafka" "kafka" { + project = "{{ .project }}" + service_name = "{{ .service_name }}" + cloud_name = "{{ .cloud_name }}" + plan = "{{ .plan }}" +}`, + "kafka_config": ` kafka_user_config { + kafka_rest = {{ .kafka_rest }} + schema_registry = {{ .schema_registry }} + }`, + }, + compositions: []struct { + templateKey string + config map[string]any + }{ + { + templateKey: "kafka_base", + config: map[string]any{ + "project": "test-project", + "service_name": "test-kafka", + "cloud_name": "google-europe-west1", + "plan": "business-4", + }, + }, + }, + expectedOutput: `resource "aiven_kafka" "kafka" { + project = "test-project" + service_name = "test-kafka" + cloud_name = "google-europe-west1" + plan = "business-4" +}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registry := NewTemplateRegistry("test") + + // Add all templates + for key, tmpl := range tt.templates { + err := registry.AddTemplate(t, key, tmpl) + assert.NoError(t, err) + } + + // Create composition + builder := registry.NewCompositionBuilder() + for _, comp := range tt.compositions { + builder.Add(comp.templateKey, comp.config) + } + + // Render and verify + result, err := builder.Render(t) + if tt.expectError { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, + normalizeHCL(tt.expectedOutput), + normalizeHCL(result), + "Rendered template should match expected output", + ) + }) + } +} + +func TestCompositionBuilderConditionalResources(t *testing.T) { + registry := NewTemplateRegistry("test") + + // Add templates for different resources + templates := map[string]string{ + "organization_data": `data "aiven_organization" "main" { + name = "{{ .org_name }}" +}`, + "billing_group": `resource "aiven_billing_group" "example_billing_group" { + name = "{{ .name }}" + billing_currency = "{{ .currency }}" + vat_id = "{{ .vat_id }}" + parent_id = data.aiven_organization.main.id +}`, + "project": `resource "aiven_project" "example_project" { + project = "{{ .project_name }}" + billing_group = aiven_billing_group.example_billing_group.id +}`, + "org_unit": `resource "aiven_organizational_unit" "example_unit" { + name = "{{ .name }}" + parent_id = data.aiven_organization.main.id +}`, + "redis": `resource "aiven_redis" "redis1" { + project = aiven_project.example_project.project + cloud_name = "{{ .cloud_name }}" + plan = "{{ .plan }}" + service_name = "{{ .service_name }}" + maintenance_window_dow = "{{ .maintenance_window_dow }}" + maintenance_window_time = "{{ .maintenance_window_time }}" + redis_user_config { + redis_maxmemory_policy = "{{ .maxmemory_policy }}" + public_access { + redis = {{ .public_access }} + } + } +}`} + + // Add all templates to registry + for key, tmpl := range templates { + registry.MustAddTemplate(t, key, tmpl) + } + + tests := []struct { + name string + includeBilling bool + includeOrgUnit bool + includeRedis bool + expectedResources int + config map[string]map[string]any + expectedOutput string + }{ + { + name: "all_resources", + includeBilling: true, + includeOrgUnit: true, + includeRedis: true, + expectedResources: 5, // data source + 4 resources + config: map[string]map[string]any{ + "organization_data": { + "org_name": "my-organization", + }, + "billing_group": { + "name": "example-billing-group", + "currency": "USD", + "vat_id": "123ABC", + }, + "project": { + "project_name": "example-project", + }, + "org_unit": { + "name": "Example organizational unit", + }, + "redis": { + "cloud_name": "google-europe-west1", + "plan": "business-4", + "service_name": "my-redis1", + "maintenance_window_dow": "monday", + "maintenance_window_time": "10:00:00", + "maxmemory_policy": "allkeys-random", + "public_access": true, + }, + }, + expectedOutput: `data "aiven_organization" "main" { + name = "my-organization" +} +resource "aiven_billing_group" "example_billing_group" { + name = "example-billing-group" + billing_currency = "USD" + vat_id = "123ABC" + parent_id = data.aiven_organization.main.id +} +resource "aiven_project" "example_project" { + project = "example-project" + billing_group = aiven_billing_group.example_billing_group.id +} +resource "aiven_organizational_unit" "example_unit" { + name = "Example organizational unit" + parent_id = data.aiven_organization.main.id +} +resource "aiven_redis" "redis1" { + project = aiven_project.example_project.project + cloud_name = "google-europe-west1" + plan = "business-4" + service_name = "my-redis1" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" + redis_user_config { + redis_maxmemory_policy = "allkeys-random" + public_access { + redis = true + } + } +}`, + }, + { + name: "minimal_setup", + includeBilling: true, + includeOrgUnit: false, + includeRedis: false, + expectedResources: 3, // data source + 2 resources + config: map[string]map[string]any{ + "organization_data": { + "org_name": "my-organization", + }, + "billing_group": { + "name": "example-billing-group", + "currency": "USD", + "vat_id": "123ABC", + }, + "project": { + "project_name": "example-project", + }, + }, + expectedOutput: `data "aiven_organization" "main" { + name = "my-organization" +} +resource "aiven_billing_group" "example_billing_group" { + name = "example-billing-group" + billing_currency = "USD" + vat_id = "123ABC" + parent_id = data.aiven_organization.main.id +} +resource "aiven_project" "example_project" { + project = "example-project" + billing_group = aiven_billing_group.example_billing_group.id +}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + builder := registry.NewCompositionBuilder() + + // Always add organization data source + builder.Add("organization_data", tt.config["organization_data"]) + + // Add billing group and project (they go together) + if tt.includeBilling { + builder.Add("billing_group", tt.config["billing_group"]) + builder.Add("project", tt.config["project"]) + } + + // Conditionally add organizational unit + builder.AddIf(tt.includeOrgUnit, "org_unit", tt.config["org_unit"]) + + // Conditionally add Redis service + builder.AddIf(tt.includeRedis, "redis", tt.config["redis"]) + + result := builder.MustRender(t) + + // Verify the rendered output + assert.Equal(t, + normalizeHCL(tt.expectedOutput), + normalizeHCL(result), + "Rendered template should match expected output", + ) + + // Count number of resources in output + resourceCount := strings.Count(result, "resource") + strings.Count(result, "data") + assert.Equal(t, tt.expectedResources, resourceCount, "Number of resources should match expected count") + }) + } +} + +// normalizeHCL function remains the same +func normalizeHCL(s string) string { + // Remove all whitespace between blocks + s = regexp.MustCompile(`}\s+resource`).ReplaceAllString(s, "}\nresource") + s = regexp.MustCompile(`}\s+data`).ReplaceAllString(s, "}\ndata") + + // Remove all empty lines + s = regexp.MustCompile(`(?m)^\s*$`).ReplaceAllString(s, "") + + // Remove leading/trailing whitespace + s = strings.TrimSpace(s) + + // Normalize line endings + s = strings.ReplaceAll(s, "\r\n", "\n") + + return s +} diff --git a/internal/sdkprovider/provider/provider.go b/internal/sdkprovider/provider/provider.go index 5f35201be..4e0634002 100644 --- a/internal/sdkprovider/provider/provider.go +++ b/internal/sdkprovider/provider/provider.go @@ -268,6 +268,7 @@ func Provider(version string) (*schema.Provider, error) { "aiven_mirrormaker_replication_flow": kafka.ResourceMirrorMakerReplicationFlow(), "aiven_kafka_connect": kafka.ResourceKafkaConnect(), "aiven_kafka_mirrormaker": kafka.ResourceKafkaMirrormaker(), + "aiven_kafka_quota": kafka.ResourceKafkaQuota(), // clickhouse "aiven_clickhouse": clickhouse.ResourceClickhouse(), diff --git a/internal/sdkprovider/service/kafka/kafka_quota.go b/internal/sdkprovider/service/kafka/kafka_quota.go new file mode 100644 index 000000000..bd012f64d --- /dev/null +++ b/internal/sdkprovider/service/kafka/kafka_quota.go @@ -0,0 +1,206 @@ +package kafka + +import ( + "context" + "fmt" + + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/kafka" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" + + "github.com/aiven/terraform-provider-aiven/internal/common" + "github.com/aiven/terraform-provider-aiven/internal/schemautil" +) + +var aivenKafkaQuotaSchema = map[string]*schema.Schema{ + "project": schemautil.CommonSchemaProjectReference, + "service_name": schemautil.CommonSchemaServiceNameReference, + "user": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Description: ` +Represents a logical group of clients, assigned a unique name by the client application. +Quotas can be applied based on user, client-id, or both. +The most relevant quota is chosen for each connection. +All connections within a quota group share the same quota. +It is possible to set default quotas for each (user, client-id), user or client-id group by specifying 'default'`, + ValidateFunc: schemautil.GetServiceUserValidateFunc(), + }, + "client_id": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Description: ` +Represents a logical group of clients, assigned a unique name by the client application. +Quotas can be applied based on user, client-id, or both. +The most relevant quota is chosen for each connection. +All connections within a quota group share the same quota. +It is possible to set default quotas for each (user, client-id), user or client-id group by specifying 'default'`, + ValidateFunc: validation.All( + validation.StringLenBetween(1, 255), + validation.StringIsNotEmpty, + ), + }, + "consumer_byte_rate": { + Type: schema.TypeInt, + Optional: true, + ForceNew: true, + Description: ` +Defines the bandwidth limit in bytes/sec for each group of clients sharing a quota. +Every distinct client group is allocated a specific quota, as defined by the cluster, on a per-broker basis. +Exceeding this limit results in client throttling.`, + ValidateFunc: validation.All( + validation.IntAtLeast(0), + validation.IntAtMost(1073741824), + ), + }, + "producer_byte_rate": { + Type: schema.TypeInt, + Optional: true, + ForceNew: true, + Description: ` +Defines the bandwidth limit in bytes/sec for each group of clients sharing a quota. +Every distinct client group is allocated a specific quota, as defined by the cluster, on a per-broker basis. +Exceeding this limit results in client throttling.`, + ValidateFunc: validation.All( + validation.IntAtLeast(0), + validation.IntAtMost(1073741824), + ), + }, + "request_percentage": { + Type: schema.TypeInt, + Optional: true, + ForceNew: true, + Description: ` +Sets the maximum percentage of CPU time that a client group can use on request handler I/O and network threads per broker within a quota window. +Exceeding this limit triggers throttling. +The quota, expressed as a percentage, also indicates the total allowable CPU usage for the client groups sharing the quota.`, + ValidateFunc: validation.All( + validation.IntAtLeast(0), + validation.IntAtMost(100), + ), + }, +} + +func ResourceKafkaQuota() *schema.Resource { + return &schema.Resource{ + Description: "Creates and manages quotas for an Aiven for Apache Kafka® service user.", + CreateContext: common.WithGenClient(resourceKafkaQuotaCreate), + ReadContext: common.WithGenClient(resourceKafkaQuotaRead), + DeleteContext: common.WithGenClient(resourceKafkaQuotaDelete), + Timeouts: schemautil.DefaultResourceTimeouts(), + + Schema: aivenKafkaQuotaSchema, + CustomizeDiff: validateKafkaQuotaDiff, + } +} + +func resourceKafkaQuotaCreate(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + var ( + project = d.Get("project").(string) + service = d.Get("service_name").(string) + user = d.Get("user").(string) + clientID = d.Get("client_id").(string) + + req kafka.ServiceKafkaQuotaCreateIn + ) + + if err := schemautil.ResourceDataGet( + d, + &req, + schemautil.RenameAlias("client_id", "client-id"), + ); err != nil { + return err + } + + if err := client.ServiceKafkaQuotaCreate(ctx, project, service, &req); err != nil { + return err + } + + d.SetId(schemautil.BuildResourceID(project, service, clientID, user)) + + return resourceKafkaQuotaRead(ctx, d, client) +} + +func resourceKafkaQuotaRead(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + project, serviceName, clientID, user, err := schemautil.SplitResourceID4(d.Id()) + if err != nil { + return err + } + + var params [][2]string + if user != "" { + params = append(params, kafka.ServiceKafkaQuotaDescribeUser(user)) + } + + if clientID != "" { + params = append(params, kafka.ServiceKafkaQuotaDescribeClientId(clientID)) + } + + resp, err := client.ServiceKafkaQuotaDescribe( + ctx, + project, + serviceName, + params..., + ) + if err != nil { + return err + } + + return schemautil.ResourceDataSet( + aivenKafkaQuotaSchema, + d, + resp, + schemautil.RenameAlias("client_id", "client-id"), + ) +} + +func resourceKafkaQuotaDelete(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + var ( + project = d.Get("project").(string) + serviceName = d.Get("service_name").(string) + clientID = d.Get("client_id").(string) + user = d.Get("user").(string) + ) + + var params [][2]string + if user != "" { + params = append(params, kafka.ServiceKafkaQuotaDeleteUser(user)) + } + + if clientID != "" { + params = append(params, kafka.ServiceKafkaQuotaDeleteClientId(clientID)) + } + + return client.ServiceKafkaQuotaDelete( + ctx, + project, + serviceName, + params..., + ) +} + +func validateKafkaQuotaDiff(_ context.Context, d *schema.ResourceDiff, _ interface{}) error { + var ( + user = d.Get("user").(string) + clientID = d.Get("client_id").(string) + ) + + if user == "" && clientID == "" { + return fmt.Errorf("at least one of user or client_id must be specified") + } + + var ( + consumerByteRate = d.Get("consumer_byte_rate").(int) + producerByteRate = d.Get("producer_byte_rate").(int) + requestPercentage = d.Get("request_percentage").(int) + ) + + if consumerByteRate == 0 && producerByteRate == 0 && requestPercentage == 0 { + return fmt.Errorf("at least one quota parameter must be specified") + } + + return nil +} diff --git a/internal/sdkprovider/service/kafka/kafka_quota_test.go b/internal/sdkprovider/service/kafka/kafka_quota_test.go new file mode 100644 index 000000000..af80e64ac --- /dev/null +++ b/internal/sdkprovider/service/kafka/kafka_quota_test.go @@ -0,0 +1,233 @@ +package kafka_test + +import ( + "context" + "fmt" + "os" + "regexp" + "testing" + + "github.com/aiven/go-client-codegen/handler/kafka" + "github.com/hashicorp/terraform-plugin-testing/helper/acctest" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" + + acc "github.com/aiven/terraform-provider-aiven/internal/acctest" + "github.com/aiven/terraform-provider-aiven/internal/common" + "github.com/aiven/terraform-provider-aiven/internal/schemautil" +) + +const kafkaQuotaResource = "aiven_kafka_quota" + +func TestAccAivenKafkaQuota(t *testing.T) { + var ( + registry = acc.NewTemplateRegistry(kafkaQuotaResource) + randName = acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum) + serviceName = fmt.Sprintf("test-acc-sr-%s", randName) + projectName = os.Getenv("AIVEN_PROJECT_NAME") + ) + + // Add templates + registry.MustAddTemplate(t, "project_data", ` +data "aiven_project" "foo" { + project = "{{ .project }}" +}`) + + registry.MustAddTemplate(t, "aiven_kafka", ` +resource "aiven_kafka" "bar" { + project = data.aiven_project.foo.project + cloud_name = "google-europe-west1" + plan = "startup-2" + service_name = "{{ .service_name }}" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" +}`) + + registry.MustAddTemplate(t, "kafka_quota_full", ` +resource "aiven_kafka_quota" "{{ .resource_name }}" { + project = data.aiven_project.foo.project + service_name = aiven_kafka.bar.service_name + user = "{{ .user }}" + client_id = "{{ .client_id }}" + consumer_byte_rate = {{ .consumer_byte_rate }} + producer_byte_rate = {{ .producer_byte_rate }} + request_percentage = {{ .request_percentage }} +}`) + + registry.MustAddTemplate(t, "kafka_quota_user", ` +resource "aiven_kafka_quota" "{{ .resource_name }}" { + project = data.aiven_project.foo.project + service_name = aiven_kafka.bar.service_name + user = "{{ .user }}" + request_percentage = {{ .request_percentage }} +}`) + + registry.MustAddTemplate(t, "kafka_quota_client_id", ` +resource "aiven_kafka_quota" "{{ .resource_name }}" { + project = data.aiven_project.foo.project + service_name = aiven_kafka.bar.service_name + client_id = "{{ .client_id }}" + producer_byte_rate = {{ .producer_byte_rate }} +}`) + + registry.MustAddTemplate(t, "invalid", ` +resource "aiven_kafka_quota" "{{ .resource_name }}" { + project = data.aiven_project.foo.project + service_name = aiven_kafka.bar.service_name + consumer_byte_rate = {{ .consumer_byte_rate }} + producer_byte_rate = {{ .producer_byte_rate }} + request_percentage = {{ .request_percentage }} +}`) + + var newComposition = func() *acc.CompositionBuilder { + return registry.NewCompositionBuilder(). + Add("project_data", map[string]interface{}{ + "project": projectName}). + Add("aiven_kafka", map[string]interface{}{ + "service_name": serviceName, + }) + } + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.TestAccPreCheck(t) }, + ProtoV6ProviderFactories: acc.TestProtoV6ProviderFactories, + CheckDestroy: testAccCheckAivenKafkaQuotaDestroy, + Steps: []resource.TestStep{ + { + Config: newComposition(). + Add("kafka_quota_full", map[string]any{ + "resource_name": "full", + "service_name": serviceName, + "user": fmt.Sprintf("acc_test_user_%s", randName), + "client_id": fmt.Sprintf("acc_test_client_%s", randName), + "consumer_byte_rate": 1000, + "producer_byte_rate": 1000, + "request_percentage": 101, + }). + MustRender(t), + ExpectError: regexp.MustCompile(`expected .+ to be at (?:most|least) \(\d+\), got -?\d+`), + }, + { + Config: newComposition(). + Add("invalid", map[string]any{ + "resource_name": "invalid", + "service_name": serviceName, + "consumer_byte_rate": 1000, + "producer_byte_rate": 1000, + "request_percentage": 10, + }). + MustRender(t), + ExpectError: regexp.MustCompile(`at least one of user or client_id must be specified`), + }, + { + Config: newComposition(). + Add("kafka_quota_full", map[string]any{ + "resource_name": "full", + "service_name": serviceName, + "user": fmt.Sprintf("acc_test_user_%s", randName), + "client_id": fmt.Sprintf("acc_test_client_%s", randName), + "consumer_byte_rate": 1000, + "producer_byte_rate": 1000, + "request_percentage": 10, + }). + Add("kafka_quota_user", map[string]any{ + "resource_name": "user", + "service_name": serviceName, + "user": fmt.Sprintf("acc_test_user_%s", randName), + "request_percentage": 20, + }). + MustRender(t), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "project", projectName), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "service_name", serviceName), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "user", fmt.Sprintf("acc_test_user_%s", randName)), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "client_id", fmt.Sprintf("acc_test_client_%s", randName)), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "consumer_byte_rate", "1000"), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "producer_byte_rate", "1000"), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "request_percentage", "10"), + + resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "project", projectName), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "service_name", serviceName), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "user", fmt.Sprintf("acc_test_user_%s", randName)), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "request_percentage", "20"), + ), + }, + { + Config: newComposition(). + Add("kafka_quota_client_id", map[string]any{ + "resource_name": "client", + "service_name": serviceName, + "client_id": fmt.Sprintf("acc_test_client_%s", randName), + "producer_byte_rate": 1000, + }). + MustRender(t), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "project", projectName), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "service_name", serviceName), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "client_id", fmt.Sprintf("acc_test_client_%s", randName)), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "producer_byte_rate", "1000"), + ), + }, + { + Taint: []string{fmt.Sprintf("%s.client", kafkaQuotaResource)}, + Config: newComposition(). + Add("kafka_quota_client_id", map[string]any{ + "resource_name": "client", + "service_name": serviceName, + "client_id": fmt.Sprintf("acc_test_client_%s", randName), + "producer_byte_rate": 1000, + }). + MustRender(t), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "project", projectName), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "service_name", serviceName), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "client_id", fmt.Sprintf("acc_test_client_%s", randName)), + resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "producer_byte_rate", "1000"), + ), + }, + }, + }) +} + +func testAccCheckAivenKafkaQuotaDestroy(s *terraform.State) error { + var ( + c, err = acc.GetTestGenAivenClient() + ctx = context.Background() + ) + + if err != nil { + return fmt.Errorf("failed to instantiate GenAiven client: %w", err) + } + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aiven_kafka_quota" { + continue + } + + p, sn, cID, u, err := schemautil.SplitResourceID4(rs.Primary.ID) + if err != nil { + return fmt.Errorf("error splitting resource ID: %w", err) + } + + var params [][2]string + if u != "" { + params = append(params, kafka.ServiceKafkaQuotaDeleteUser(u)) + } + if cID != "" { + params = append(params, kafka.ServiceKafkaQuotaDeleteClientId(cID)) + } + + _, err = c.ServiceKafkaQuotaDescribe(ctx, p, sn, params...) + if err != nil { + if !common.IsCritical(err) { + return nil + } + + return err + } + + return fmt.Errorf("kafka quota still exists") + } + + return nil +}