Skip to content

Commit

Permalink
Integrate new task tracker
Browse files Browse the repository at this point in the history
Signed-off-by: Stoyan Zhelyazkov <[email protected]>
  • Loading branch information
spacegospod committed Nov 20, 2024
1 parent 1078c62 commit 8b8694e
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 154 deletions.
106 changes: 0 additions & 106 deletions internal/api_client/sddc_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ import (
"fmt"
"log"
"net/http"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/vmware/vcf-sdk-go/vcf"
"golang.org/x/exp/slices"
)

// SddcManagerClient model that represents properties to authenticate against VCF instance.
Expand All @@ -30,7 +28,6 @@ type SddcManagerClient struct {
allowUnverifiedTls bool
lastRefreshTime time.Time
isRefreshing bool
getTaskRetries int
}

// NewSddcManagerClient constructs new Client instance with vcf credentials.
Expand All @@ -42,13 +39,9 @@ func NewSddcManagerClient(username, password, url string, allowUnverifiedTls boo
allowUnverifiedTls: allowUnverifiedTls,
lastRefreshTime: time.Now(),
isRefreshing: false,
getTaskRetries: 0,
}
}

const maxGetTaskRetries int = 10
const maxTaskRetries int = 6

func (sddcManagerClient *SddcManagerClient) authEditor(ctx context.Context, req *http.Request) error {
// Refresh the access token every 20 minutes so that SDK operations won't start to
// fail with 401, 403 because of token expiration, during long-running tasks
Expand Down Expand Up @@ -106,89 +99,6 @@ func (sddcManagerClient *SddcManagerClient) Connect() error {
return nil
}

// WaitForTask polls the task with the given ID until it leaves the waiting
// states or the polling budget is exhausted.
//
// The status is fetched at most 10 times, pausing 20 seconds after every poll
// that finds the task pending or in progress, so the call gives up after
// roughly 200 seconds. It returns nil once the task reports any status that is
// neither a waiting nor a failure status. It returns an error when the task
// cannot be fetched, when the response carries no status, when the task is
// "failed" or "cancelled", when ctx is done while waiting between polls, or
// when the polling budget runs out.
func (sddcManagerClient *SddcManagerClient) WaitForTask(ctx context.Context, taskId string) error {
	const (
		maxPolls     = 10
		pollInterval = 20 * time.Second
	)
	// Built once; the sets do not change between polls.
	waitStatuses := []string{"in progress", "pending", "in_progress"}
	failStatuses := []string{"failed", "cancelled"}

	for remaining := maxPolls; remaining > 0; remaining-- {
		task, err := sddcManagerClient.getTask(ctx, taskId)
		if err != nil {
			log.Println("error = ", err)
			return err
		}
		// getTask can hand back a nil task alongside a nil error, and Status is
		// a pointer; dereferencing either blindly would panic.
		if task == nil || task.Status == nil {
			return fmt.Errorf("task %s returned no status", taskId)
		}

		status := strings.ToLower(*task.Status)
		if slices.Contains(waitStatuses, status) {
			// Wait for the next poll, but stop promptly if the caller cancels.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(pollInterval):
			}
			continue
		}

		if slices.Contains(failStatuses, status) {
			errorMsg := fmt.Sprintf("Task with ID = %s is in state %s", taskId, *task.Status)
			log.Println(errorMsg)
			return errors.New(errorMsg)
		}

		var completionTimestamp string
		if task.CompletionTimestamp != nil {
			completionTimestamp = *task.CompletionTimestamp
		}

		log.Printf("Task with ID = %s is in state %s, completed at %s", taskId, *task.Status, completionTimestamp)
		return nil
	}

	return fmt.Errorf("timedout waiting for task %s", taskId)
}

