From 876143e3ed4c6e09dbc7845bb794a0bc3bd742b9 Mon Sep 17 00:00:00 2001 From: Ivan Savciuc Date: Thu, 14 Sep 2023 16:35:26 +0300 Subject: [PATCH] feat(kafkatopic): creation simplification --- .../service/kafkatopic/kafka_topic.go | 20 ++---- .../service/kafkatopic/kafka_topic_cache.go | 4 ++ .../service/kafkatopic/kafka_topic_create.go | 68 ------------------- 3 files changed, 11 insertions(+), 81 deletions(-) delete mode 100644 internal/sdkprovider/service/kafkatopic/kafka_topic_create.go diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 3eab1ec1c..49075d5da 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -289,19 +289,13 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int Tags: getTags(d), } - w := &kafkaTopicCreateWaiter{ - Context: ctx, - Client: m.(*aiven.Client), - Project: project, - ServiceName: serviceName, - CreateRequest: createRequest, - } - - timeout := d.Timeout(schema.TimeoutCreate) - - // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - _, err = w.Conf(timeout).WaitForStateContext(ctx) - if err != nil { + err = m.(*aiven.Client).KafkaTopics.Create( + ctx, + project, + serviceName, + createRequest, + ) + if err != nil && !aiven.IsAlreadyExists(err) { return diag.FromErr(err) } diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go index 0c724e2ec..932759eb0 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go @@ -172,6 +172,10 @@ func (t *kafkaTopicCache) GetV1List(projectName, serviceName string) []string { } // DeleteTopicFromCache Invalidates cache for the topic +// This function only exists to pass acceptance tests. Cache invalidation +// happens automatically in Terraform when used in the real-life world between +// each subsequent operation. However, during the acceptance test execution, +// we need to mimic the cache invalidation mechanism by calling this function. func DeleteTopicFromCache(projectName, serviceName, topicName string) { t := getTopicCache() t.Lock() diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go deleted file mode 100644 index 429885bec..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go +++ /dev/null @@ -1,68 +0,0 @@ -package kafkatopic - -import ( - "context" - "log" - "time" - - "github.com/aiven/aiven-go-client/v2" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" -) - -// kafkaTopicCreateWaiter is used to create topics. Since topics are often -// created right after Kafka service is created there may be temporary issues -// that prevent creating the topics like all brokers not being online. This -// allows retrying the operation until failing it. -type kafkaTopicCreateWaiter struct { - Context context.Context - Client *aiven.Client - Project string - ServiceName string - CreateRequest aiven.CreateKafkaTopicRequest -} - -// RefreshFunc will call the Aiven client and refresh it's state. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicCreateWaiter) RefreshFunc() resource.StateRefreshFunc { - // Should check if topic does not exist before create - // Assumes it exists, should prove it doesn't by getting no error - return func() (interface{}, string, error) { - err := w.Client.KafkaTopics.Create( - w.Context, - w.Project, - w.ServiceName, - w.CreateRequest, - ) - - if err != nil { - // If some brokers are offline while the request is being executed - // the operation may fail. - aivenError, ok := err.(aiven.Error) - if !ok { - return nil, "", err - } - - if !aiven.IsAlreadyExists(aivenError) { - log.Printf("[DEBUG] Got error %v while waiting for topic to be created.", aivenError) - return nil, "CREATING", nil - } - } - - return w.CreateRequest.TopicName, "CREATED", nil - } -} - -// Conf sets up the configuration to refresh. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicCreateWaiter) Conf(timeout time.Duration) *resource.StateChangeConf { - log.Printf("[DEBUG] Create waiter timeout %.0f minutes", timeout.Minutes()) - - return &resource.StateChangeConf{ - Pending: []string{"CREATING"}, - Target: []string{"CREATED"}, - Refresh: w.RefreshFunc(), - Delay: 5 * time.Second, - Timeout: timeout, - MinTimeout: 10 * time.Second, - } -}