From af50770570f5670ad515c0916a97ea33b7c7f33f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Cie=C5=9Blak?= Date: Thu, 14 Nov 2024 14:00:49 +0100 Subject: [PATCH] feat: Task resource v1 readiness part 2 (#3170) - [x] Apply comments from https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3086 - [x] Check if more helper functions from user resource could be used in task resource. - [ ] Apply comments from https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3113 - [x] Cron https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3113#discussion_r1789842978 - [ ] Resource logic https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3113#discussion_r1789869045 - [ ] Refactor SDK suspend root logic for tasks (and overall suspend logic in SDK/resource) https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3113#discussion_r1789872814 - [ ] Move some of the logic to SDK (if possible) - [ ] https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3170#discussion_r1838382878 - [ ] Refactor task resuming in task resource (most likely with the use of defer) because currently, there may be cases that error can cause tasks to be not resumed. - [ ] Tests - [ ] External changes - https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3113#discussion_r1789903936 - [ ] Add more complicated DAG structures to show the resource can handle more complex structures - [ ] Calling (`as` field) - https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3113#discussion_r1789900590 - [ ] For showing how the DAG of tasks could be owned by a role other than the one created with Terraform (also with less privileges, only to run the task). - [ ] Check in one acceptance test why finalizer task in show_output is not set (is that Snowflake or mapping error). - [ ] Data source - [ ] Examples, documentation, and migration guide - [ ] Keep manually changed files after regeneration https://github.com/Snowflake-Labs/terraform-provider-snowflake/pull/3113#discussion_r1802561725 - [ ] Make config without $$ escapes needed - [ ] Support session paramters - [ ] Analyze non-deterministic test cases - [ ] Check test tasks_gen_integration_test.go:937 (and see why it's non deterministic). - [ ] Re-generate and list all the issues with asserts and models --- MIGRATION_GUIDE.md | 33 + docs/resources/task.md | 11 +- .../assert/objectassert/task_snowflake_ext.go | 36 +- .../resourceassert/task_resource_ext.go | 19 +- .../task_show_output_ext.go | 15 + .../bettertestspoc/config/config.go | 13 + .../config/model/task_model_ext.go | 14 + .../config/model/task_model_gen.go | 5 +- pkg/datasources/tasks_acceptance_test.go | 4 +- pkg/resources/resource_helpers_create.go | 11 + pkg/resources/resource_helpers_read.go | 14 + pkg/resources/task.go | 215 +++-- pkg/resources/task_acceptance_test.go | 769 +++++++++++++----- .../testdata/TestAcc_Task/basic/test.tf | 82 ++ .../testdata/TestAcc_Task/basic/variables.tf | 341 ++++++++ .../TestAcc_Task/with_task_dependency/test.tf | 49 ++ .../with_task_dependency/variables.tf | 18 + pkg/sdk/grants_impl.go | 2 +- pkg/sdk/tasks_gen.go | 4 + pkg/sdk/tasks_impl_gen.go | 2 +- pkg/sdk/testint/tasks_gen_integration_test.go | 8 +- 21 files changed, 1347 insertions(+), 318 deletions(-) create mode 100644 pkg/resources/testdata/TestAcc_Task/basic/test.tf create mode 100644 pkg/resources/testdata/TestAcc_Task/basic/variables.tf create mode 100644 pkg/resources/testdata/TestAcc_Task/with_task_dependency/test.tf create mode 100644 pkg/resources/testdata/TestAcc_Task/with_task_dependency/variables.tf diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md index 9fe0fec214..085302fc64 100644 --- a/MIGRATION_GUIDE.md +++ b/MIGRATION_GUIDE.md @@ -7,6 +7,39 @@ across different versions. > [!TIP] > We highly recommend upgrading the versions one by one instead of bulk upgrades. +## v0.98.0 ➞ v0.99.0 + +### snowflake_task resource changes +Changes: +- `enabled` field changed to `started` and type changed to string with only boolean values available (see ["empty" values](./v1-preparations/CHANGES_BEFORE_V1.md#empty-values)) +- `shedule` field changed from single value to nested object that allows for specifying either minutes or cron + +Before: +```terraform +resource "snowflake_task" "example" { + # ... + schedule = "5 MINUTES" + # or + schedule = "USING SCHEDULE * * * * * UTC" + # ... +} +``` +After: +```terraform +resource "snowflake_task" "example" { + # ... + schedule { + minutes = 5 + # or + using_cron = "* * * * * UTC" + } + # ... +} +``` +- All task parameters defined in [the Snowflake documentation](https://docs.snowflake.com/en/sql-reference/parameters) added into the top-level schema and removed `session_paramters` map. +- `show_output` and `paramters` fields added for holding SHOW and SHOW PARAMETERS output (see [raw Snowflake output](./v1-preparations/CHANGES_BEFORE_V1.md#raw-snowflake-output)). +- Added support for finalizer tasks with `finalize` field. It conflicts with `after` and `schedule` (see [finalizer tasks](https://docs.snowflake.com/en/user-guide/tasks-graphs#release-and-cleanup-of-task-graphs)). + ## v0.97.0 ➞ v0.98.0 ### snowflake_streams data source changes diff --git a/docs/resources/task.md b/docs/resources/task.md index c7d8965aa3..4405b10da3 100644 --- a/docs/resources/task.md +++ b/docs/resources/task.md @@ -121,7 +121,7 @@ resource "snowflake_task" "test_task" { - `quoted_identifiers_ignore_case` (Boolean) Specifies whether letters in double-quoted object identifiers are stored and resolved as uppercase letters. By default, Snowflake preserves the case of alphabetic characters when storing and resolving double-quoted identifiers (see [Identifier resolution](https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html#label-identifier-casing)). You can use this parameter in situations in which [third-party applications always use double quotes around identifiers](https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html#label-identifier-casing-parameter). For more information, check [QUOTED_IDENTIFIERS_IGNORE_CASE docs](https://docs.snowflake.com/en/sql-reference/parameters#quoted-identifiers-ignore-case). - `rows_per_resultset` (Number) Specifies the maximum number of rows returned in a result set. A value of 0 specifies no maximum. For more information, check [ROWS_PER_RESULTSET docs](https://docs.snowflake.com/en/sql-reference/parameters#rows-per-resultset). - `s3_stage_vpce_dns_name` (String) Specifies the DNS name of an Amazon S3 interface endpoint. Requests sent to the internal stage of an account via [AWS PrivateLink for Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/privatelink-interface-endpoints.html) use this endpoint to connect. For more information, see [Accessing Internal stages with dedicated interface endpoints](https://docs.snowflake.com/en/user-guide/private-internal-stages-aws.html#label-aws-privatelink-internal-stage-network-isolation). For more information, check [S3_STAGE_VPCE_DNS_NAME docs](https://docs.snowflake.com/en/sql-reference/parameters#s3-stage-vpce-dns-name). -- `schedule` (String) The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflicts with finalize and after) +- `schedule` (Block List, Max: 1) The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflicts with finalize and after; when set, one of the sub-fields `minutes` or `using_cron` should be set) (see [below for nested schema](#nestedblock--schedule)) - `search_path` (String) Specifies the path to search to resolve unqualified object names in queries. For more information, see [Name resolution in queries](https://docs.snowflake.com/en/sql-reference/name-resolution.html#label-object-name-resolution-search-path). Comma-separated list of identifiers. An identifier can be a fully or partially qualified schema name. For more information, check [SEARCH_PATH docs](https://docs.snowflake.com/en/sql-reference/parameters#search-path). - `statement_queued_timeout_in_seconds` (Number) Amount of time, in seconds, a SQL statement (query, DDL, DML, etc.) remains queued for a warehouse before it is canceled by the system. This parameter can be used in conjunction with the [MAX_CONCURRENCY_LEVEL](https://docs.snowflake.com/en/sql-reference/parameters#label-max-concurrency-level) parameter to ensure a warehouse is never backlogged. For more information, check [STATEMENT_QUEUED_TIMEOUT_IN_SECONDS docs](https://docs.snowflake.com/en/sql-reference/parameters#statement-queued-timeout-in-seconds). - `statement_timeout_in_seconds` (Number) Amount of time, in seconds, after which a running SQL statement (query, DDL, DML, etc.) is canceled by the system. For more information, check [STATEMENT_TIMEOUT_IN_SECONDS docs](https://docs.snowflake.com/en/sql-reference/parameters#statement-timeout-in-seconds). @@ -159,6 +159,15 @@ resource "snowflake_task" "test_task" { - `parameters` (List of Object) Outputs the result of `SHOW PARAMETERS IN TASK` for the given task. (see [below for nested schema](#nestedatt--parameters)) - `show_output` (List of Object) Outputs the result of `SHOW TASKS` for the given task. (see [below for nested schema](#nestedatt--show_output)) + +### Nested Schema for `schedule` + +Optional: + +- `minutes` (Number) Specifies an interval (in minutes) of wait time inserted between runs of the task. Accepts positive integers only. (conflicts with `using_cron`) +- `using_cron` (String) Specifies a cron expression and time zone for periodically running the task. Supports a subset of standard cron utility syntax. (conflicts with `minutes`) + + ### Nested Schema for `parameters` diff --git a/pkg/acceptance/bettertestspoc/assert/objectassert/task_snowflake_ext.go b/pkg/acceptance/bettertestspoc/assert/objectassert/task_snowflake_ext.go index e340d8c13a..696d990459 100644 --- a/pkg/acceptance/bettertestspoc/assert/objectassert/task_snowflake_ext.go +++ b/pkg/acceptance/bettertestspoc/assert/objectassert/task_snowflake_ext.go @@ -4,9 +4,10 @@ import ( "errors" "fmt" "reflect" - "slices" "testing" + "github.com/stretchr/testify/assert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" ) @@ -32,21 +33,13 @@ func (t *TaskAssert) HasNotEmptyId() *TaskAssert { return t } -func (t *TaskAssert) HasPredecessors(ids ...sdk.SchemaObjectIdentifier) *TaskAssert { +func (t *TaskAssert) HasPredecessorsInAnyOrder(ids ...sdk.SchemaObjectIdentifier) *TaskAssert { t.AddAssertion(func(t *testing.T, o *sdk.Task) error { t.Helper() - if len(o.Predecessors) != len(ids) { - return fmt.Errorf("expected %d (%v) predecessors, got %d (%v)", len(ids), ids, len(o.Predecessors), o.Predecessors) - } - var errs []error - for _, id := range ids { - if !slices.ContainsFunc(o.Predecessors, func(predecessorId sdk.SchemaObjectIdentifier) bool { - return predecessorId.FullyQualifiedName() == id.FullyQualifiedName() - }) { - errs = append(errs, fmt.Errorf("expected id: %s, to be in the list of predecessors: %v", id.FullyQualifiedName(), o.Predecessors)) - } + if !assert.ElementsMatch(t, ids, o.Predecessors) { + return fmt.Errorf("expected %v predecessors in task relations, got %v", ids, o.TaskRelations.Predecessors) } - return errors.Join(errs...) + return nil }) return t } @@ -54,20 +47,17 @@ func (t *TaskAssert) HasPredecessors(ids ...sdk.SchemaObjectIdentifier) *TaskAss func (t *TaskAssert) HasTaskRelations(expected sdk.TaskRelations) *TaskAssert { t.AddAssertion(func(t *testing.T, o *sdk.Task) error { t.Helper() - if len(o.TaskRelations.Predecessors) != len(expected.Predecessors) { - return fmt.Errorf("expected %d (%v) predecessors in task relations, got %d (%v)", len(expected.Predecessors), expected.Predecessors, len(o.TaskRelations.Predecessors), o.TaskRelations.Predecessors) - } - var errs []error - for _, id := range expected.Predecessors { - if !slices.ContainsFunc(o.TaskRelations.Predecessors, func(predecessorId sdk.SchemaObjectIdentifier) bool { - return predecessorId.FullyQualifiedName() == id.FullyQualifiedName() - }) { - errs = append(errs, fmt.Errorf("expected id: %s, to be in the list of predecessors in task relations: %v", id.FullyQualifiedName(), o.TaskRelations.Predecessors)) - } + errs := make([]error, 0) + if !assert.ElementsMatch(t, o.TaskRelations.Predecessors, expected.Predecessors) { + errs = append(errs, fmt.Errorf("expected %v predecessors in task relations, got %v", expected.Predecessors, o.TaskRelations.Predecessors)) } if !reflect.DeepEqual(expected.FinalizerTask, o.TaskRelations.FinalizerTask) { errs = append(errs, fmt.Errorf("expected finalizer task: %v; got: %v", expected.FinalizerTask, o.TaskRelations.FinalizerTask)) } + if expected.FinalizedRootTask != nil { + // This is not supported because we would have to traverse the task graph to find the root task. + errs = append(errs, fmt.Errorf("asserting FinalizedRootTask is not supported")) + } return errors.Join(errs...) }) return t diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go index 4dd36b2cf8..0226d1d862 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go @@ -9,10 +9,27 @@ import ( "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" ) -func (t *TaskResourceAssert) HasAfterIds(ids ...sdk.SchemaObjectIdentifier) *TaskResourceAssert { +func (t *TaskResourceAssert) HasAfterIdsInOrder(ids ...sdk.SchemaObjectIdentifier) *TaskResourceAssert { t.AddAssertion(assert.ValueSet("after.#", strconv.FormatInt(int64(len(ids)), 10))) for i, id := range ids { t.AddAssertion(assert.ValueSet(fmt.Sprintf("after.%d", i), id.FullyQualifiedName())) } return t } + +func (t *TaskResourceAssert) HasScheduleMinutes(minutes int) *TaskResourceAssert { + t.AddAssertion(assert.ValueSet("schedule.#", "1")) + t.AddAssertion(assert.ValueSet("schedule.0.minutes", strconv.Itoa(minutes))) + return t +} + +func (t *TaskResourceAssert) HasScheduleCron(cron string) *TaskResourceAssert { + t.AddAssertion(assert.ValueSet("schedule.#", "1")) + t.AddAssertion(assert.ValueSet("schedule.0.using_cron", cron)) + return t +} + +func (t *TaskResourceAssert) HasNoScheduleSet() *TaskResourceAssert { + t.AddAssertion(assert.ValueSet("schedule.#", "0")) + return t +} diff --git a/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert/task_show_output_ext.go b/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert/task_show_output_ext.go index c058df020e..3bfea52bd6 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert/task_show_output_ext.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert/task_show_output_ext.go @@ -50,3 +50,18 @@ func (t *TaskShowOutputAssert) HasTaskRelations(expected sdk.TaskRelations) *Tas } return t } + +func (t *TaskShowOutputAssert) HasNoSchedule() *TaskShowOutputAssert { + t.AddAssertion(assert.ResourceShowOutputValueSet("schedule", "")) + return t +} + +func (t *TaskShowOutputAssert) HasScheduleMinutes(minutes int) *TaskShowOutputAssert { + t.AddAssertion(assert.ResourceShowOutputValueSet("schedule", fmt.Sprintf("%d MINUTE", minutes))) + return t +} + +func (t *TaskShowOutputAssert) HasScheduleCron(cron string) *TaskShowOutputAssert { + t.AddAssertion(assert.ResourceShowOutputValueSet("schedule", fmt.Sprintf("USING CRON %s", cron))) + return t +} diff --git a/pkg/acceptance/bettertestspoc/config/config.go b/pkg/acceptance/bettertestspoc/config/config.go index a47a2a43d2..1e256a811c 100644 --- a/pkg/acceptance/bettertestspoc/config/config.go +++ b/pkg/acceptance/bettertestspoc/config/config.go @@ -127,6 +127,19 @@ func ConfigVariablesFromModel(t *testing.T, model ResourceModel) tfconfig.Variab return variables } +// ConfigVariablesFromModels can be used to create a list of objects that are referring to the same resource model. +// It's useful when there's a need to create associations between objects of the same type in Snowflake. +func ConfigVariablesFromModels(t *testing.T, variableName string, models ...ResourceModel) tfconfig.Variables { + t.Helper() + allVariables := make([]tfconfig.Variable, 0) + for _, model := range models { + allVariables = append(allVariables, tfconfig.ObjectVariable(ConfigVariablesFromModel(t, model))) + } + return tfconfig.Variables{ + variableName: tfconfig.ListVariable(allVariables...), + } +} + type nullVariable struct{} // MarshalJSON returns the JSON encoding of nullVariable. diff --git a/pkg/acceptance/bettertestspoc/config/model/task_model_ext.go b/pkg/acceptance/bettertestspoc/config/model/task_model_ext.go index 4968d42ed0..a61ea00f0c 100644 --- a/pkg/acceptance/bettertestspoc/config/model/task_model_ext.go +++ b/pkg/acceptance/bettertestspoc/config/model/task_model_ext.go @@ -71,3 +71,17 @@ func (t *TaskModel) WithUserTaskManagedInitialWarehouseSizeEnum(warehouseSize sd t.UserTaskManagedInitialWarehouseSize = tfconfig.StringVariable(string(warehouseSize)) return t } + +func (t *TaskModel) WithScheduleMinutes(minutes int) *TaskModel { + t.Schedule = tfconfig.MapVariable(map[string]tfconfig.Variable{ + "minutes": tfconfig.IntegerVariable(minutes), + }) + return t +} + +func (t *TaskModel) WithScheduleCron(cron string) *TaskModel { + t.Schedule = tfconfig.MapVariable(map[string]tfconfig.Variable{ + "cron": tfconfig.StringVariable(cron), + }) + return t +} diff --git a/pkg/acceptance/bettertestspoc/config/model/task_model_gen.go b/pkg/acceptance/bettertestspoc/config/model/task_model_gen.go index b6f96f259a..306c525fad 100644 --- a/pkg/acceptance/bettertestspoc/config/model/task_model_gen.go +++ b/pkg/acceptance/bettertestspoc/config/model/task_model_gen.go @@ -325,10 +325,7 @@ func (t *TaskModel) WithS3StageVpceDnsName(s3StageVpceDnsName string) *TaskModel return t } -func (t *TaskModel) WithSchedule(schedule string) *TaskModel { - t.Schedule = tfconfig.StringVariable(schedule) - return t -} +// schedule attribute type is not yet supported, so WithSchedule can't be generated func (t *TaskModel) WithSchema(schema string) *TaskModel { t.Schema = tfconfig.StringVariable(schema) diff --git a/pkg/datasources/tasks_acceptance_test.go b/pkg/datasources/tasks_acceptance_test.go index 79c6069365..de1a55755d 100644 --- a/pkg/datasources/tasks_acceptance_test.go +++ b/pkg/datasources/tasks_acceptance_test.go @@ -60,7 +60,9 @@ func tasks(databaseName string, schemaName string, taskName string) string { warehouse = snowflake_warehouse.test.name sql_statement = "SHOW FUNCTIONS" started = true - schedule = "15 MINUTES" + schedule { + minutes = 15 + } } data snowflake_tasks "t" { diff --git a/pkg/resources/resource_helpers_create.go b/pkg/resources/resource_helpers_create.go index 5ca92130bb..837fada163 100644 --- a/pkg/resources/resource_helpers_create.go +++ b/pkg/resources/resource_helpers_create.go @@ -62,6 +62,17 @@ func attributeDirectValueCreate[T any](d *schema.ResourceData, key string, creat return nil } +func attributeMappedValueCreate[T any](d *schema.ResourceData, key string, createField **T, mapper func(value any) (*T, error)) error { + if v, ok := d.GetOk(key); ok { + value, err := mapper(v) + if err != nil { + return err + } + *createField = value + } + return nil +} + func copyGrantsAttributeCreate(d *schema.ResourceData, isOrReplace bool, orReplaceField, copyGrantsField **bool) error { if isOrReplace { *orReplaceField = sdk.Bool(true) diff --git a/pkg/resources/resource_helpers_read.go b/pkg/resources/resource_helpers_read.go index b8f94a1e63..b3dcfcebf1 100644 --- a/pkg/resources/resource_helpers_read.go +++ b/pkg/resources/resource_helpers_read.go @@ -49,3 +49,17 @@ func setBooleanStringFromBoolProperty(d *schema.ResourceData, key string, proper } return nil } + +func attributeMappedValueReadOrDefault[T, R any](d *schema.ResourceData, key string, value *T, mapper func(*T) (R, error), defaultValue *R) error { + if value != nil { + mappedValue, err := mapper(value) + if err != nil { + return err + } + return d.Set(key, mappedValue) + } + if defaultValue != nil { + return d.Set(key, *defaultValue) + } + return d.Set(key, nil) +} diff --git a/pkg/resources/task.go b/pkg/resources/task.go index 498d1a38ca..74370f13cd 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -6,9 +6,12 @@ import ( "fmt" "log" "slices" + "strconv" "strings" "time" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/collections" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/logging" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" @@ -65,11 +68,29 @@ var taskSchema = map[string]*schema.Schema{ ConflictsWith: []string{"user_task_managed_initial_warehouse_size"}, }, "schedule": { - Type: schema.TypeString, - Optional: true, - DiffSuppressFunc: IgnoreChangeToCurrentSnowflakeValueInShow("schedule"), - Description: "The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflicts with finalize and after)", - ConflictsWith: []string{"finalize", "after"}, + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Description: "The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflicts with finalize and after; when set, one of the sub-fields `minutes` or `using_cron` should be set)", + ConflictsWith: []string{"finalize", "after"}, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "minutes": { + Type: schema.TypeInt, + Optional: true, + Description: "Specifies an interval (in minutes) of wait time inserted between runs of the task. Accepts positive integers only. (conflicts with `using_cron`)", + ValidateDiagFunc: validation.ToDiagFunc(validation.IntAtLeast(1)), + ExactlyOneOf: []string{"schedule.0.minutes", "schedule.0.using_cron"}, + }, + "using_cron": { + Type: schema.TypeString, + Optional: true, + Description: "Specifies a cron expression and time zone for periodically running the task. Supports a subset of standard cron utility syntax. (conflicts with `minutes`)", + DiffSuppressFunc: ignoreCaseSuppressFunc, + ExactlyOneOf: []string{"schedule.0.minutes", "schedule.0.using_cron"}, + }, + }, + }, }, "config": { Type: schema.TypeString, @@ -204,51 +225,44 @@ func ImportTask(ctx context.Context, d *schema.ResourceData, meta any) ([]*schem return []*schema.ResourceData{d}, nil } -func CreateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { +func CreateTask(ctx context.Context, d *schema.ResourceData, meta any) (diags diag.Diagnostics) { client := meta.(*provider.Context).Client databaseName := d.Get("database").(string) schemaName := d.Get("schema").(string) name := d.Get("name").(string) id := sdk.NewSchemaObjectIdentifier(databaseName, schemaName, name) - req := sdk.NewCreateTaskRequest(id, d.Get("sql_statement").(string)) + req := sdk.NewCreateTaskRequest(id, d.Get("sql_statement").(string)) tasksToResume := make([]sdk.SchemaObjectIdentifier, 0) - if v, ok := d.GetOk("warehouse"); ok { - warehouseId, err := sdk.ParseAccountObjectIdentifier(v.(string)) - if err != nil { - return diag.FromErr(err) - } - req.WithWarehouse(*sdk.NewCreateTaskWarehouseRequest().WithWarehouse(warehouseId)) - } - - if v, ok := d.GetOk("schedule"); ok { - req.WithSchedule(v.(string)) - } - - if v, ok := d.GetOk("config"); ok { - req.WithConfig(v.(string)) - } - - if v := d.Get("allow_overlapping_execution").(string); v != BooleanDefault { - parsedBool, err := booleanStringToBool(v) - if err != nil { - return diag.FromErr(err) - } - req.WithAllowOverlappingExecution(parsedBool) - } - - if v, ok := d.GetOk("error_integration"); ok { - notificationIntegrationId, err := sdk.ParseAccountObjectIdentifier(v.(string)) - if err != nil { - return diag.FromErr(err) - } - req.WithErrorIntegration(notificationIntegrationId) - } - - if v, ok := d.GetOk("comment"); ok { - req.WithComment(v.(string)) + if errs := errors.Join( + attributeMappedValueCreate(d, "warehouse", &req.Warehouse, func(v any) (*sdk.CreateTaskWarehouseRequest, error) { + warehouseId, err := sdk.ParseAccountObjectIdentifier(v.(string)) + if err != nil { + return nil, err + } + return sdk.NewCreateTaskWarehouseRequest().WithWarehouse(warehouseId), nil + }), + attributeMappedValueCreate(d, "schedule", &req.Schedule, func(v any) (*string, error) { + if len(v.([]any)) > 0 { + if minutes, ok := d.GetOk("schedule.0.minutes"); ok { + return sdk.String(fmt.Sprintf("%d MINUTE", minutes)), nil + } + if cron, ok := d.GetOk("schedule.0.using_cron"); ok { + return sdk.String(fmt.Sprintf("USING CRON %s", cron)), nil + } + return nil, fmt.Errorf("when setting a schedule either minutes or using_cron field should be set") + } + return nil, nil + }), + stringAttributeCreate(d, "config", &req.Config), + booleanStringAttributeCreate(d, "allow_overlapping_execution", &req.AllowOverlappingExecution), + accountObjectIdentifierAttributeCreate(d, "error_integration", &req.ErrorIntegration), + stringAttributeCreate(d, "comment", &req.Comment), + stringAttributeCreate(d, "when", &req.When), + ); errs != nil { + return diag.FromErr(errs) } if v, ok := d.GetOk("finalize"); ok { @@ -262,7 +276,7 @@ func CreateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return diag.FromErr(err) } - if rootTask.State == sdk.TaskStateStarted { + if rootTask.IsStarted() { if err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(rootTaskId).WithSuspend(true)); err != nil { return diag.FromErr(sdk.JoinErrors(err)) } @@ -290,10 +304,6 @@ func CreateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag req.WithAfter(precedingTasks) } - if v, ok := d.GetOk("when"); ok { - req.WithWhen(v.(string)) - } - if parameterCreateDiags := handleTaskParametersCreate(d, req); len(parameterCreateDiags) > 0 { return parameterCreateDiags } @@ -318,9 +328,11 @@ func CreateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag // Else case not handled, because tasks are created as suspended (https://docs.snowflake.com/en/sql-reference/sql/create-task; "important" section) } - if err := client.Tasks.ResumeTasks(ctx, tasksToResume); err != nil { - log.Printf("[WARN] failed to resume tasks: %s", err) - } + defer func() { + if err := client.Tasks.ResumeTasks(ctx, tasksToResume); err != nil { + diags = append(diags, resumeTaskErrorDiag(id, "create", err)) + } + }() return ReadTask(false)(ctx, d, meta) } @@ -342,8 +354,7 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return diag.FromErr(sdk.JoinErrors(err)) } - if task.State == sdk.TaskStateStarted { - log.Printf("Suspending the task in if") + if task.IsStarted() { if err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(id).WithSuspend(true)); err != nil { return diag.FromErr(sdk.JoinErrors(err)) } @@ -354,7 +365,6 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag err = errors.Join( accountObjectIdentifierAttributeUpdate(d, "warehouse", &set.Warehouse, &unset.Warehouse), - stringAttributeUpdate(d, "schedule", &set.Schedule, &unset.Schedule), stringAttributeUpdate(d, "config", &set.Config, &unset.Config), booleanStringAttributeUpdate(d, "allow_overlapping_execution", &set.AllowOverlappingExecution, &unset.AllowOverlappingExecution), accountObjectIdentifierAttributeUpdate(d, "error_integration", &set.ErrorIntegration, &unset.ErrorIntegration), @@ -364,6 +374,21 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return diag.FromErr(err) } + if d.HasChange("schedule") { + _, newSchedule := d.GetChange("schedule") + + if newSchedule != nil && len(newSchedule.([]any)) == 1 { + if _, newMinutes := d.GetChange("schedule.0.minutes"); newMinutes.(int) > 0 { + set.Schedule = sdk.String(fmt.Sprintf("%d MINUTE", newMinutes.(int))) + } + if _, newCron := d.GetChange("schedule.0.using_cron"); newCron.(string) != "" { + set.Schedule = sdk.String(fmt.Sprintf("USING CRON %s", newCron.(string))) + } + } else { + unset.Schedule = sdk.Bool(true) + } + } + if updateDiags := handleTaskParametersUpdate(d, set, unset); len(updateDiags) > 0 { return updateDiags } @@ -404,7 +429,7 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return diag.FromErr(err) } - if rootTask.State == sdk.TaskStateStarted { + if rootTask.IsStarted() { if err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(rootTaskId).WithSuspend(true)); err != nil { return diag.FromErr(sdk.JoinErrors(err)) } @@ -414,7 +439,7 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return diag.FromErr(err) } - if rootTask.State == sdk.TaskStateStarted && !slices.ContainsFunc(tasksToResume, func(identifier sdk.SchemaObjectIdentifier) bool { + if rootTask.IsStarted() && !slices.ContainsFunc(tasksToResume, func(identifier sdk.SchemaObjectIdentifier) bool { return identifier.FullyQualifiedName() == rootTaskId.FullyQualifiedName() }) { tasksToResume = append(tasksToResume, rootTaskId) @@ -429,7 +454,7 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return diag.FromErr(err) } - if rootTask.State == sdk.TaskStateStarted { + if rootTask.IsStarted() { if err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(rootTask.ID()).WithSuspend(true)); err != nil { return diag.FromErr(sdk.JoinErrors(err)) } @@ -439,7 +464,7 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return diag.FromErr(err) } - if rootTask.State == sdk.TaskStateStarted && !slices.ContainsFunc(tasksToResume, func(identifier sdk.SchemaObjectIdentifier) bool { + if rootTask.IsStarted() && !slices.ContainsFunc(tasksToResume, func(identifier sdk.SchemaObjectIdentifier) bool { return identifier.FullyQualifiedName() == rootTask.ID().FullyQualifiedName() }) { tasksToResume = append(tasksToResume, rootTask.ID()) @@ -493,14 +518,15 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag } if d.Get("started").(bool) { - log.Printf("Resuming the task in handled update") if err := waitForTaskStart(ctx, client, id); err != nil { return diag.FromErr(fmt.Errorf("failed to resume task %s, err = %w", id.FullyQualifiedName(), err)) } } // We don't process the else case, because the task was already suspended at the beginning of the Update method. + tasksToResume = slices.DeleteFunc(tasksToResume, func(identifier sdk.SchemaObjectIdentifier) bool { + return identifier.FullyQualifiedName() == id.FullyQualifiedName() + }) - log.Printf("Resuming the root tasks: %v", collections.Map(tasksToResume, sdk.SchemaObjectIdentifier.Name)) if err := client.Tasks.ResumeTasks(ctx, tasksToResume); err != nil { log.Printf("[WARN] failed to resume tasks: %s", err) } @@ -550,32 +576,53 @@ func ReadTask(withExternalChangesMarking bool) schema.ReadContextFunc { } } - warehouseId := "" - if task.Warehouse != nil { - warehouseId = task.Warehouse.Name() - } - - errorIntegrationId := "" - if task.ErrorIntegration != nil { - errorIntegrationId = task.ErrorIntegration.Name() - } - - finalizedRootTaskId := "" - if task.TaskRelations.FinalizedRootTask != nil { - finalizedRootTaskId = task.TaskRelations.FinalizedRootTask.FullyQualifiedName() - } - if errs := errors.Join( - d.Set("started", task.State == sdk.TaskStateStarted), - d.Set("warehouse", warehouseId), - d.Set("schedule", task.Schedule), + attributeMappedValueReadOrDefault(d, "finalize", task.TaskRelations.FinalizedRootTask, func(finalizedRootTask *sdk.SchemaObjectIdentifier) (string, error) { + return finalizedRootTask.FullyQualifiedName(), nil + }, nil), + attributeMappedValueReadOrDefault(d, "error_integration", task.ErrorIntegration, func(errorIntegration *sdk.AccountObjectIdentifier) (string, error) { + return errorIntegration.Name(), nil + }, nil), + attributeMappedValueReadOrDefault(d, "warehouse", task.Warehouse, func(warehouse *sdk.AccountObjectIdentifier) (string, error) { + return warehouse.Name(), nil + }, nil), + func() error { + upperSchedule := strings.ToUpper(task.Schedule) + switch { + case strings.Contains(upperSchedule, "USING CRON"): + // We have to do it this was because we want to get rid of the prefix and leave the casing as is (mostly because timezones like America/Los_Angeles are case-sensitive). + // That why the prefix trimming has to be done by slicing rather than using strings.TrimPrefix. + cron := task.Schedule[len("USING CRON "):] + if err := d.Set("schedule", []any{map[string]any{ + "using_cron": cron, + }}); err != nil { + return err + } + case strings.Contains(upperSchedule, "MINUTE"): + minuteParts := strings.Split(upperSchedule, " ") + minutes, err := strconv.Atoi(minuteParts[0]) + if err != nil { + return err + } + + if err := d.Set("schedule", []any{map[string]any{ + "minutes": minutes, + }}); err != nil { + return err + } + default: + if err := d.Set("schedule", nil); err != nil { + return err + } + } + return nil + }(), + d.Set("started", task.IsStarted()), d.Set("when", task.Condition), d.Set("config", task.Config), - d.Set("error_integration", errorIntegrationId), d.Set("comment", task.Comment), d.Set("sql_statement", task.Definition), d.Set("after", collections.Map(task.Predecessors, sdk.SchemaObjectIdentifier.FullyQualifiedName)), - d.Set("finalize", finalizedRootTaskId), handleTaskParameterRead(d, taskParameters), d.Set(FullyQualifiedNameAttributeName, id.FullyQualifiedName()), d.Set(ShowOutputAttributeName, []map[string]any{schemas.TaskToSchema(task)}), @@ -588,7 +635,7 @@ func ReadTask(withExternalChangesMarking bool) schema.ReadContextFunc { } } -func DeleteTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { +func DeleteTask(ctx context.Context, d *schema.ResourceData, meta any) (diags diag.Diagnostics) { client := meta.(*provider.Context).Client id, err := sdk.ParseSchemaObjectIdentifier(d.Id()) if err != nil { @@ -598,7 +645,7 @@ func DeleteTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag tasksToResume, err := client.Tasks.SuspendRootTasks(ctx, id, id) defer func() { if err := client.Tasks.ResumeTasks(ctx, tasksToResume); err != nil { - log.Printf("[WARN] failed to resume tasks: %s", err) + diags = append(diags, resumeTaskErrorDiag(id, "delete", err)) } }() if err != nil { @@ -614,6 +661,14 @@ func DeleteTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return nil } +func resumeTaskErrorDiag(id sdk.SchemaObjectIdentifier, operation string, originalErr error) diag.Diagnostic { + return diag.Diagnostic{ + Severity: diag.Warning, + Summary: fmt.Sprintf("Failed to resume tasks in %s operation (id=%s)", operation, id.FullyQualifiedName()), + Detail: fmt.Sprintf("Failed to resume some of the tasks with the following errors (tasks can be resumed by applying the same configuration again): %v", originalErr), + } +} + func waitForTaskStart(ctx context.Context, client *sdk.Client, id sdk.SchemaObjectIdentifier) error { err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(id).WithResume(true)) if err != nil { diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index d17a48b174..5484eefb73 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -1,35 +1,35 @@ package resources_test import ( + "bytes" "fmt" "regexp" "strings" "testing" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/objectparametersassert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/resourceparametersassert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/helpers/random" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" + configvariable "github.com/hashicorp/terraform-plugin-testing/config" "github.com/hashicorp/terraform-plugin-testing/plancheck" acc "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/objectparametersassert" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/resourceassert" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/resourceparametersassert" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/config" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/config/model" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/helpers/random" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/testenvs" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/helpers" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/provider/resources" r "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/resources" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" - configvariable "github.com/hashicorp/terraform-plugin-testing/config" "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/hashicorp/terraform-plugin-testing/tfversion" ) // TODO(SNOW-1348116 - next pr): More tests for complicated DAGs // TODO(SNOW-1348116 - next pr): Test for stored procedures passed to sql_statement (decide on name) -// TODO(SNOW-1348116 - next pr): Test with cron schedule // TODO(SNOW-1348116 - next pr): More test with external changes func TestAcc_Task_Basic(t *testing.T) { @@ -60,13 +60,13 @@ func TestAcc_Task_Basic(t *testing.T) { HasNameString(id.Name()). HasStartedString(r.BooleanFalse). HasWarehouseString(""). - HasScheduleString(""). + HasNoScheduleSet(). HasConfigString(""). HasAllowOverlappingExecutionString(r.BooleanDefault). HasErrorIntegrationString(""). HasCommentString(""). HasFinalizeString(""). - HasAfterIds(). + HasAfterIdsInOrder(). HasWhenString(""). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, configModel.ResourceReference()). @@ -78,7 +78,7 @@ func TestAcc_Task_Basic(t *testing.T) { HasOwner(currentRole.Name()). HasComment(""). HasWarehouse(sdk.NewAccountObjectIdentifier("")). - HasSchedule(""). + HasNoSchedule(). HasPredecessors(). HasState(sdk.TaskStateSuspended). HasDefinition(statement). @@ -106,7 +106,7 @@ func TestAcc_Task_Basic(t *testing.T) { HasNameString(id.Name()). HasStartedString(r.BooleanFalse). HasWarehouseString(""). - HasScheduleString(""). + HasNoScheduleSet(). HasConfigString(""). HasAllowOverlappingExecutionString(r.BooleanFalse). HasErrorIntegrationString(""). @@ -136,13 +136,12 @@ func TestAcc_Task_Complete(t *testing.T) { // We have to do three $ at the beginning because Terraform will remove one $. // It's because `${` is a special pattern, and it's escaped by `$${`. expectedTaskConfig := strings.ReplaceAll(taskConfig, "$", "") - taskConfigVariableValue := "$" + taskConfig comment := random.Comment() condition := `SYSTEM$STREAM_HAS_DATA('MYSTREAM')` configModel := model.TaskWithId("test", id, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule("10 MINUTES"). - WithConfigValue(configvariable.StringVariable(taskConfigVariableValue)). + WithScheduleMinutes(10). + WithConfigValue(configvariable.StringVariable(taskConfig)). WithAllowOverlappingExecution(r.BooleanTrue). WithErrorIntegration(errorNotificationIntegration.ID().Name()). WithComment(comment). @@ -157,7 +156,8 @@ func TestAcc_Task_Complete(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, configModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, configModel.ResourceReference()). HasFullyQualifiedNameString(id.FullyQualifiedName()). @@ -166,7 +166,7 @@ func TestAcc_Task_Complete(t *testing.T) { HasNameString(id.Name()). HasStartedString(r.BooleanTrue). HasWarehouseString(acc.TestClient().Ids.WarehouseId().Name()). - HasScheduleString("10 MINUTES"). + HasScheduleMinutes(10). HasConfigString(expectedTaskConfig). HasAllowOverlappingExecutionString(r.BooleanTrue). HasErrorIntegrationString(errorNotificationIntegration.ID().Name()). @@ -184,7 +184,7 @@ func TestAcc_Task_Complete(t *testing.T) { HasOwner(currentRole.Name()). HasComment(comment). HasWarehouse(acc.TestClient().Ids.WarehouseId()). - HasSchedule("10 MINUTES"). + HasScheduleMinutes(10). HasPredecessors(). HasState(sdk.TaskStateStarted). HasDefinition(statement). @@ -202,8 +202,10 @@ func TestAcc_Task_Complete(t *testing.T) { ), }, { - ResourceName: "snowflake_task.test", - ImportState: true, + ResourceName: "snowflake_task.test", + ImportState: true, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModel), ImportStateCheck: assert.AssertThatImport(t, resourceassert.ImportedTaskResource(t, helpers.EncodeResourceIdentifier(id)). HasFullyQualifiedNameString(id.FullyQualifiedName()). @@ -212,7 +214,7 @@ func TestAcc_Task_Complete(t *testing.T) { HasNameString(id.Name()). HasStartedString(r.BooleanTrue). HasWarehouseString(acc.TestClient().Ids.WarehouseId().Name()). - HasScheduleString("10 MINUTES"). + HasScheduleMinutes(10). HasConfigString(expectedTaskConfig). HasAllowOverlappingExecutionString(r.BooleanTrue). HasErrorIntegrationString(errorNotificationIntegration.ID().Name()). @@ -248,13 +250,12 @@ func TestAcc_Task_Updates(t *testing.T) { // We have to do three $ at the beginning because Terraform will remove one $. // It's because `${` is a special pattern, and it's escaped by `$${`. expectedTaskConfig := strings.ReplaceAll(taskConfig, "$", "") - taskConfigVariableValue := "$" + taskConfig comment := random.Comment() condition := `SYSTEM$STREAM_HAS_DATA('MYSTREAM')` completeConfigModel := model.TaskWithId("test", id, true, statement). WithWarehouse(warehouse.ID().Name()). - WithSchedule("5 MINUTES"). - WithConfigValue(configvariable.StringVariable(taskConfigVariableValue)). + WithScheduleMinutes(5). + WithConfigValue(configvariable.StringVariable(taskConfig)). WithAllowOverlappingExecution(r.BooleanTrue). WithErrorIntegration(errorNotificationIntegration.ID().Name()). WithComment(comment). @@ -278,13 +279,13 @@ func TestAcc_Task_Updates(t *testing.T) { HasNameString(id.Name()). HasStartedString(r.BooleanFalse). HasWarehouseString(""). - HasScheduleString(""). + HasNoScheduleSet(). HasConfigString(""). HasAllowOverlappingExecutionString(r.BooleanDefault). HasErrorIntegrationString(""). HasCommentString(""). HasFinalizeString(""). - HasAfterIds(). + HasAfterIdsInOrder(). HasWhenString(""). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). @@ -296,7 +297,7 @@ func TestAcc_Task_Updates(t *testing.T) { HasOwner(currentRole.Name()). HasComment(""). HasWarehouse(sdk.NewAccountObjectIdentifier("")). - HasSchedule(""). + HasNoSchedule(). HasPredecessors(). HasState(sdk.TaskStateSuspended). HasDefinition(statement). @@ -313,7 +314,8 @@ func TestAcc_Task_Updates(t *testing.T) { }, // Set { - Config: config.FromModel(t, completeConfigModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, completeConfigModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, completeConfigModel.ResourceReference()). HasFullyQualifiedNameString(id.FullyQualifiedName()). @@ -322,13 +324,13 @@ func TestAcc_Task_Updates(t *testing.T) { HasNameString(id.Name()). HasStartedString(r.BooleanTrue). HasWarehouseString(warehouse.ID().Name()). - HasScheduleString("5 MINUTES"). + HasScheduleMinutes(5). HasConfigString(expectedTaskConfig). HasAllowOverlappingExecutionString(r.BooleanTrue). HasErrorIntegrationString(errorNotificationIntegration.ID().Name()). HasCommentString(comment). HasFinalizeString(""). - HasAfterIds(). + HasAfterIdsInOrder(). HasWhenString(condition). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, completeConfigModel.ResourceReference()). @@ -340,7 +342,7 @@ func TestAcc_Task_Updates(t *testing.T) { HasOwner(currentRole.Name()). HasWarehouse(warehouse.ID()). HasComment(comment). - HasSchedule("5 MINUTES"). + HasScheduleMinutes(5). HasPredecessors(). HasState(sdk.TaskStateStarted). HasDefinition(statement). @@ -366,13 +368,13 @@ func TestAcc_Task_Updates(t *testing.T) { HasNameString(id.Name()). HasStartedString(r.BooleanFalse). HasWarehouseString(""). - HasScheduleString(""). + HasNoScheduleSet(). HasConfigString(""). HasAllowOverlappingExecutionString(r.BooleanDefault). HasErrorIntegrationString(""). HasCommentString(""). HasFinalizeString(""). - HasAfterIds(). + HasAfterIdsInOrder(). HasWhenString(""). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). @@ -384,7 +386,7 @@ func TestAcc_Task_Updates(t *testing.T) { HasOwner(currentRole.Name()). HasComment(""). HasWarehouse(sdk.NewAccountObjectIdentifier("")). - HasSchedule(""). + HasNoSchedule(). HasPredecessors(). HasState(sdk.TaskStateSuspended). HasDefinition(statement). @@ -403,6 +405,342 @@ func TestAcc_Task_Updates(t *testing.T) { }) } +func TestAcc_Task_CronAndMinutes(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + minutes := 5 + cron := "*/5 * * * * UTC" + configModelWithoutSchedule := model.TaskWithId("test", id, false, "SELECT 1") + configModelWithMinutes := model.TaskWithId("test", id, true, "SELECT 1").WithScheduleMinutes(minutes) + configModelWithCron := model.TaskWithId("test", id, true, "SELECT 1").WithScheduleCron(cron) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + // create with minutes + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithMinutes), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithMinutes.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanTrue). + HasScheduleMinutes(minutes). + HasSqlStatementString("SELECT 1"), + resourceshowoutputassert.TaskShowOutput(t, configModelWithMinutes.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasScheduleMinutes(minutes), + ), + }, + // Unset schedule (from minutes) + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithoutSchedule), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithoutSchedule.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanFalse). + HasNoScheduleSet(). + HasSqlStatementString("SELECT 1"), + resourceshowoutputassert.TaskShowOutput(t, configModelWithoutSchedule.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasNoSchedule(), + ), + }, + // Create with cron + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithCron), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithCron.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanTrue). + HasScheduleCron(cron). + HasSqlStatementString("SELECT 1"), + resourceshowoutputassert.TaskShowOutput(t, configModelWithCron.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasScheduleCron(cron), + ), + }, + // Change to minutes + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithMinutes), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithMinutes.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanTrue). + HasScheduleMinutes(minutes). + HasSqlStatementString("SELECT 1"), + resourceshowoutputassert.TaskShowOutput(t, configModelWithMinutes.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasScheduleMinutes(minutes), + ), + }, + // Change back to cron + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithCron), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithCron.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanTrue). + HasScheduleCron(cron). + HasSqlStatementString("SELECT 1"), + resourceshowoutputassert.TaskShowOutput(t, configModelWithCron.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasScheduleCron(cron), + ), + }, + // Unset schedule (from cron) + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithoutSchedule), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithoutSchedule.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanFalse). + HasNoScheduleSet(). + HasSqlStatementString("SELECT 1"), + resourceshowoutputassert.TaskShowOutput(t, configModelWithoutSchedule.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasNoSchedule(), + ), + }, + }, + }) +} + +func TestAcc_Task_CronAndMinutes_ExternalChanges(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + minutes := 5 + cron := "*/5 * * * * UTC" + configModelWithoutSchedule := model.TaskWithId("test", id, false, "SELECT 1") + configModelWithMinutes := model.TaskWithId("test", id, false, "SELECT 1").WithScheduleMinutes(minutes) + configModelWithCron := model.TaskWithId("test", id, false, "SELECT 1").WithScheduleCron(cron) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + // Create without a schedule + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithoutSchedule), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithoutSchedule.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasNoScheduleSet(), + resourceshowoutputassert.TaskShowOutput(t, configModelWithoutSchedule.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasNoSchedule(), + ), + }, + // External change - set minutes + { + PreConfig: func() { + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithSet(*sdk.NewTaskSetRequest().WithSchedule("5 MINUTES"))) + }, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithoutSchedule), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithoutSchedule.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasNoScheduleSet(), + resourceshowoutputassert.TaskShowOutput(t, configModelWithoutSchedule.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasNoSchedule(), + ), + }, + // External change - set cron + { + PreConfig: func() { + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithSet(*sdk.NewTaskSetRequest().WithSchedule(fmt.Sprintf("USING CRON %s", cron)))) + }, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithoutSchedule), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithoutSchedule.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasNoScheduleSet(), + resourceshowoutputassert.TaskShowOutput(t, configModelWithoutSchedule.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasNoSchedule(), + ), + }, + // Set minutes schedule + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithMinutes), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithMinutes.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasScheduleMinutes(minutes), + resourceshowoutputassert.TaskShowOutput(t, configModelWithMinutes.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasScheduleMinutes(minutes), + ), + }, + // External change - unset schedule + { + PreConfig: func() { + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithUnset(*sdk.NewTaskUnsetRequest().WithSchedule(true))) + }, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithMinutes), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithMinutes.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasScheduleMinutes(minutes), + resourceshowoutputassert.TaskShowOutput(t, configModelWithMinutes.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasScheduleMinutes(minutes), + ), + }, + // Set cron schedule + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithCron), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithCron.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasScheduleCron(cron), + resourceshowoutputassert.TaskShowOutput(t, configModelWithCron.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasScheduleCron(cron), + ), + }, + // External change - unset schedule + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithCron), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModelWithCron.ResourceReference()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasScheduleCron(cron), + resourceshowoutputassert.TaskShowOutput(t, configModelWithCron.ResourceReference()). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasScheduleCron(cron), + ), + }, + }, + }) +} + +func TestAcc_Task_ScheduleSchemaValidation(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + { + Config: taskConfigInvalidScheduleSetMultipleOrEmpty(id, true), + ExpectError: regexp.MustCompile("\"schedule.0.minutes\": only one of `schedule.0.minutes,schedule.0.using_cron`"), + }, + { + Config: taskConfigInvalidScheduleSetMultipleOrEmpty(id, false), + ExpectError: regexp.MustCompile("\"schedule.0.minutes\": one of `schedule.0.minutes,schedule.0.using_cron`"), + }, + }, + }) +} + +func taskConfigInvalidScheduleSetMultipleOrEmpty(id sdk.SchemaObjectIdentifier, setMultiple bool) string { + var scheduleString string + scheduleBuffer := new(bytes.Buffer) + scheduleBuffer.WriteString("schedule {\n") + if setMultiple { + scheduleBuffer.WriteString("minutes = 10\n") + scheduleBuffer.WriteString("using_cron = \"*/5 * * * * UTC\"\n") + } + scheduleBuffer.WriteString("}\n") + scheduleString = scheduleBuffer.String() + + return fmt.Sprintf(` +resource "snowflake_task" "test" { + database = "%[1]s" + schema = "%[2]s" + name = "%[3]s" + started = false + sql_statement = "SELECT 1" + + %[4]s +}`, id.DatabaseName(), id.SchemaName(), id.Name(), scheduleString) +} + func TestAcc_Task_AllParameters(t *testing.T) { _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) acc.TestAccPreCheck(t) @@ -410,9 +748,9 @@ func TestAcc_Task_AllParameters(t *testing.T) { id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" configModel := model.TaskWithId("test", id, true, statement). - WithSchedule("5 MINUTES") + WithScheduleMinutes(5) configModelWithAllParametersSet := model.TaskWithId("test", id, true, statement). - WithSchedule("5 MINUTES"). + WithScheduleMinutes(5). WithSuspendTaskAfterNumFailures(15). WithTaskAutoRetryAttempts(15). WithUserTaskManagedInitialWarehouseSizeEnum(sdk.WarehouseSizeXSmall). @@ -481,7 +819,8 @@ func TestAcc_Task_AllParameters(t *testing.T) { Steps: []resource.TestStep{ // create with default values for all the parameters { - Config: config.FromModel(t, configModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModel), Check: assert.AssertThat(t, objectparametersassert.TaskParameters(t, id). HasAllDefaults(). @@ -492,8 +831,10 @@ func TestAcc_Task_AllParameters(t *testing.T) { }, // import when no parameter set { - ResourceName: configModel.ResourceReference(), - ImportState: true, + ResourceName: configModel.ResourceReference(), + ImportState: true, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModel), ImportStateCheck: assert.AssertThatImport(t, resourceparametersassert.ImportedTaskResourceParameters(t, helpers.EncodeResourceIdentifier(id)). HasAllDefaults(), @@ -501,7 +842,8 @@ func TestAcc_Task_AllParameters(t *testing.T) { }, // set all parameters { - Config: config.FromModel(t, configModelWithAllParametersSet), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithAllParametersSet), Check: assert.AssertThat(t, objectparametersassert.TaskParameters(t, id). HasSuspendTaskAfterNumFailures(15). @@ -623,8 +965,10 @@ func TestAcc_Task_AllParameters(t *testing.T) { }, // import when all parameters set { - ResourceName: configModelWithAllParametersSet.ResourceReference(), - ImportState: true, + ResourceName: configModelWithAllParametersSet.ResourceReference(), + ImportState: true, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelWithAllParametersSet), ImportStateCheck: assert.AssertThatImport(t, resourceparametersassert.ImportedTaskResourceParameters(t, helpers.EncodeResourceIdentifier(id)). HasSuspendTaskAfterNumFailures(15). @@ -688,7 +1032,8 @@ func TestAcc_Task_AllParameters(t *testing.T) { }, // unset all the parameters { - Config: config.FromModel(t, configModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModel), Check: assert.AssertThat(t, objectparametersassert.TaskParameters(t, id). HasAllDefaults(). @@ -708,9 +1053,9 @@ func TestAcc_Task_Enabled(t *testing.T) { id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" configModelEnabled := model.TaskWithId("test", id, true, statement). - WithSchedule("5 MINUTES") + WithScheduleMinutes(5) configModelDisabled := model.TaskWithId("test", id, false, statement). - WithSchedule("5 MINUTES") + WithScheduleMinutes(5) resource.Test(t, resource.TestCase{ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, @@ -721,7 +1066,8 @@ func TestAcc_Task_Enabled(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, configModelDisabled), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelDisabled), Check: assert.AssertThat(t, resourceassert.TaskResource(t, configModelDisabled.ResourceReference()). HasStartedString(r.BooleanFalse), @@ -730,7 +1076,8 @@ func TestAcc_Task_Enabled(t *testing.T) { ), }, { - Config: config.FromModel(t, configModelEnabled), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelEnabled), Check: assert.AssertThat(t, resourceassert.TaskResource(t, configModelEnabled.ResourceReference()). HasStartedString(r.BooleanTrue), @@ -739,7 +1086,8 @@ func TestAcc_Task_Enabled(t *testing.T) { ), }, { - Config: config.FromModel(t, configModelDisabled), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModelDisabled), Check: assert.AssertThat(t, resourceassert.TaskResource(t, configModelDisabled.ResourceReference()). HasStartedString(r.BooleanFalse), @@ -751,7 +1099,6 @@ func TestAcc_Task_Enabled(t *testing.T) { }) } -// TODO(SNOW-1348116 - analyze in next pr): This test may also be not deterministic and sometimes it fail when resuming a task while other task is modifying DAG (removing after) func TestAcc_Task_ConvertStandaloneTaskToSubtask(t *testing.T) { _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) acc.TestAccPreCheck(t) @@ -759,25 +1106,24 @@ func TestAcc_Task_ConvertStandaloneTaskToSubtask(t *testing.T) { id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() id2 := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" - firstTaskStandaloneModel := model.TaskWithId("main_task", id, true, statement). - WithSchedule(schedule). + firstTaskStandaloneModel := model.TaskWithId("root", id, true, statement). + WithScheduleMinutes(5). WithSuspendTaskAfterNumFailures(1) - secondTaskStandaloneModel := model.TaskWithId("second_task", id2, true, statement). - WithSchedule(schedule) + secondTaskStandaloneModel := model.TaskWithId("child", id2, true, statement). + WithScheduleMinutes(5) - rootTaskModel := model.TaskWithId("main_task", id, true, statement). - WithSchedule(schedule). + rootTaskModel := model.TaskWithId("root", id, true, statement). + WithScheduleMinutes(5). WithSuspendTaskAfterNumFailures(2) - childTaskModel := model.TaskWithId("second_task", id2, true, statement). + childTaskModel := model.TaskWithId("child", id2, true, statement). WithAfterValue(configvariable.SetVariable(configvariable.StringVariable(id.FullyQualifiedName()))) childTaskModel.SetDependsOn(rootTaskModel.ResourceReference()) - firstTaskStandaloneModelDisabled := model.TaskWithId("main_task", id, false, statement). - WithSchedule(schedule) - secondTaskStandaloneModelDisabled := model.TaskWithId("second_task", id2, false, statement). - WithSchedule(schedule) + firstTaskStandaloneModelDisabled := model.TaskWithId("root", id, false, statement). + WithScheduleMinutes(5) + secondTaskStandaloneModelDisabled := model.TaskWithId("child", id2, false, statement). + WithScheduleMinutes(5) secondTaskStandaloneModelDisabled.SetDependsOn(firstTaskStandaloneModelDisabled.ResourceReference()) resource.Test(t, resource.TestCase{ @@ -789,36 +1135,38 @@ func TestAcc_Task_ConvertStandaloneTaskToSubtask(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, firstTaskStandaloneModel) + config.FromModel(t, secondTaskStandaloneModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", firstTaskStandaloneModel, secondTaskStandaloneModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, firstTaskStandaloneModel.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(5). HasStartedString(r.BooleanTrue). HasSuspendTaskAfterNumFailuresString("1"), resourceshowoutputassert.TaskShowOutput(t, firstTaskStandaloneModel.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(5). HasState(sdk.TaskStateStarted), resourceassert.TaskResource(t, secondTaskStandaloneModel.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(5). HasStartedString(r.BooleanTrue), resourceshowoutputassert.TaskShowOutput(t, secondTaskStandaloneModel.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(5). HasState(sdk.TaskStateStarted), ), }, // Change the second task to run after the first one (creating a DAG) { - Config: config.FromModel(t, rootTaskModel) + config.FromModel(t, childTaskModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskModel, childTaskModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, rootTaskModel.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(5). HasStartedString(r.BooleanTrue). HasSuspendTaskAfterNumFailuresString("2"), resourceshowoutputassert.TaskShowOutput(t, rootTaskModel.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(5). HasState(sdk.TaskStateStarted), resourceassert.TaskResource(t, childTaskModel.ResourceReference()). - HasAfterIds(id). + HasAfterIdsInOrder(id). HasStartedString(r.BooleanTrue), resourceshowoutputassert.TaskShowOutput(t, childTaskModel.ResourceReference()). HasPredecessors(id). @@ -827,20 +1175,21 @@ func TestAcc_Task_ConvertStandaloneTaskToSubtask(t *testing.T) { }, // Change tasks in DAG to standalone tasks (disabled to check if resuming/suspending works correctly) { - Config: config.FromModel(t, firstTaskStandaloneModelDisabled) + config.FromModel(t, secondTaskStandaloneModelDisabled), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", firstTaskStandaloneModelDisabled, secondTaskStandaloneModelDisabled), Check: assert.AssertThat(t, resourceassert.TaskResource(t, firstTaskStandaloneModelDisabled.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(5). HasStartedString(r.BooleanFalse). HasSuspendTaskAfterNumFailuresString("10"), resourceshowoutputassert.TaskShowOutput(t, firstTaskStandaloneModelDisabled.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(5). HasState(sdk.TaskStateSuspended), resourceassert.TaskResource(t, secondTaskStandaloneModelDisabled.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(5). HasStartedString(r.BooleanFalse), resourceshowoutputassert.TaskShowOutput(t, secondTaskStandaloneModelDisabled.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(5). HasState(sdk.TaskStateSuspended), ), }, @@ -855,25 +1204,25 @@ func TestAcc_Task_ConvertStandaloneTaskToFinalizer(t *testing.T) { rootTaskId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() finalizerTaskId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 - firstTaskStandaloneModel := model.TaskWithId("main_task", rootTaskId, true, statement). - WithSchedule(schedule). + firstTaskStandaloneModel := model.TaskWithId("root", rootTaskId, true, statement). + WithScheduleMinutes(schedule). WithSuspendTaskAfterNumFailures(1) - secondTaskStandaloneModel := model.TaskWithId("second_task", finalizerTaskId, true, statement). - WithSchedule(schedule) + secondTaskStandaloneModel := model.TaskWithId("child", finalizerTaskId, true, statement). + WithScheduleMinutes(schedule) - rootTaskModel := model.TaskWithId("main_task", rootTaskId, true, statement). - WithSchedule(schedule). + rootTaskModel := model.TaskWithId("root", rootTaskId, true, statement). + WithScheduleMinutes(schedule). WithSuspendTaskAfterNumFailures(2) - childTaskModel := model.TaskWithId("second_task", finalizerTaskId, true, statement). + childTaskModel := model.TaskWithId("child", finalizerTaskId, true, statement). WithFinalize(rootTaskId.FullyQualifiedName()) childTaskModel.SetDependsOn(rootTaskModel.ResourceReference()) - firstTaskStandaloneModelDisabled := model.TaskWithId("main_task", rootTaskId, false, statement). - WithSchedule(schedule) - secondTaskStandaloneModelDisabled := model.TaskWithId("second_task", finalizerTaskId, false, statement). - WithSchedule(schedule) + firstTaskStandaloneModelDisabled := model.TaskWithId("root", rootTaskId, false, statement). + WithScheduleMinutes(schedule) + secondTaskStandaloneModelDisabled := model.TaskWithId("child", finalizerTaskId, false, statement). + WithScheduleMinutes(schedule) secondTaskStandaloneModelDisabled.SetDependsOn(firstTaskStandaloneModelDisabled.ResourceReference()) resource.Test(t, resource.TestCase{ @@ -885,36 +1234,38 @@ func TestAcc_Task_ConvertStandaloneTaskToFinalizer(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, firstTaskStandaloneModel) + config.FromModel(t, secondTaskStandaloneModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", firstTaskStandaloneModel, secondTaskStandaloneModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, firstTaskStandaloneModel.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). HasStartedString(r.BooleanTrue). HasSuspendTaskAfterNumFailuresString("1"), resourceshowoutputassert.TaskShowOutput(t, firstTaskStandaloneModel.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(schedule). HasTaskRelations(sdk.TaskRelations{}). HasState(sdk.TaskStateStarted), resourceassert.TaskResource(t, secondTaskStandaloneModel.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). HasStartedString(r.BooleanTrue), resourceshowoutputassert.TaskShowOutput(t, secondTaskStandaloneModel.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(schedule). HasTaskRelations(sdk.TaskRelations{}). HasState(sdk.TaskStateStarted), ), }, // Change the second task to run after the first one (creating a DAG) { - Config: config.FromModel(t, rootTaskModel) + config.FromModel(t, childTaskModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskModel, childTaskModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, rootTaskModel.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). HasStartedString(r.BooleanTrue). HasSuspendTaskAfterNumFailuresString("2"), resourceshowoutputassert.TaskShowOutput(t, rootTaskModel.ResourceReference()). - HasSchedule(schedule). - // TODO(SNOW-1348116 - next pr): See why finalizer task is not populated + HasScheduleMinutes(schedule). + // TODO(SNOW-1348116 - next pr): Create ticket and report; this field in task relations seems to have mixed chances of appearing (needs deeper digging, doesn't affect the resource; could be removed for now) // HasTaskRelations(sdk.TaskRelations{FinalizerTask: &finalizerTaskId}). HasState(sdk.TaskStateStarted), resourceassert.TaskResource(t, childTaskModel.ResourceReference()). @@ -926,21 +1277,22 @@ func TestAcc_Task_ConvertStandaloneTaskToFinalizer(t *testing.T) { }, // Change tasks in DAG to standalone tasks (disabled to check if resuming/suspending works correctly) { - Config: config.FromModel(t, firstTaskStandaloneModelDisabled) + config.FromModel(t, secondTaskStandaloneModelDisabled), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", firstTaskStandaloneModelDisabled, secondTaskStandaloneModelDisabled), Check: assert.AssertThat(t, resourceassert.TaskResource(t, firstTaskStandaloneModelDisabled.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). HasStartedString(r.BooleanFalse). HasSuspendTaskAfterNumFailuresString("10"), resourceshowoutputassert.TaskShowOutput(t, firstTaskStandaloneModelDisabled.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(schedule). HasTaskRelations(sdk.TaskRelations{}). HasState(sdk.TaskStateSuspended), resourceassert.TaskResource(t, secondTaskStandaloneModelDisabled.ResourceReference()). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). HasStartedString(r.BooleanFalse), resourceshowoutputassert.TaskShowOutput(t, secondTaskStandaloneModelDisabled.ResourceReference()). - HasSchedule(schedule). + HasScheduleMinutes(schedule). HasTaskRelations(sdk.TaskRelations{}). HasState(sdk.TaskStateSuspended), ), @@ -949,29 +1301,28 @@ func TestAcc_Task_ConvertStandaloneTaskToFinalizer(t *testing.T) { }) } -// TODO(SNOW-1348116 - analyze in next pr): This test is not deterministic and sometimes it fails when resuming a task while other task is modifying DAG (removing after) func TestAcc_Task_SwitchScheduledWithAfter(t *testing.T) { rootId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() childId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 rootTaskConfigModel := model.TaskWithId("root", rootId, true, statement). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSuspendTaskAfterNumFailures(1) childTaskConfigModel := model.TaskWithId("child", childId, true, statement). - WithSchedule(schedule) + WithScheduleMinutes(schedule) rootTaskConfigModelAfterSuspendFailuresUpdate := model.TaskWithId("root", rootId, true, statement). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSuspendTaskAfterNumFailures(2) childTaskConfigModelWithAfter := model.TaskWithId("child", childId, true, statement). WithAfterValue(configvariable.SetVariable(configvariable.StringVariable(rootId.FullyQualifiedName()))) childTaskConfigModelWithAfter.SetDependsOn(rootTaskConfigModelAfterSuspendFailuresUpdate.ResourceReference()) rootTaskConfigModelDisabled := model.TaskWithId("root", rootId, false, statement). - WithSchedule(schedule) + WithScheduleMinutes(schedule) childTaskConfigModelDisabled := model.TaskWithId("child", childId, false, statement). - WithSchedule(schedule) + WithScheduleMinutes(schedule) childTaskConfigModelDisabled.SetDependsOn(rootTaskConfigModelDisabled.ResourceReference()) resource.Test(t, resource.TestCase{ @@ -983,58 +1334,62 @@ func TestAcc_Task_SwitchScheduledWithAfter(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModel), Check: assert.AssertThat(t, - resourceassert.TaskResource(t, "snowflake_task.child"). - HasStartedString(r.BooleanTrue). - HasScheduleString(schedule). - HasAfterIds(). - HasSuspendTaskAfterNumFailuresString("10"), - resourceassert.TaskResource(t, "snowflake_task.root"). + resourceassert.TaskResource(t, rootTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). HasSuspendTaskAfterNumFailuresString("1"), + resourceassert.TaskResource(t, childTaskConfigModel.ResourceReference()). + HasStartedString(r.BooleanTrue). + HasScheduleMinutes(schedule). + HasAfterIdsInOrder(). + HasSuspendTaskAfterNumFailuresString("10"), ), }, { - Config: config.FromModel(t, rootTaskConfigModelAfterSuspendFailuresUpdate) + config.FromModel(t, childTaskConfigModelWithAfter), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModelAfterSuspendFailuresUpdate, childTaskConfigModelWithAfter), Check: assert.AssertThat(t, - resourceassert.TaskResource(t, "snowflake_task.child"). + resourceassert.TaskResource(t, rootTaskConfigModelAfterSuspendFailuresUpdate.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(""). - HasAfterIds(rootId). - HasSuspendTaskAfterNumFailuresString("10"), - resourceassert.TaskResource(t, "snowflake_task.root"). - HasStartedString(r.BooleanTrue). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). HasSuspendTaskAfterNumFailuresString("2"), + resourceassert.TaskResource(t, childTaskConfigModelWithAfter.ResourceReference()). + HasStartedString(r.BooleanTrue). + HasNoScheduleSet(). + HasAfterIdsInOrder(rootId). + HasSuspendTaskAfterNumFailuresString("10"), ), }, { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModel), Check: assert.AssertThat(t, - resourceassert.TaskResource(t, "snowflake_task.child"). - HasStartedString(r.BooleanTrue). - HasScheduleString(schedule). - HasAfterIds(). - HasSuspendTaskAfterNumFailuresString("10"), - resourceassert.TaskResource(t, "snowflake_task.root"). + resourceassert.TaskResource(t, rootTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). HasSuspendTaskAfterNumFailuresString("1"), + resourceassert.TaskResource(t, childTaskConfigModel.ResourceReference()). + HasStartedString(r.BooleanTrue). + HasScheduleMinutes(schedule). + HasAfterIdsInOrder(). + HasSuspendTaskAfterNumFailuresString("10"), ), }, { - Config: config.FromModel(t, rootTaskConfigModelDisabled) + config.FromModel(t, childTaskConfigModelDisabled), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModelDisabled, childTaskConfigModelDisabled), Check: assert.AssertThat(t, - resourceassert.TaskResource(t, "snowflake_task.child"). + resourceassert.TaskResource(t, rootTaskConfigModelDisabled.ResourceReference()). HasStartedString(r.BooleanFalse). - HasScheduleString(schedule). - HasAfterIds(). + HasScheduleMinutes(schedule). HasSuspendTaskAfterNumFailuresString("10"), - resourceassert.TaskResource(t, "snowflake_task.root"). + resourceassert.TaskResource(t, childTaskConfigModelDisabled.ResourceReference()). HasStartedString(r.BooleanFalse). - HasScheduleString(schedule). + HasScheduleMinutes(schedule). + HasAfterIdsInOrder(). HasSuspendTaskAfterNumFailuresString("10"), ), }, @@ -1049,24 +1404,22 @@ func TestAcc_Task_WithAfter(t *testing.T) { rootId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() childId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 rootTaskConfigModel := model.TaskWithId("root", rootId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement) childTaskConfigModelWithAfter := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). WithAfterValue(configvariable.SetVariable(configvariable.StringVariable(rootId.FullyQualifiedName()))). WithSqlStatement(statement) - childTaskConfigModelWithAfter.SetDependsOn(rootTaskConfigModel.ResourceReference()) childTaskConfigModelWithoutAfter := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement) - childTaskConfigModelWithoutAfter.SetDependsOn(rootTaskConfigModel.ResourceReference()) resource.Test(t, resource.TestCase{ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, @@ -1077,25 +1430,27 @@ func TestAcc_Task_WithAfter(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithAfter), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithAfter), Check: assert.AssertThat(t, resourceassert.TaskResource(t, rootTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(schedule), + HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModelWithAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIds(rootId), + HasAfterIdsInOrder(rootId), ), }, { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithoutAfter), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithoutAfter), Check: assert.AssertThat(t, resourceassert.TaskResource(t, rootTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(schedule), + HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIds(), + HasAfterIdsInOrder(), ), }, }, @@ -1109,24 +1464,22 @@ func TestAcc_Task_WithFinalizer(t *testing.T) { rootId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() childId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 rootTaskConfigModel := model.TaskWithId("root", rootId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement) childTaskConfigModelWithFinalizer := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). WithFinalize(rootId.FullyQualifiedName()). WithSqlStatement(statement) - childTaskConfigModelWithFinalizer.SetDependsOn(rootTaskConfigModel.ResourceReference()) childTaskConfigModelWithoutFinalizer := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement) - childTaskConfigModelWithoutFinalizer.SetDependsOn(rootTaskConfigModel.ResourceReference()) resource.Test(t, resource.TestCase{ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, @@ -1137,22 +1490,24 @@ func TestAcc_Task_WithFinalizer(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithFinalizer), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithFinalizer), Check: assert.AssertThat(t, resourceassert.TaskResource(t, rootTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(schedule), + HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModelWithFinalizer.ResourceReference()). HasStartedString(r.BooleanTrue). HasFinalizeString(rootId.FullyQualifiedName()), ), }, { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithoutFinalizer), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithoutFinalizer), Check: assert.AssertThat(t, resourceassert.TaskResource(t, rootTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(schedule), + HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModelWithoutFinalizer.ResourceReference()). HasStartedString(r.BooleanTrue). HasFinalizeString(""), @@ -1169,26 +1524,24 @@ func TestAcc_Task_UpdateFinalizerExternally(t *testing.T) { rootId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() childId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 rootTaskConfigModel := model.TaskWithId("root", rootId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement) childTaskConfigModelWithoutFinalizer := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithComment("abc"). WithSqlStatement(statement) - childTaskConfigModelWithoutFinalizer.SetDependsOn(rootTaskConfigModel.ResourceReference()) childTaskConfigModelWithFinalizer := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). WithFinalize(rootId.FullyQualifiedName()). WithComment("abc"). WithSqlStatement(statement) - childTaskConfigModelWithFinalizer.SetDependsOn(rootTaskConfigModel.ResourceReference()) resource.Test(t, resource.TestCase{ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, @@ -1199,7 +1552,8 @@ func TestAcc_Task_UpdateFinalizerExternally(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithoutFinalizer), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithoutFinalizer), }, // Set finalizer externally { @@ -1213,7 +1567,8 @@ func TestAcc_Task_UpdateFinalizerExternally(t *testing.T) { acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithResume(true)) acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(rootId).WithResume(true)) }, - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithoutFinalizer), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithoutFinalizer), Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithoutFinalizer.ResourceReference()). HasStartedString(r.BooleanTrue). @@ -1225,7 +1580,8 @@ func TestAcc_Task_UpdateFinalizerExternally(t *testing.T) { }, // Set finalizer in config { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithFinalizer), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithFinalizer), Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithFinalizer.ResourceReference()). HasStartedString(r.BooleanTrue). @@ -1242,12 +1598,13 @@ func TestAcc_Task_UpdateFinalizerExternally(t *testing.T) { acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithSuspend(true)) acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithUnsetFinalize(true)) - acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithSet(*sdk.NewTaskSetRequest().WithSchedule(schedule))) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithSet(*sdk.NewTaskSetRequest().WithSchedule(fmt.Sprintf("%d minutes", schedule)))) acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithResume(true)) acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(rootId).WithResume(true)) }, - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithFinalizer), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithFinalizer), Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithFinalizer.ResourceReference()). HasStartedString(r.BooleanTrue). @@ -1259,7 +1616,8 @@ func TestAcc_Task_UpdateFinalizerExternally(t *testing.T) { }, // Unset finalizer in config { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithoutFinalizer), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithoutFinalizer), Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithoutFinalizer.ResourceReference()). HasStartedString(r.BooleanTrue). @@ -1280,26 +1638,24 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { rootId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() childId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 rootTaskConfigModel := model.TaskWithId("root", rootId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement) childTaskConfigModelWithoutAfter := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithComment("abc"). WithSqlStatement(statement) - childTaskConfigModelWithoutAfter.SetDependsOn(rootTaskConfigModel.ResourceReference()) childTaskConfigModelWithAfter := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). WithAfterValue(configvariable.SetVariable(configvariable.StringVariable(rootId.FullyQualifiedName()))). WithComment("abc"). WithSqlStatement(statement) - childTaskConfigModelWithAfter.SetDependsOn(rootTaskConfigModel.ResourceReference()) resource.Test(t, resource.TestCase{ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, @@ -1310,7 +1666,8 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithoutAfter), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithoutAfter), }, // Set after externally { @@ -1324,11 +1681,12 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithResume(true)) acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(rootId).WithResume(true)) }, - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithoutAfter), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithoutAfter), Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIds(), + HasAfterIdsInOrder(), resourceshowoutputassert.TaskShowOutput(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasState(sdk.TaskStateStarted). HasTaskRelations(sdk.TaskRelations{}), @@ -1336,11 +1694,12 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { }, // Set after in config { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithAfter), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithAfter), Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIds(rootId), + HasAfterIdsInOrder(rootId), resourceshowoutputassert.TaskShowOutput(t, childTaskConfigModelWithAfter.ResourceReference()). HasState(sdk.TaskStateStarted). HasTaskRelations(sdk.TaskRelations{Predecessors: []sdk.SchemaObjectIdentifier{rootId}}), @@ -1353,16 +1712,17 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithSuspend(true)) acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithRemoveAfter([]sdk.SchemaObjectIdentifier{rootId})) - acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithSet(*sdk.NewTaskSetRequest().WithSchedule(schedule))) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithSet(*sdk.NewTaskSetRequest().WithSchedule(fmt.Sprintf("%d MINUTES", schedule)))) acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(childId).WithResume(true)) acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(rootId).WithResume(true)) }, - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithAfter), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithAfter), Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIds(rootId), + HasAfterIdsInOrder(rootId), resourceshowoutputassert.TaskShowOutput(t, childTaskConfigModelWithAfter.ResourceReference()). HasState(sdk.TaskStateStarted). HasTaskRelations(sdk.TaskRelations{Predecessors: []sdk.SchemaObjectIdentifier{rootId}}), @@ -1370,11 +1730,12 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { }, // Unset after in config { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithoutAfter), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithoutAfter), Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIds(), + HasAfterIdsInOrder(), resourceshowoutputassert.TaskShowOutput(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasState(sdk.TaskStateStarted). HasTaskRelations(sdk.TaskRelations{}), @@ -1391,11 +1752,11 @@ func TestAcc_Task_issue2207(t *testing.T) { rootId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() childId := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 rootTaskConfigModel := model.TaskWithId("root", rootId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement) childTaskConfigModel := model.TaskWithId("child", childId, true, statement). @@ -1403,14 +1764,12 @@ func TestAcc_Task_issue2207(t *testing.T) { WithAfterValue(configvariable.SetVariable(configvariable.StringVariable(rootId.FullyQualifiedName()))). WithComment("abc"). WithSqlStatement(statement) - childTaskConfigModel.SetDependsOn(rootTaskConfigModel.ResourceReference()) childTaskConfigModelWithDifferentComment := model.TaskWithId("child", childId, true, statement). WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). WithAfterValue(configvariable.SetVariable(configvariable.StringVariable(rootId.FullyQualifiedName()))). WithComment("def"). WithSqlStatement(statement) - childTaskConfigModelWithDifferentComment.SetDependsOn(rootTaskConfigModel.ResourceReference()) resource.Test(t, resource.TestCase{ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, @@ -1421,14 +1780,15 @@ func TestAcc_Task_issue2207(t *testing.T) { CheckDestroy: acc.CheckDestroy(t, resources.Task), Steps: []resource.TestStep{ { - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, rootTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(schedule), + HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIds(rootId). + HasAfterIdsInOrder(rootId). HasCommentString("abc"), ), }, @@ -1439,14 +1799,15 @@ func TestAcc_Task_issue2207(t *testing.T) { plancheck.ExpectResourceAction(childTaskConfigModelWithDifferentComment.ResourceReference(), plancheck.ResourceActionUpdate), }, }, - Config: config.FromModel(t, rootTaskConfigModel) + config.FromModel(t, childTaskConfigModelWithDifferentComment), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/with_task_dependency"), + ConfigVariables: config.ConfigVariablesFromModels(t, "tasks", rootTaskConfigModel, childTaskConfigModelWithDifferentComment), Check: assert.AssertThat(t, resourceassert.TaskResource(t, rootTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasScheduleString(schedule), + HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModelWithDifferentComment.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIds(rootId). + HasAfterIdsInOrder(rootId). HasCommentString("def"), ), }, @@ -1460,15 +1821,15 @@ func TestAcc_Task_issue2036(t *testing.T) { id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 when := "TRUE" taskConfigModelWithoutWhen := model.TaskWithId("test", id, true, statement). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement) taskConfigModelWithWhen := model.TaskWithId("test", id, true, statement). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement). WithWhen(when) @@ -1482,7 +1843,8 @@ func TestAcc_Task_issue2036(t *testing.T) { Steps: []resource.TestStep{ // create without when { - Config: config.FromModel(t, taskConfigModelWithoutWhen), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, taskConfigModelWithoutWhen), Check: assert.AssertThat(t, resourceassert.TaskResource(t, taskConfigModelWithoutWhen.ResourceReference()). HasStartedString(r.BooleanTrue). @@ -1491,7 +1853,8 @@ func TestAcc_Task_issue2036(t *testing.T) { }, // add when { - Config: config.FromModel(t, taskConfigModelWithWhen), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, taskConfigModelWithWhen), Check: assert.AssertThat(t, resourceassert.TaskResource(t, taskConfigModelWithWhen.ResourceReference()). HasStartedString(r.BooleanTrue). @@ -1500,7 +1863,8 @@ func TestAcc_Task_issue2036(t *testing.T) { }, // remove when { - Config: config.FromModel(t, taskConfigModelWithoutWhen), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, taskConfigModelWithoutWhen), Check: assert.AssertThat(t, resourceassert.TaskResource(t, taskConfigModelWithoutWhen.ResourceReference()). HasStartedString(r.BooleanTrue). @@ -1520,9 +1884,9 @@ func TestAcc_Task_issue3113(t *testing.T) { id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" - schedule := "5 MINUTES" + schedule := 5 configModel := model.TaskWithId("test", id, true, statement). - WithSchedule(schedule). + WithScheduleMinutes(schedule). WithSqlStatement(statement). WithErrorIntegration(errorNotificationIntegration.ID().Name()) @@ -1548,7 +1912,8 @@ func TestAcc_Task_issue3113(t *testing.T) { acc.TestClient().Task.DropFunc(t, id)() }, ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, - Config: config.FromModel(t, configModel), + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, configModel.ResourceReference()). HasErrorIntegrationString(errorNotificationIntegration.ID().Name()), diff --git a/pkg/resources/testdata/TestAcc_Task/basic/test.tf b/pkg/resources/testdata/TestAcc_Task/basic/test.tf new file mode 100644 index 0000000000..78e5a97811 --- /dev/null +++ b/pkg/resources/testdata/TestAcc_Task/basic/test.tf @@ -0,0 +1,82 @@ +resource "snowflake_task" "test" { + name = var.name + database = var.database + schema = var.schema + started = var.started + sql_statement = var.sql_statement + + # Optionals + warehouse = var.warehouse + config = var.config + allow_overlapping_execution = var.allow_overlapping_execution + error_integration = var.error_integration + when = var.when + comment = var.comment + + dynamic "schedule" { + for_each = [for element in [var.schedule] : element if element != null] + content { + minutes = lookup(var.schedule, "minutes", null) + using_cron = lookup(var.schedule, "cron", null) + } + } + + # Parameters + suspend_task_after_num_failures = var.suspend_task_after_num_failures + task_auto_retry_attempts = var.task_auto_retry_attempts + user_task_managed_initial_warehouse_size = var.user_task_managed_initial_warehouse_size + user_task_minimum_trigger_interval_in_seconds = var.user_task_minimum_trigger_interval_in_seconds + user_task_timeout_ms = var.user_task_timeout_ms + abort_detached_query = var.abort_detached_query + autocommit = var.autocommit + binary_input_format = var.binary_input_format + binary_output_format = var.binary_output_format + client_memory_limit = var.client_memory_limit + client_metadata_request_use_connection_ctx = var.client_metadata_request_use_connection_ctx + client_prefetch_threads = var.client_prefetch_threads + client_result_chunk_size = var.client_result_chunk_size + client_result_column_case_insensitive = var.client_result_column_case_insensitive + client_session_keep_alive = var.client_session_keep_alive + client_session_keep_alive_heartbeat_frequency = var.client_session_keep_alive_heartbeat_frequency + client_timestamp_type_mapping = var.client_timestamp_type_mapping + date_input_format = var.date_input_format + date_output_format = var.date_output_format + enable_unload_physical_type_optimization = var.enable_unload_physical_type_optimization + error_on_nondeterministic_merge = var.error_on_nondeterministic_merge + error_on_nondeterministic_update = var.error_on_nondeterministic_update + geography_output_format = var.geography_output_format + geometry_output_format = var.geometry_output_format + jdbc_use_session_timezone = var.jdbc_use_session_timezone + json_indent = var.json_indent + lock_timeout = var.lock_timeout + log_level = var.log_level + multi_statement_count = var.multi_statement_count + noorder_sequence_as_default = var.noorder_sequence_as_default + odbc_treat_decimal_as_int = var.odbc_treat_decimal_as_int + query_tag = var.query_tag + quoted_identifiers_ignore_case = var.quoted_identifiers_ignore_case + rows_per_resultset = var.rows_per_resultset + s3_stage_vpce_dns_name = var.s3_stage_vpce_dns_name + search_path = var.search_path + statement_queued_timeout_in_seconds = var.statement_queued_timeout_in_seconds + statement_timeout_in_seconds = var.statement_timeout_in_seconds + strict_json_output = var.strict_json_output + timestamp_day_is_always_24h = var.timestamp_day_is_always_24h + timestamp_input_format = var.timestamp_input_format + timestamp_ltz_output_format = var.timestamp_ltz_output_format + timestamp_ntz_output_format = var.timestamp_ntz_output_format + timestamp_output_format = var.timestamp_output_format + timestamp_type_mapping = var.timestamp_type_mapping + timestamp_tz_output_format = var.timestamp_tz_output_format + timezone = var.timezone + time_input_format = var.time_input_format + time_output_format = var.time_output_format + trace_level = var.trace_level + transaction_abort_on_error = var.transaction_abort_on_error + transaction_default_isolation_level = var.transaction_default_isolation_level + two_digit_century_start = var.two_digit_century_start + unsupported_ddl_action = var.unsupported_ddl_action + use_cached_result = var.use_cached_result + week_of_year_policy = var.week_of_year_policy + week_start = var.week_start +} diff --git a/pkg/resources/testdata/TestAcc_Task/basic/variables.tf b/pkg/resources/testdata/TestAcc_Task/basic/variables.tf new file mode 100644 index 0000000000..70b21e25b2 --- /dev/null +++ b/pkg/resources/testdata/TestAcc_Task/basic/variables.tf @@ -0,0 +1,341 @@ +variable "database" { + type = string +} + +variable "schema" { + type = string +} + +variable "name" { + type = string +} + +variable "started" { + type = bool +} + +variable "sql_statement" { + type = string +} + +# Optionals +variable "comment" { + type = string + default = null +} + +variable "warehouse" { + type = string + default = null +} + +variable "config" { + type = string + default = null +} + +variable "allow_overlapping_execution" { + type = string + default = null +} + +variable "error_integration" { + type = string + default = null +} + +variable "when" { + type = string + default = null +} + +variable "schedule" { + default = null + type = map(string) +} + +# Parameters +variable "suspend_task_after_num_failures" { + default = null + type = number +} + +variable "task_auto_retry_attempts" { + default = null + type = number +} + +variable "user_task_managed_initial_warehouse_size" { + default = null + type = string +} + +variable "user_task_minimum_trigger_interval_in_seconds" { + default = null + type = number +} + +variable "user_task_timeout_ms" { + default = null + type = number +} + +variable "abort_detached_query" { + default = null + type = bool +} + +variable "autocommit" { + default = null + type = bool +} + +variable "binary_input_format" { + default = null + type = string +} + +variable "binary_output_format" { + default = null + type = string +} + +variable "client_memory_limit" { + default = null + type = number +} + +variable "client_metadata_request_use_connection_ctx" { + default = null + type = bool +} + +variable "client_prefetch_threads" { + default = null + type = number +} + +variable "client_result_chunk_size" { + default = null + type = number +} + +variable "client_result_column_case_insensitive" { + default = null + type = bool +} + +variable "client_session_keep_alive" { + default = null + type = bool +} + +variable "client_session_keep_alive_heartbeat_frequency" { + default = null + type = number +} + +variable "client_timestamp_type_mapping" { + default = null + type = string +} + +variable "date_input_format" { + default = null + type = string +} + +variable "date_output_format" { + default = null + type = string +} + +variable "enable_unload_physical_type_optimization" { + default = null + type = bool +} + +variable "error_on_nondeterministic_merge" { + default = null + type = bool +} + +variable "error_on_nondeterministic_update" { + default = null + type = bool +} + +variable "geography_output_format" { + default = null + type = string +} + +variable "geometry_output_format" { + default = null + type = string +} + +variable "jdbc_use_session_timezone" { + default = null + type = bool +} + +variable "json_indent" { + default = null + type = number +} + +variable "lock_timeout" { + default = null + type = number +} + +variable "log_level" { + default = null + type = string +} + +variable "multi_statement_count" { + default = null + type = number +} + +variable "noorder_sequence_as_default" { + default = null + type = bool +} + +variable "odbc_treat_decimal_as_int" { + default = null + type = bool +} + +variable "query_tag" { + default = null + type = string +} + +variable "quoted_identifiers_ignore_case" { + default = null + type = bool +} + +variable "rows_per_resultset" { + default = null + type = number +} + +variable "s3_stage_vpce_dns_name" { + default = null + type = string +} + +variable "search_path" { + default = null + type = string +} + +variable "statement_queued_timeout_in_seconds" { + default = null + type = number +} + +variable "statement_timeout_in_seconds" { + default = null + type = number +} + +variable "strict_json_output" { + default = null + type = bool +} + +variable "timestamp_day_is_always_24h" { + default = null + type = bool +} + +variable "timestamp_input_format" { + default = null + type = string +} + +variable "timestamp_ltz_output_format" { + default = null + type = string +} + +variable "timestamp_ntz_output_format" { + default = null + type = string +} + +variable "timestamp_output_format" { + default = null + type = string +} + +variable "timestamp_type_mapping" { + default = null + type = string +} + +variable "timestamp_tz_output_format" { + default = null + type = string +} + +variable "timezone" { + default = null + type = string +} + +variable "time_input_format" { + default = null + type = string +} + +variable "time_output_format" { + default = null + type = string +} + +variable "trace_level" { + default = null + type = string +} + +variable "transaction_abort_on_error" { + default = null + type = bool +} + +variable "transaction_default_isolation_level" { + default = null + type = string +} + +variable "two_digit_century_start" { + default = null + type = number +} + +variable "unsupported_ddl_action" { + default = null + type = string +} + +variable "use_cached_result" { + default = null + type = bool +} + +variable "week_of_year_policy" { + default = null + type = number +} + +variable "week_start" { + default = null + type = number +} diff --git a/pkg/resources/testdata/TestAcc_Task/with_task_dependency/test.tf b/pkg/resources/testdata/TestAcc_Task/with_task_dependency/test.tf new file mode 100644 index 0000000000..d8cb747aef --- /dev/null +++ b/pkg/resources/testdata/TestAcc_Task/with_task_dependency/test.tf @@ -0,0 +1,49 @@ +resource "snowflake_task" "root" { + name = var.tasks[0].name + database = var.tasks[0].database + schema = var.tasks[0].schema + started = var.tasks[0].started + sql_statement = var.tasks[0].sql_statement + + # Optionals + dynamic "schedule" { + for_each = [for element in [var.tasks[0].schedule] : element if element != null] + content { + minutes = lookup(var.tasks[0].schedule, "minutes", null) + using_cron = lookup(var.tasks[0].schedule, "cron", null) + } + } + + comment = var.tasks[0].comment + after = var.tasks[0].after + finalize = var.tasks[0].finalize + + # Parameters + suspend_task_after_num_failures = var.tasks[0].suspend_task_after_num_failures +} + +resource "snowflake_task" "child" { + depends_on = [snowflake_task.root] + + name = var.tasks[1].name + database = var.tasks[1].database + schema = var.tasks[1].schema + started = var.tasks[1].started + sql_statement = var.tasks[1].sql_statement + + # Optionals + dynamic "schedule" { + for_each = [for element in [var.tasks[1].schedule] : element if element != null] + content { + minutes = lookup(var.tasks[1].schedule, "minutes", null) + using_cron = lookup(var.tasks[1].schedule, "cron", null) + } + } + + comment = var.tasks[1].comment + after = var.tasks[1].after + finalize = var.tasks[1].finalize + + # Parameters + suspend_task_after_num_failures = var.tasks[1].suspend_task_after_num_failures +} diff --git a/pkg/resources/testdata/TestAcc_Task/with_task_dependency/variables.tf b/pkg/resources/testdata/TestAcc_Task/with_task_dependency/variables.tf new file mode 100644 index 0000000000..f08bfd135f --- /dev/null +++ b/pkg/resources/testdata/TestAcc_Task/with_task_dependency/variables.tf @@ -0,0 +1,18 @@ +variable "tasks" { + type = list(object({ + database = string + schema = string + name = string + started = bool + sql_statement = string + + # Optionals + comment = optional(string) + schedule = optional(map(string)) + after = optional(set(string)) + finalize = optional(string) + + # Parameters + suspend_task_after_num_failures = optional(number) + })) +} diff --git a/pkg/sdk/grants_impl.go b/pkg/sdk/grants_impl.go index 678f328bf9..3cb14ae05c 100644 --- a/pkg/sdk/grants_impl.go +++ b/pkg/sdk/grants_impl.go @@ -446,7 +446,7 @@ func (v *grants) grantOwnershipOnTask(ctx context.Context, taskId SchemaObjectId return err } - if currentTask.State == TaskStateStarted && !slices.ContainsFunc(tasksToResume, func(id SchemaObjectIdentifier) bool { + if currentTask.IsStarted() && !slices.ContainsFunc(tasksToResume, func(id SchemaObjectIdentifier) bool { return id.FullyQualifiedName() == currentTask.ID().FullyQualifiedName() }) { tasksToResume = append(tasksToResume, currentTask.ID()) diff --git a/pkg/sdk/tasks_gen.go b/pkg/sdk/tasks_gen.go index 3f8798351c..f2fd244d75 100644 --- a/pkg/sdk/tasks_gen.go +++ b/pkg/sdk/tasks_gen.go @@ -209,6 +209,10 @@ func (v *Task) ID() SchemaObjectIdentifier { return NewSchemaObjectIdentifier(v.DatabaseName, v.SchemaName, v.Name) } +func (v *Task) IsStarted() bool { + return v.State == TaskStateStarted +} + // DescribeTaskOptions is based on https://docs.snowflake.com/en/sql-reference/sql/desc-task. type DescribeTaskOptions struct { describe bool `ddl:"static" sql:"DESCRIBE"` diff --git a/pkg/sdk/tasks_impl_gen.go b/pkg/sdk/tasks_impl_gen.go index 9a15a6c680..ac390a67c9 100644 --- a/pkg/sdk/tasks_impl_gen.go +++ b/pkg/sdk/tasks_impl_gen.go @@ -100,7 +100,7 @@ func (v *tasks) SuspendRootTasks(ctx context.Context, taskId SchemaObjectIdentif for _, rootTask := range rootTasks { // If a root task is started, then it needs to be suspended before the child tasks can be created - if rootTask.State == TaskStateStarted { + if rootTask.IsStarted() { err := v.client.Tasks.Alter(ctx, NewAlterTaskRequest(rootTask.ID()).WithSuspend(true)) if err != nil { log.Printf("[WARN] failed to suspend task %s", rootTask.ID().FullyQualifiedName()) diff --git a/pkg/sdk/testint/tasks_gen_integration_test.go b/pkg/sdk/testint/tasks_gen_integration_test.go index bc3da46a95..34edff7495 100644 --- a/pkg/sdk/testint/tasks_gen_integration_test.go +++ b/pkg/sdk/testint/tasks_gen_integration_test.go @@ -36,7 +36,7 @@ func TestInt_Tasks(t *testing.T) { HasComment(""). HasWarehouse(warehouseId). HasSchedule(""). - HasPredecessors(). + HasPredecessorsInAnyOrder(). HasState(sdk.TaskStateStarted). HasDefinition(sql). HasCondition(""). @@ -78,12 +78,12 @@ func TestInt_Tasks(t *testing.T) { HasLastSuspendedOn("") if predecessor != nil { - asserts.HasPredecessors(*predecessor) + asserts.HasPredecessorsInAnyOrder(*predecessor) asserts.HasTaskRelations(sdk.TaskRelations{ Predecessors: []sdk.SchemaObjectIdentifier{*predecessor}, }) } else { - asserts.HasPredecessors() + asserts.HasPredecessorsInAnyOrder() asserts.HasTaskRelations(sdk.TaskRelations{}) } @@ -103,7 +103,7 @@ func TestInt_Tasks(t *testing.T) { HasOwner(""). HasComment(""). HasWarehouse(nil). - HasPredecessors(). + HasPredecessorsInAnyOrder(). HasState(""). HasDefinition(""). HasCondition("").