// WaitForTaskComplete polls the task with the given ID every 20 seconds until
// it reaches a terminal state.
//
// While the task is pending or in progress the call keeps waiting with no
// upper bound other than ctx. When the task is "failed" or "cancelled" and
// retry is true, the task is re-submitted through retryTask up to
// maxTaskRetries times before giving up; with retry false the failure is
// returned immediately. Any other status is treated as completion and yields
// nil.
//
// An error is returned when the task cannot be fetched, when the response
// carries no status, when the task fails (and retries are disabled or
// exhausted), when the retry request itself fails, or when ctx is done while
// waiting between polls.
func (sddcManagerClient *SddcManagerClient) WaitForTaskComplete(ctx context.Context, taskId string, retry bool) error {
	const pollInterval = 20 * time.Second

	// pause waits one polling interval, returning early if ctx is done so a
	// cancelled operation does not keep blocking.
	pause := func() error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
			return nil
		}
	}
	// deref reads optional string fields without panicking on nil.
	deref := func(s *string) string {
		if s == nil {
			return ""
		}
		return *s
	}

	waitStatuses := []string{"in progress", "pending", "in_progress"}
	failStatuses := []string{"failed", "cancelled"}

	log.Printf("Getting status of task %s", taskId)
	currentTaskRetries := 0
	for {
		task, err := sddcManagerClient.getTask(ctx, taskId)
		if err != nil {
			return err
		}
		// getTask can hand back a nil task alongside a nil error, and Status is
		// a pointer; dereferencing either blindly would panic.
		if task == nil || task.Status == nil {
			return fmt.Errorf("task %s returned no status", taskId)
		}

		status := strings.ToLower(*task.Status)
		if slices.Contains(waitStatuses, status) {
			if err := pause(); err != nil {
				return err
			}
			continue
		}

		if slices.Contains(failStatuses, status) {
			errorMsg := fmt.Sprintf("Task with ID = %s , Name: %q Type: %q is in state %s", taskId, deref(task.Name), deref(task.Type), *task.Status)
			tflog.Error(ctx, errorMsg)

			if !retry || currentTaskRetries >= maxTaskRetries {
				return errors.New(errorMsg)
			}

			currentTaskRetries++
			if err := sddcManagerClient.retryTask(ctx, taskId); err != nil {
				tflog.Error(ctx, fmt.Sprintf("Task %q %q failed after %d retries",
					taskId, deref(task.Type), currentTaskRetries))
				return err
			}
			// Give the re-submitted task time to change state before polling again.
			if err := pause(); err != nil {
				return err
			}
			continue
		}

		var completionTimestamp string
		if task.CompletionTimestamp != nil {
			completionTimestamp = *task.CompletionTimestamp
		}

		log.Printf("Task with ID = %s is in state %s, completed at %s", taskId, *task.Status, completionTimestamp)
		return nil
	}
}

func (sddcManagerClient *SddcManagerClient) GetResourceIdAssociatedWithTask(ctx context.Context, taskId, resourceType string) (string, error) {
task, err := sddcManagerClient.getTask(ctx, taskId)
if err != nil {
Expand All @@ -210,29 +120,13 @@ func (sddcManagerClient *SddcManagerClient) getTask(ctx context.Context, taskId
res, err := apiClient.GetTaskWithResponse(ctx, taskId)
task, vcfErr := GetResponseAs[vcf.Task](res)
if err != nil || vcfErr != nil {
// retry the task up to maxGetTaskRetries
if sddcManagerClient.getTaskRetries < maxGetTaskRetries {
sddcManagerClient.getTaskRetries++
return sddcManagerClient.getTask(ctx, taskId)
}
log.Println("error = ", err)
return nil, err
}
// reset the counter
sddcManagerClient.getTaskRetries = 0

return task, nil
}

// retryTask issues a retry request for the task with the given ID through the
// API client. The response is discarded; only the request error, if any, is
// returned.
func (sddcManagerClient *SddcManagerClient) retryTask(ctx context.Context, taskId string) error {
	_, retryErr := sddcManagerClient.ApiClient.RetryTaskWithResponse(ctx, taskId)
	return retryErr
}

type Response interface {
GetBody() []byte
StatusCode() int
Expand Down
6 changes: 1 addition & 5 deletions internal/certificates/certificate_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ func GenerateCertificateForResource(ctx context.Context, client *api_client.Sddc
api_client.LogError(vcfErr)
return errors.New(*vcfErr.Message)
}
err = client.WaitForTaskComplete(ctx, *task.Id, true)
if err != nil {
return err
}
return nil
return api_client.NewTaskTracker(ctx, client.ApiClient, *task.Id).WaitForTask()
}

func ReadCertificate(ctx context.Context, client *vcf.ClientWithResponses,
Expand Down
6 changes: 1 addition & 5 deletions internal/credentials/credential_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,7 @@ func executeCredentialsUpdate(ctx context.Context, updateSpec *vcf.CredentialsUp
api_client.LogError(vcfErr)
return errors.New(*vcfErr.Message)
}
if err := sddcClient.WaitForTask(ctx, *task.Id); err != nil {
return err
}

return nil
return api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask()
}

func CreatePasswordChangeID(data *schema.ResourceData, operation string) (string, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/provider/resource_ceip.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func resourceCeipUpdate(ctx context.Context, d *schema.ResourceData, meta interf
return diag.FromErr(errors.New(*vcfErr.Message))
}

if vcfClient.WaitForTask(ctx, *task.Id) != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}

Expand Down Expand Up @@ -135,7 +135,7 @@ func resourceCeipDelete(ctx context.Context, d *schema.ResourceData, meta interf
return diag.FromErr(errors.New(*vcfErr.Message))
}

if vcfClient.WaitForTask(ctx, *task.Id) != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}

Expand Down
7 changes: 2 additions & 5 deletions internal/provider/resource_certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func resourceResourceCertificateCreate(ctx context.Context, data *schema.Resourc
}},
}

var taskId string
res, err := apiClient.ReplaceCertificatesWithResponse(ctx, domainID, certificateOperationSpec)
if err != nil {
return diag.FromErr(err)
Expand All @@ -95,12 +94,10 @@ func resourceResourceCertificateCreate(ctx context.Context, data *schema.Resourc
return diag.FromErr(errors.New(*vcfErr.Message))
}

taskId = *task.Id
err = vcfClient.WaitForTaskComplete(ctx, taskId, true)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}
data.SetId("cert:" + domainID + ":" + resourceType + ":" + taskId)
data.SetId("cert:" + domainID + ":" + resourceType + ":" + *task.Id)

return resourceResourceCertificateRead(ctx, data, meta)
}
Expand Down
12 changes: 4 additions & 8 deletions internal/provider/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,10 @@ func createCluster(ctx context.Context, domainId string, clusterSpec vcf.Cluster
api_client.LogError(vcfErr)
return "", diag.FromErr(errors.New(*vcfErr.Message))
}
taskId := *task.Id
err = vcfClient.WaitForTaskComplete(ctx, taskId, true)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return "", diag.FromErr(err)
}
clusterId, err := vcfClient.GetResourceIdAssociatedWithTask(ctx, taskId, "Cluster")
clusterId, err := vcfClient.GetResourceIdAssociatedWithTask(ctx, *task.Id, "Cluster")
if err != nil {
return "", diag.FromErr(err)
}
Expand All @@ -366,8 +364,7 @@ func updateCluster(ctx context.Context, clusterId string, clusterUpdateSpec vcf.
return diag.FromErr(errors.New(*vcfErr.Message))
}

err = vcfClient.WaitForTaskComplete(ctx, *task.Id, false)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}
return nil
Expand Down Expand Up @@ -400,8 +397,7 @@ func deleteCluster(ctx context.Context, clusterId string, vcfClient *api_client.
return diag.FromErr(errors.New(*vcfErr.Message))
}

err = vcfClient.WaitForTaskComplete(ctx, *task.Id, true)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/provider/resource_cluster_personality.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func resourceClusterPersonalityCreate(ctx context.Context, data *schema.Resource
return diag.FromErr(errors.New(*vcfErr.Message))
}

if err := meta.(*api_client.SddcManagerClient).WaitForTaskComplete(ctx, *task.Id, false); err != nil {
if err = api_client.NewTaskTracker(ctx, client, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}

Expand Down
3 changes: 1 addition & 2 deletions internal/provider/resource_csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ func resourceCsrCreate(ctx context.Context, data *schema.ResourceData, meta inte
api_client.LogError(vcfErr)
return diag.FromErr(errors.New(*vcfErr.Message))
}
err = vcfClient.WaitForTaskComplete(ctx, *task.Id, true)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}
data.SetId(fmt.Sprintf("csr:%s:%s:%s:%s", domainId, resourceType, resourceFqdn, *task.Id))
Expand Down
12 changes: 4 additions & 8 deletions internal/provider/resource_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,10 @@ func resourceDomainCreate(ctx context.Context, data *schema.ResourceData, meta i
api_client.LogError(vcfErr)
return diag.FromErr(errors.New(*vcfErr.Message))
}
taskId := *task.Id
err = vcfClient.WaitForTaskComplete(ctx, taskId, true)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}
domainId, err := vcfClient.GetResourceIdAssociatedWithTask(ctx, taskId, "Domain")
domainId, err := vcfClient.GetResourceIdAssociatedWithTask(ctx, *task.Id, "Domain")
if err != nil {
return diag.FromErr(err)
}
Expand Down Expand Up @@ -197,8 +195,7 @@ func resourceDomainUpdate(ctx context.Context, data *schema.ResourceData, meta i
return diag.FromErr(errors.New(*vcfErr.Message))
}

err = vcfClient.WaitForTaskComplete(ctx, *task.Id, false)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}
}
Expand Down Expand Up @@ -313,8 +310,7 @@ func resourceDomainDelete(ctx context.Context, data *schema.ResourceData, meta i
api_client.LogError(vcfErr)
return diag.FromErr(errors.New(*vcfErr.Message))
}
err = vcfClient.WaitForTaskComplete(ctx, *task.Id, true)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}

Expand Down
6 changes: 2 additions & 4 deletions internal/provider/resource_edge_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ func resourceNsxEdgeClusterCreate(ctx context.Context, data *schema.ResourceData
}

tflog.Info(ctx, "Edge cluster creation has started.")
err = meta.(*api_client.SddcManagerClient).WaitForTaskComplete(ctx, *task.Id, false)
if err != nil {
if err = api_client.NewTaskTracker(ctx, client, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}

Expand Down Expand Up @@ -283,8 +282,7 @@ func resourceNsxEdgeClusterUpdate(ctx context.Context, data *schema.ResourceData
return diag.FromErr(errors.New(*vcfErr.Message))
}

err = meta.(*api_client.SddcManagerClient).WaitForTaskComplete(ctx, *task.Id, false)
if err != nil {
if err = api_client.NewTaskTracker(ctx, client, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}
}
Expand Down
3 changes: 1 addition & 2 deletions internal/provider/resource_external_certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ func resourceResourceExternalCertificateCreate(ctx context.Context, data *schema
api_client.LogError(vcfErr)
return diag.FromErr(errors.New(*vcfErr.Message))
}
err = vcfClient.WaitForTaskComplete(ctx, *task.Id, true)
if err != nil {
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}
data.SetId("ext_cert:" + domainID + ":" + resourceType + ":" + *task.Id)
Expand Down
8 changes: 2 additions & 6 deletions internal/provider/resource_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,7 @@ func resourceHostCreate(ctx context.Context, d *schema.ResourceData, meta interf
tflog.Info(ctx, fmt.Sprintf("%s commissionSpec commission initiated. waiting for task id = %s",
commissionSpec.Fqdn, *task.Id))

err = vcfClient.WaitForTaskComplete(ctx, *task.Id, false)
if err != nil {
tflog.Error(ctx, err.Error())
if err = api_client.NewTaskTrackerWithCustomPollingInterval(ctx, apiClient, *task.Id, time.Second*5).WaitForTask(); err != nil {
return diag.FromErr(err)
}
hostId, err := vcfClient.GetResourceIdAssociatedWithTask(ctx, *task.Id, "Esxi")
Expand Down Expand Up @@ -219,9 +217,7 @@ func resourceHostDelete(ctx context.Context, d *schema.ResourceData, meta interf

log.Printf("%s %s: Decommission task initiated. Task id %s",
d.Get("fqdn").(string), d.Id(), *task.Id)
err = vcfClient.WaitForTaskComplete(ctx, *task.Id, false)
if err != nil {
tflog.Error(ctx, err.Error())
if err = api_client.NewTaskTracker(ctx, apiClient, *task.Id).WaitForTask(); err != nil {
return diag.FromErr(err)
}

Expand Down

0 comments on commit 8b8694e

Please sign in to comment.