From dcc89cdf3b2fb0d97b34839ccc3f347c9a0a43cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Cie=C5=9Blak?= Date: Thu, 14 Nov 2024 14:41:13 +0100 Subject: [PATCH 1/3] wip --- MIGRATION_GUIDE.md | 62 +- docs/data-sources/tasks.md | 2 +- docs/resources/task.md | 150 +++- examples/resources/snowflake_task/resource.tf | 150 +++- .../resourceassert/task_resource_ext.go | 5 + pkg/datasources/common.go | 4 + pkg/datasources/tasks.go | 2 +- pkg/resources/resource_helpers_update.go | 15 + pkg/resources/task.go | 140 ++-- pkg/resources/task_acceptance_test.go | 656 +++++++++++++++++- pkg/resources/task_state_upgraders.go | 49 ++ pkg/sdk/tasks_def.go | 3 +- pkg/sdk/tasks_gen.go | 30 + pkg/sdk/tasks_gen_test.go | 67 +- pkg/sdk/tasks_validations_gen.go | 4 +- pkg/sdk/testint/tasks_gen_integration_test.go | 4 +- 16 files changed, 1165 insertions(+), 178 deletions(-) create mode 100644 pkg/resources/task_state_upgraders.go diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md index d1d76a6b25..e78f9ecbae 100644 --- a/MIGRATION_GUIDE.md +++ b/MIGRATION_GUIDE.md @@ -9,11 +9,6 @@ across different versions. ## v0.98.0 ➞ v0.99.0 -### snowflake_task resource changes - -new fields: -- `config` - ### snowflake_tasks data source changes New filtering options: @@ -65,14 +60,32 @@ output "simple_output" { } ``` -Please adjust your Terraform configuration files. - -## v0.98.0 ➞ v0.99.0 - ### snowflake_task resource changes +New fields: +- `config` - enables to specify JSON-formatted metadata that can be retrieved in the `sql_statement` by using [SYSTEM$GET_TASK_GRAPH_CONFIG](https://docs.snowflake.com/en/sql-reference/functions/system_get_task_graph_config). +- `show_output` and `parameters` fields added for holding SHOW and SHOW PARAMETERS output (see [raw Snowflake output](./v1-preparations/CHANGES_BEFORE_V1.md#raw-snowflake-output)). +- Added support for finalizer tasks with `finalize` field. It conflicts with `after` and `schedule` (see [finalizer tasks](https://docs.snowflake.com/en/user-guide/tasks-graphs#release-and-cleanup-of-task-graphs)). + Changes: -- `enabled` field changed to `started` and type changed to string with only boolean values available (see ["empty" values](./v1-preparations/CHANGES_BEFORE_V1.md#empty-values)) -- `shedule` field changed from single value to nested object that allows for specifying either minutes or cron +- `enabled` field changed to `started` and type changed to string with only boolean values available (see ["empty" values](./v1-preparations/CHANGES_BEFORE_V1.md#empty-values)). + +Before: +```terraform +resource "snowflake_task" "example" { + # ... + enabled = true + # ... +} +``` +After: +```terraform +resource "snowflake_task" "example" { + # ... + started = true + # ... +} +``` +- `schedule` field changed from single value to a nested object that allows for specifying either minutes or cron Before: ```terraform @@ -80,7 +93,7 @@ resource "snowflake_task" "example" { # ... schedule = "5 MINUTES" # or - schedule = "USING SCHEDULE * * * * * UTC" + schedule = "USING CRON * * * * * UTC" # ... } ``` @@ -96,9 +109,28 @@ resource "snowflake_task" "example" { # ... } ``` -- All task parameters defined in [the Snowflake documentation](https://docs.snowflake.com/en/sql-reference/parameters) added into the top-level schema and removed `session_paramters` map. -- `show_output` and `paramters` fields added for holding SHOW and SHOW PARAMETERS output (see [raw Snowflake output](./v1-preparations/CHANGES_BEFORE_V1.md#raw-snowflake-output)). 
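+
+The new `config` field takes a JSON string that tasks in the graph can read back with [SYSTEM$GET_TASK_GRAPH_CONFIG](https://docs.snowflake.com/en/sql-reference/functions/system_get_task_graph_config). A minimal sketch (the `output_dir` key and the path are illustrative only; adjust them to your own graph):
+```terraform
+resource "snowflake_task" "example" {
+  # ...
+  config        = "{\"output_dir\": \"/temp/test_directory/\"}"
+  sql_statement = "select SYSTEM$GET_TASK_GRAPH_CONFIG('output_dir')"
+  # ...
+}
+```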
-- Added support for finalizer tasks with `finalize` field. It conflicts with `after` and `schedule` (see [finalizer tasks](https://docs.snowflake.com/en/user-guide/tasks-graphs#release-and-cleanup-of-task-graphs)). +- All task parameters defined in [the Snowflake documentation](https://docs.snowflake.com/en/sql-reference/parameters) added into the top-level schema and removed `session_parameters` map. + +Before: +```terraform +resource "snowflake_task" "example" { + # ... + session_parameters = { + QUERY_TAG = "" + } + # ... +} +``` +After: +```terraform +resource "snowflake_task" "example" { + # ... + query_tag = "" + # ... +} +``` + +- `after` field type was changed from `list` to `set`. No changes in configuration are necessary. ## v0.98.0 ➞ v0.99.0 diff --git a/docs/data-sources/tasks.md b/docs/data-sources/tasks.md index a9b8e7efc0..f035be55cd 100644 --- a/docs/data-sources/tasks.md +++ b/docs/data-sources/tasks.md @@ -146,7 +146,7 @@ check "task_check" { - `limit` (Block List, Max: 1) Limits the number of rows returned. If the `limit.from` is set, then the limit wll start from the first element matched by the expression. The expression is only used to match with the first element, later on the elements are not matched by the prefix, but you can enforce a certain pattern with `starts_with` or `like`. (see [below for nested schema](#nestedblock--limit)) - `root_only` (Boolean) Filters the command output to return only root tasks (tasks with no predecessors). - `starts_with` (String) Filters the output with **case-sensitive** characters indicating the beginning of the object name. -- `with_parameters` (Boolean) Runs SHOW PARAMETERS FOR TASK for each user returned by SHOW TASK. The output of describe is saved to the parameters field as a map. By default this value is set to true. +- `with_parameters` (Boolean) Runs SHOW PARAMETERS FOR TASK for each task returned by SHOW TASK and saves the output to the parameters field as a map. By default this value is set to true. ### Read-Only diff --git a/docs/resources/task.md b/docs/resources/task.md index 549a73f9d9..642c358454 100644 --- a/docs/resources/task.md +++ b/docs/resources/task.md @@ -14,59 +14,133 @@ Resource used to manage task objects. 
For more information, check [task document ## Example Usage ```terraform +# Basic standalone task resource "snowflake_task" "task" { - comment = "my task" - database = "database" schema = "schema" + name = "task" warehouse = "warehouse" - - name = "task" - schedule = "10 MINUTE" - sql_statement = "select * from foo;" - - session_parameters = { - "foo" : "bar", + started = true + schedule { + minutes = 5 } - - user_task_timeout_ms = 10000 - after = "preceding_task" - when = "foo AND bar" - enabled = true + sql_statement = "select 1" } +# Basic serverless task resource "snowflake_task" "serverless_task" { - comment = "my serverless task" - - database = "db" - schema = "schema" - - name = "serverless_task" - schedule = "10 MINUTE" - sql_statement = "select * from foo;" - - session_parameters = { - "foo" : "bar", - } - - user_task_timeout_ms = 10000 + database = "database" + schema = "schema" + name = "task" user_task_managed_initial_warehouse_size = "XSMALL" - after = [snowflake_task.task.name] - when = "foo AND bar" - enabled = true + started = true + schedule { + minutes = 5 + } + sql_statement = "select 1" } -resource "snowflake_task" "test_task" { - comment = "task with allow_overlapping_execution" +# Basic child task +resource "snowflake_task" "child_task" { + database = "database" + schema = "schema" + name = "task" + warehouse = "warehouse" + started = true + # You can do it by referring to task by computed fully_qualified_name field or write the task name in manually if it's not managed by Terraform + after = [snowflake_task.root_task.fully_qualified_name, ".."] + sql_statement = "select 1" +} - database = "database" - schema = "schema" +# Basic finalizer task +resource "snowflake_task" "child_task" { + database = "database" + schema = "schema" + name = "task" + warehouse = "warehouse" + started = true + # You can do it by referring to task by computed fully_qualified_name field or write the task name in manually if it's not managed by Terraform + finalize = snowflake_task.root_task.fully_qualified_name + sql_statement = "select 1" +} - name = "test_task" - sql_statement = "select 1 as c;" +# Complete standalone task +resource "snowflake_task" "test" { + database = "database" + schema = "schema" + name = "task" + warehouse = "warehouse" + started = true + sql_statement = "select 1" + config = "{\"key\":\"value\"}" allow_overlapping_execution = true - enabled = true + error_integration = "" + when = "SYSTEM$STREAM_HAS_DATA('')" + comment = "complete task" + + schedule { + minutes = 10 + } + + # Session Parameters + suspend_task_after_num_failures = 10 + task_auto_retry_attempts = 0 + user_task_managed_initial_warehouse_size = "Medium" + user_task_minimum_trigger_interval_in_seconds = 30 + user_task_timeout_ms = 3600000 + abort_detached_query = false + autocommit = true + binary_input_format = "HEX" + binary_output_format = "HEX" + client_memory_limit = 1536 + client_metadata_request_use_connection_ctx = false + client_prefetch_threads = 4 + client_result_chunk_size = 160 + client_result_column_case_insensitive = false + client_session_keep_alive = false + client_session_keep_alive_heartbeat_frequency = 3600 + client_timestamp_type_mapping = "TIMESTAMP_LTZ" + date_input_format = "AUTO" + date_output_format = "YYYY-MM-DD" + enable_unload_physical_type_optimization = true + error_on_nondeterministic_merge = true + error_on_nondeterministic_update = false + geography_output_format = "GeoJSON" + geometry_output_format = "GeoJSON" + jdbc_use_session_timezone = true + json_indent = 2 + 
lock_timeout = 43200 + log_level = "OFF" + multi_statement_count = 1 + noorder_sequence_as_default = true + odbc_treat_decimal_as_int = false + query_tag = "" + quoted_identifiers_ignore_case = false + rows_per_resultset = 0 + s3_stage_vpce_dns_name = "" + search_path = "$current, $public" + statement_queued_timeout_in_seconds = 0 + statement_timeout_in_seconds = 172800 + strict_json_output = false + timestamp_day_is_always_24h = false + timestamp_input_format = "AUTO" + timestamp_ltz_output_format = "" + timestamp_ntz_output_format = "YYYY-MM-DD HH24:MI:SS.FF3" + timestamp_output_format = "YYYY-MM-DD HH24:MI:SS.FF3 TZHTZM" + timestamp_type_mapping = "TIMESTAMP_NTZ" + timestamp_tz_output_format = "" + timezone = "America/Los_Angeles" + time_input_format = "AUTO" + time_output_format = "HH24:MI:SS" + trace_level = "OFF" + transaction_abort_on_error = false + transaction_default_isolation_level = "READ COMMITTED" + two_digit_century_start = 1970 + unsupported_ddl_action = "ignore" + use_cached_result = true + week_of_year_policy = 0 + week_start = 0 } ``` -> **Note** Instead of using fully_qualified_name, you can reference objects managed outside Terraform by constructing a correct ID, consult [identifiers guide](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/guides/identifiers#new-computed-fully-qualified-name-field-in-resources). diff --git a/examples/resources/snowflake_task/resource.tf b/examples/resources/snowflake_task/resource.tf index 76e8a3c7a6..da18d05a81 100644 --- a/examples/resources/snowflake_task/resource.tf +++ b/examples/resources/snowflake_task/resource.tf @@ -1,54 +1,128 @@ +# Basic standalone task resource "snowflake_task" "task" { - comment = "my task" - database = "database" schema = "schema" + name = "task" warehouse = "warehouse" - - name = "task" - schedule = "10 MINUTE" - sql_statement = "select * from foo;" - - session_parameters = { - "foo" : "bar", + started = true + schedule { + minutes = 5 } - - user_task_timeout_ms = 10000 - after = "preceding_task" - when = "foo AND bar" - enabled = true + sql_statement = "select 1" } +# Basic serverless task resource "snowflake_task" "serverless_task" { - comment = "my serverless task" - - database = "db" - schema = "schema" - - name = "serverless_task" - schedule = "10 MINUTE" - sql_statement = "select * from foo;" - - session_parameters = { - "foo" : "bar", - } - - user_task_timeout_ms = 10000 + database = "database" + schema = "schema" + name = "task" user_task_managed_initial_warehouse_size = "XSMALL" - after = [snowflake_task.task.name] - when = "foo AND bar" - enabled = true + started = true + schedule { + minutes = 5 + } + sql_statement = "select 1" } -resource "snowflake_task" "test_task" { - comment = "task with allow_overlapping_execution" +# Basic child task +resource "snowflake_task" "child_task" { + database = "database" + schema = "schema" + name = "task" + warehouse = "warehouse" + started = true + # You can do it by referring to task by computed fully_qualified_name field or write the task name in manually if it's not managed by Terraform + after = [snowflake_task.root_task.fully_qualified_name, ".."] + sql_statement = "select 1" +} - database = "database" - schema = "schema" +# Basic finalizer task +resource "snowflake_task" "child_task" { + database = "database" + schema = "schema" + name = "task" + warehouse = "warehouse" + started = true + # You can do it by referring to task by computed fully_qualified_name field or write the task name in manually if it's not managed by 
Terraform + finalize = snowflake_task.root_task.fully_qualified_name + sql_statement = "select 1" +} - name = "test_task" - sql_statement = "select 1 as c;" +# Complete standalone task +resource "snowflake_task" "test" { + database = "database" + schema = "schema" + name = "task" + warehouse = "warehouse" + started = true + sql_statement = "select 1" + config = "{\"key\":\"value\"}" allow_overlapping_execution = true - enabled = true + error_integration = "" + when = "SYSTEM$STREAM_HAS_DATA('')" + comment = "complete task" + + schedule { + minutes = 10 + } + + # Session Parameters + suspend_task_after_num_failures = 10 + task_auto_retry_attempts = 0 + user_task_managed_initial_warehouse_size = "Medium" + user_task_minimum_trigger_interval_in_seconds = 30 + user_task_timeout_ms = 3600000 + abort_detached_query = false + autocommit = true + binary_input_format = "HEX" + binary_output_format = "HEX" + client_memory_limit = 1536 + client_metadata_request_use_connection_ctx = false + client_prefetch_threads = 4 + client_result_chunk_size = 160 + client_result_column_case_insensitive = false + client_session_keep_alive = false + client_session_keep_alive_heartbeat_frequency = 3600 + client_timestamp_type_mapping = "TIMESTAMP_LTZ" + date_input_format = "AUTO" + date_output_format = "YYYY-MM-DD" + enable_unload_physical_type_optimization = true + error_on_nondeterministic_merge = true + error_on_nondeterministic_update = false + geography_output_format = "GeoJSON" + geometry_output_format = "GeoJSON" + jdbc_use_session_timezone = true + json_indent = 2 + lock_timeout = 43200 + log_level = "OFF" + multi_statement_count = 1 + noorder_sequence_as_default = true + odbc_treat_decimal_as_int = false + query_tag = "" + quoted_identifiers_ignore_case = false + rows_per_resultset = 0 + s3_stage_vpce_dns_name = "" + search_path = "$current, $public" + statement_queued_timeout_in_seconds = 0 + statement_timeout_in_seconds = 172800 + strict_json_output = false + timestamp_day_is_always_24h = false + timestamp_input_format = "AUTO" + timestamp_ltz_output_format = "" + timestamp_ntz_output_format = "YYYY-MM-DD HH24:MI:SS.FF3" + timestamp_output_format = "YYYY-MM-DD HH24:MI:SS.FF3 TZHTZM" + timestamp_type_mapping = "TIMESTAMP_NTZ" + timestamp_tz_output_format = "" + timezone = "America/Los_Angeles" + time_input_format = "AUTO" + time_output_format = "HH24:MI:SS" + trace_level = "OFF" + transaction_abort_on_error = false + transaction_default_isolation_level = "READ COMMITTED" + two_digit_century_start = 1970 + unsupported_ddl_action = "ignore" + use_cached_result = true + week_of_year_policy = 0 + week_start = 0 } diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go index 0226d1d862..e13c43b5d9 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go @@ -33,3 +33,8 @@ func (t *TaskResourceAssert) HasNoScheduleSet() *TaskResourceAssert { t.AddAssertion(assert.ValueSet("schedule.#", "0")) return t } + +func (t *TaskResourceAssert) HasUserTaskManagedInitialWarehouseSizeEnum(size sdk.WarehouseSize) *TaskResourceAssert { + t.AddAssertion(assert.ValueSet("user_task_managed_initial_warehouse_size", string(size))) + return t +} diff --git a/pkg/datasources/common.go b/pkg/datasources/common.go index 2a7ae9ed10..24c68fcb85 100644 --- a/pkg/datasources/common.go +++ b/pkg/datasources/common.go @@ -1,6 +1,8 @@ 
package datasources import ( + "fmt" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/resources" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -163,6 +165,8 @@ func handleIn(d *schema.ResourceData, setField **sdk.In) error { return err } *setField = &sdk.In{Schema: schemaId} + default: + return fmt.Errorf("the `in` filtering field was set, but none of the subfields (account, database, schema) was specified") } } return nil diff --git a/pkg/datasources/tasks.go b/pkg/datasources/tasks.go index b840c547e3..a43cad365d 100644 --- a/pkg/datasources/tasks.go +++ b/pkg/datasources/tasks.go @@ -17,7 +17,7 @@ var tasksSchema = map[string]*schema.Schema{ Type: schema.TypeBool, Optional: true, Default: true, - Description: "Runs SHOW PARAMETERS FOR TASK for each user returned by SHOW TASK. The output of describe is saved to the parameters field as a map. By default this value is set to true.", + Description: "Runs SHOW PARAMETERS FOR TASK for each task returned by SHOW TASK and saves the output to the parameters field as a map. By default this value is set to true.", }, "like": likeSchema, "in": extendedInSchema, diff --git a/pkg/resources/resource_helpers_update.go b/pkg/resources/resource_helpers_update.go index 602b2408b0..aca7856e66 100644 --- a/pkg/resources/resource_helpers_update.go +++ b/pkg/resources/resource_helpers_update.go @@ -104,3 +104,18 @@ func attributeDirectValueUpdate[T any](d *schema.ResourceData, key string, setFi } return nil } + +func attributeMappedValueUpdate[T, R any](d *schema.ResourceData, key string, setField **R, unsetField **bool, mapper func(T) (R, error)) error { + if d.HasChange(key) { + if v, ok := d.GetOk(key); ok { + mappedValue, err := mapper(v.(T)) + if err != nil { + return err + } + *setField = sdk.Pointer(mappedValue) + } else { + *unsetField = sdk.Bool(true) + } + } + return nil +} diff --git a/pkg/resources/task.go b/pkg/resources/task.go index 74370f13cd..c25d457400 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -4,12 +4,12 @@ import ( "context" "errors" "fmt" - "log" "slices" - "strconv" "strings" "time" + "github.com/hashicorp/go-cty/cty" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/collections" @@ -67,6 +67,14 @@ var taskSchema = map[string]*schema.Schema{ Description: "The warehouse the task will use. Omit this parameter to use Snowflake-managed compute resources for runs of this task. Due to Snowflake limitations warehouse identifier can consist of only upper-cased letters. (Conflicts with user_task_managed_initial_warehouse_size)", ConflictsWith: []string{"user_task_managed_initial_warehouse_size"}, }, + "user_task_managed_initial_warehouse_size": { + Type: schema.TypeString, + Optional: true, + ValidateDiagFunc: sdkValidation(sdk.ToWarehouseSize), + DiffSuppressFunc: NormalizeAndCompare(sdk.ToWarehouseSize), + Description: "Specifies the size of the compute resources to provision for the first run of the task, before a task history is available for Snowflake to determine an ideal size. Once a task has successfully completed a few runs, Snowflake ignores this parameter setting. 
(Conflicts with warehouse)", + ConflictsWith: []string{"warehouse"}, + }, "schedule": { Type: schema.TypeList, Optional: true, @@ -93,16 +101,10 @@ var taskSchema = map[string]*schema.Schema{ }, }, "config": { - Type: schema.TypeString, - Optional: true, - DiffSuppressFunc: SuppressIfAny( - IgnoreChangeToCurrentSnowflakeValueInShow("config"), - // TODO(SNOW-1348116 - next pr): Currently config has to be passed with $$ prefix and suffix. The best solution would be to put there only json, so it could be retrieved from file, etc. Move $$ adding to the SDK. - func(k, oldValue, newValue string, d *schema.ResourceData) bool { - return strings.Trim(oldValue, "$") == strings.Trim(newValue, "$") - }, - ), - Description: "Specifies a string representation of key value pairs that can be accessed by all tasks in the task graph. Must be in JSON format.", + Type: schema.TypeString, + Optional: true, + DiffSuppressFunc: IgnoreChangeToCurrentSnowflakeValueInShow("config"), + Description: "Specifies a string representation of key value pairs that can be accessed by all tasks in the task graph. Must be in JSON format.", }, "allow_overlapping_execution": { Type: schema.TypeString, @@ -136,15 +138,15 @@ var taskSchema = map[string]*schema.Schema{ ConflictsWith: []string{"schedule", "after"}, }, "after": { - Type: schema.TypeSet, + Type: schema.TypeSet, + Optional: true, Elem: &schema.Schema{ Type: schema.TypeString, - DiffSuppressFunc: suppressIdentifierQuoting, ValidateDiagFunc: IsValidIdentifier[sdk.SchemaObjectIdentifier](), }, - Optional: true, - Description: blocklistedCharactersFieldDescription("Specifies one or more predecessor tasks for the current task. Use this option to [create a DAG](https://docs.snowflake.com/en/user-guide/tasks-graphs.html#label-task-dag) of tasks or add this task to an existing DAG. A DAG is a series of tasks that starts with a scheduled root task and is linked together by dependencies."), - ConflictsWith: []string{"schedule", "finalize"}, + DiffSuppressFunc: NormalizeAndCompareIdentifiersInSet("after"), + Description: blocklistedCharactersFieldDescription("Specifies one or more predecessor tasks for the current task. Use this option to [create a DAG](https://docs.snowflake.com/en/user-guide/tasks-graphs.html#label-task-dag) of tasks or add this task to an existing DAG. 
A DAG is a series of tasks that starts with a scheduled root task and is linked together by dependencies."), + ConflictsWith: []string{"schedule", "finalize"}, }, "when": { Type: schema.TypeString, @@ -155,7 +157,6 @@ var taskSchema = map[string]*schema.Schema{ "sql_statement": { Type: schema.TypeString, Required: true, - ForceNew: false, DiffSuppressFunc: SuppressIfAny(DiffSuppressStatement, IgnoreChangeToCurrentSnowflakeValueInShow("definition")), Description: "Any single SQL statement, or a call to a stored procedure, executed when the task runs.", }, @@ -197,6 +198,15 @@ func Task() *schema.Resource { ComputedIfAnyAttributeChanged(taskSchema, FullyQualifiedNameAttributeName, "name"), taskParametersCustomDiff, ), + + SchemaVersion: 1, + StateUpgraders: []schema.StateUpgrader{ + { + Version: 0, + Type: cty.EmptyObject, + Upgrade: v098TaskStateUpgrader, + }, + }, } } @@ -312,7 +322,6 @@ func CreateTask(ctx context.Context, d *schema.ResourceData, meta any) (diags di return diag.FromErr(err) } - // TODO(SNOW-1348116 - next pr): State upgrader for "id" (and potentially other fields) d.SetId(helpers.EncodeResourceIdentifier(id)) if d.Get("started").(bool) { @@ -334,10 +343,10 @@ func CreateTask(ctx context.Context, d *schema.ResourceData, meta any) (diags di } }() - return ReadTask(false)(ctx, d, meta) + return append(diags, ReadTask(false)(ctx, d, meta)...) } -func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { +func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) (diags diag.Diagnostics) { client := meta.(*provider.Context).Client id, err := sdk.ParseSchemaObjectIdentifier(d.Id()) if err != nil { @@ -354,6 +363,12 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return diag.FromErr(sdk.JoinErrors(err)) } + defer func() { + if err := client.Tasks.ResumeTasks(ctx, tasksToResume); err != nil { + diags = append(diags, resumeTaskErrorDiag(id, "create", err)) + } + }() + if task.IsStarted() { if err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(id).WithSuspend(true)); err != nil { return diag.FromErr(sdk.JoinErrors(err)) @@ -364,6 +379,7 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag set := sdk.NewTaskSetRequest() err = errors.Join( + attributeMappedValueUpdate(d, "user_task_managed_initial_warehouse_size", &set.UserTaskManagedInitialWarehouseSize, &unset.UserTaskManagedInitialWarehouseSize, sdk.ToWarehouseSize), accountObjectIdentifierAttributeUpdate(d, "warehouse", &set.Warehouse, &unset.Warehouse), stringAttributeUpdate(d, "config", &set.Config, &unset.Config), booleanStringAttributeUpdate(d, "allow_overlapping_execution", &set.AllowOverlappingExecution, &unset.AllowOverlappingExecution), @@ -445,10 +461,6 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag tasksToResume = append(tasksToResume, rootTaskId) } } else { - if task.TaskRelations.FinalizedRootTask == nil { - return diag.Errorf("trying to remove the finalizer when it's already unset") - } - rootTask, err := client.Tasks.ShowByID(ctx, *task.TaskRelations.FinalizedRootTask) if err != nil { return diag.FromErr(err) @@ -527,11 +539,7 @@ func UpdateTask(ctx context.Context, d *schema.ResourceData, meta any) diag.Diag return identifier.FullyQualifiedName() == id.FullyQualifiedName() }) - if err := client.Tasks.ResumeTasks(ctx, tasksToResume); err != nil { - log.Printf("[WARN] failed to resume tasks: %s", err) - } - - return ReadTask(false)(ctx, d, meta) + return append(diags, 
ReadTask(false)(ctx, d, meta)...) } func ReadTask(withExternalChangesMarking bool) schema.ReadContextFunc { @@ -587,35 +595,40 @@ func ReadTask(withExternalChangesMarking bool) schema.ReadContextFunc { return warehouse.Name(), nil }, nil), func() error { - upperSchedule := strings.ToUpper(task.Schedule) - switch { - case strings.Contains(upperSchedule, "USING CRON"): - // We have to do it this was because we want to get rid of the prefix and leave the casing as is (mostly because timezones like America/Los_Angeles are case-sensitive). - // That why the prefix trimming has to be done by slicing rather than using strings.TrimPrefix. - cron := task.Schedule[len("USING CRON "):] - if err := d.Set("schedule", []any{map[string]any{ - "using_cron": cron, - }}); err != nil { - return err - } - case strings.Contains(upperSchedule, "MINUTE"): - minuteParts := strings.Split(upperSchedule, " ") - minutes, err := strconv.Atoi(minuteParts[0]) + managedInitialWarehouseSizeIdx := slices.IndexFunc(taskParameters, func(p *sdk.Parameter) bool { + return p != nil && p.Key == string(sdk.TaskParameterUserTaskManagedInitialWarehouseSize) + }) + if managedInitialWarehouseSizeIdx == -1 { + return fmt.Errorf("unable to find user_task_managed_initial_warehouse_size parameter") + } + if taskParameters[managedInitialWarehouseSizeIdx].Level == sdk.ParameterTypeTask { + return d.Set("user_task_managed_initial_warehouse_size", taskParameters[managedInitialWarehouseSizeIdx].Value) + } + return nil + }(), + func() error { + if len(task.Schedule) > 0 { + taskSchedule, err := sdk.ParseTaskSchedule(task.Schedule) if err != nil { return err } - - if err := d.Set("schedule", []any{map[string]any{ - "minutes": minutes, - }}); err != nil { - return err - } - default: - if err := d.Set("schedule", nil); err != nil { - return err + switch { + case len(taskSchedule.Cron) > 0: + if err := d.Set("schedule", []any{map[string]any{ + "using_cron": taskSchedule.Cron, + }}); err != nil { + return err + } + case taskSchedule.Minutes > 0: + if err := d.Set("schedule", []any{map[string]any{ + "minutes": taskSchedule.Minutes, + }}); err != nil { + return err + } } + return nil } - return nil + return d.Set("schedule", nil) }(), d.Set("started", task.IsStarted()), d.Set("when", task.Condition), @@ -658,7 +671,7 @@ func DeleteTask(ctx context.Context, d *schema.ResourceData, meta any) (diags di } d.SetId("") - return nil + return diags } func resumeTaskErrorDiag(id sdk.SchemaObjectIdentifier, operation string, originalErr error) diag.Diagnostic { @@ -685,20 +698,3 @@ func waitForTaskStart(ctx context.Context, client *sdk.Client, id sdk.SchemaObje return nil, true }) } - -func waitForTaskSuspend(ctx context.Context, client *sdk.Client, id sdk.SchemaObjectIdentifier) error { - err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(id).WithSuspend(true)) - if err != nil { - return fmt.Errorf("error suspending task %s err = %w", id.FullyQualifiedName(), err) - } - return util.Retry(5, 5*time.Second, func() (error, bool) { - task, err := client.Tasks.ShowByID(ctx, id) - if err != nil { - return fmt.Errorf("error suspending task %s err = %w", id.FullyQualifiedName(), err), false - } - if task.State != sdk.TaskStateSuspended { - return nil, false - } - return nil, true - }) -} diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index d38e94bb22..9592dcb58d 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "regexp" + "strconv" 
"testing" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/objectparametersassert" @@ -27,10 +28,6 @@ import ( "github.com/hashicorp/terraform-plugin-testing/tfversion" ) -// TODO(SNOW-1348116 - next pr): More tests for complicated DAGs -// TODO(SNOW-1348116 - next pr): Test for stored procedures passed to sql_statement (decide on name) -// TODO(SNOW-1348116 - next pr): More test with external changes - func TestAcc_Task_Basic(t *testing.T) { _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) acc.TestAccPreCheck(t) @@ -198,7 +195,7 @@ func TestAcc_Task_Complete(t *testing.T) { ), }, { - ResourceName: "snowflake_task.test", + ResourceName: configModel.ResourceReference(), ImportState: true, ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), ConfigVariables: config.ConfigVariablesFromModel(t, configModel), @@ -233,6 +230,7 @@ func TestAcc_Task_Updates(t *testing.T) { id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() statement := "SELECT 1" + newStatement := "SELECT 123" basicConfigModel := model.TaskWithId("test", id, false, statement) // TODO(SNOW-1736173): New warehouse created, because the common one has lower-case letters that won't work @@ -245,7 +243,7 @@ func TestAcc_Task_Updates(t *testing.T) { taskConfig := `{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}` comment := random.Comment() condition := `SYSTEM$STREAM_HAS_DATA('MYSTREAM')` - completeConfigModel := model.TaskWithId("test", id, true, statement). + completeConfigModel := model.TaskWithId("test", id, true, newStatement). WithWarehouse(warehouse.ID().Name()). WithScheduleMinutes(5). WithConfigValue(configvariable.StringVariable(taskConfig)). @@ -306,6 +304,303 @@ func TestAcc_Task_Updates(t *testing.T) { ), }, // Set + { + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, completeConfigModel), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(completeConfigModel.ResourceReference(), plancheck.ResourceActionUpdate), + }, + }, + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, completeConfigModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanTrue). + HasWarehouseString(warehouse.ID().Name()). + HasScheduleMinutes(5). + HasConfigString(taskConfig). + HasAllowOverlappingExecutionString(r.BooleanTrue). + HasErrorIntegrationString(errorNotificationIntegration.ID().Name()). + HasCommentString(comment). + HasFinalizeString(""). + HasAfterIdsInOrder(). + HasWhenString(condition). + HasSqlStatementString(newStatement), + resourceshowoutputassert.TaskShowOutput(t, completeConfigModel.ResourceReference()). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasIdNotEmpty(). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(currentRole.Name()). + HasWarehouse(warehouse.ID()). + HasComment(comment). + HasScheduleMinutes(5). + HasPredecessors(). + HasState(sdk.TaskStateStarted). + HasDefinition(newStatement). + HasCondition(condition). + HasAllowOverlappingExecution(true). + HasErrorIntegration(errorNotificationIntegration.ID()). + HasLastCommittedOnNotEmpty(). + HasLastSuspendedOn(""). + HasOwnerRoleType("ROLE"). + HasConfig(taskConfig). + HasBudget(""). 
+ HasTaskRelations(sdk.TaskRelations{}), + ), + }, + // Unset + { + Config: config.FromModel(t, basicConfigModel), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(basicConfigModel.ResourceReference(), plancheck.ResourceActionUpdate), + }, + }, + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, basicConfigModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanFalse). + HasWarehouseString(""). + HasNoScheduleSet(). + HasConfigString(""). + HasAllowOverlappingExecutionString(r.BooleanDefault). + HasErrorIntegrationString(""). + HasCommentString(""). + HasFinalizeString(""). + HasAfterIdsInOrder(). + HasWhenString(""). + HasSqlStatementString(statement), + resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasIdNotEmpty(). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(currentRole.Name()). + HasComment(""). + HasWarehouse(sdk.NewAccountObjectIdentifier("")). + HasNoSchedule(). + HasPredecessors(). + HasState(sdk.TaskStateSuspended). + HasDefinition(statement). + HasCondition(""). + HasAllowOverlappingExecution(false). + HasErrorIntegration(sdk.NewAccountObjectIdentifier("")). + HasLastCommittedOnNotEmpty(). + HasLastSuspendedOnNotEmpty(). + HasOwnerRoleType("ROLE"). + HasConfig(""). + HasBudget(""). + HasTaskRelations(sdk.TaskRelations{}), + ), + }, + }, + }) +} + +/* +DAG structure (the test proves child3 won't have any issues with updates in the following scenario): + + child1 + / \ + root child3 + \ / + child2 +*/ +func TestAcc_Task_UpdatesInComplexDAG(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + rootTask, rootTaskCleanup := acc.TestClient().Task.CreateWithSchedule(t) + t.Cleanup(rootTaskCleanup) + + child1, child1Cleanup := acc.TestClient().Task.CreateWithAfter(t, rootTask.ID()) + t.Cleanup(child1Cleanup) + + child2, child2Cleanup := acc.TestClient().Task.CreateWithAfter(t, rootTask.ID()) + t.Cleanup(child2Cleanup) + + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(child1.ID()).WithResume(true)) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(child2.ID()).WithResume(true)) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(rootTask.ID()).WithResume(true)) + t.Cleanup(func() { acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(rootTask.ID()).WithSuspend(true)) }) + + child3Id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + basicConfigModel := model.TaskWithId("test", child3Id, true, "SELECT 1"). + WithAfterValue(configvariable.SetVariable( + configvariable.StringVariable(child1.ID().FullyQualifiedName()), + configvariable.StringVariable(child2.ID().FullyQualifiedName()), + )) + + comment := random.Comment() + basicConfigModelAfterUpdate := model.TaskWithId("test", child3Id, true, "SELECT 1"). + WithAfterValue(configvariable.SetVariable( + configvariable.StringVariable(child1.ID().FullyQualifiedName()), + configvariable.StringVariable(child2.ID().FullyQualifiedName()), + )). + WithSqlStatement("SELECT 123"). 
// Overrides sql_statement + WithComment(comment) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + { + Config: config.FromModel(t, basicConfigModel), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, basicConfigModel.ResourceReference()). + HasFullyQualifiedNameString(child3Id.FullyQualifiedName()). + HasDatabaseString(child3Id.DatabaseName()). + HasSchemaString(child3Id.SchemaName()). + HasNameString(child3Id.Name()). + HasStartedString(r.BooleanTrue). + HasSqlStatementString("SELECT 1"), + resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). + HasCreatedOnNotEmpty(). + HasName(child3Id.Name()). + HasDatabaseName(child3Id.DatabaseName()). + HasSchemaName(child3Id.SchemaName()). + HasState(sdk.TaskStateStarted). + HasDefinition("SELECT 1"), + ), + }, + // Update some fields in child3 + { + Config: config.FromModel(t, basicConfigModelAfterUpdate), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, basicConfigModelAfterUpdate.ResourceReference()). + HasFullyQualifiedNameString(child3Id.FullyQualifiedName()). + HasDatabaseString(child3Id.DatabaseName()). + HasSchemaString(child3Id.SchemaName()). + HasNameString(child3Id.Name()). + HasStartedString(r.BooleanTrue). + HasCommentString(comment). + HasSqlStatementString("SELECT 123"), + resourceshowoutputassert.TaskShowOutput(t, basicConfigModelAfterUpdate.ResourceReference()). + HasCreatedOnNotEmpty(). + HasName(child3Id.Name()). + HasDatabaseName(child3Id.DatabaseName()). + HasSchemaName(child3Id.SchemaName()). + HasState(sdk.TaskStateStarted). + HasComment(comment). + HasDefinition("SELECT 123"), + ), + }, + }, + }) +} + +func TestAcc_Task_StatementSpaces(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + statement := "SELECT 1" + when := "1 > 2" + configModel := model.TaskWithId("test", id, false, statement).WithWhen(when) + + statementWithSpaces := " SELECT 1 " + whenWithSpaces := " 1 > 2 " + configModelWithSpacesInStatements := model.TaskWithId("test", id, false, statementWithSpaces).WithWhen(whenWithSpaces) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + { + Config: config.FromModel(t, configModel), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasWhenString(when). + HasSqlStatementString(statement), + resourceshowoutputassert.TaskShowOutput(t, configModel.ResourceReference()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasName(id.Name()). + HasCondition(when). 
+ HasDefinition(statement), + ), + }, + { + Config: config.FromModel(t, configModelWithSpacesInStatements), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasWhenString(when). + HasSqlStatementString(statement), + resourceshowoutputassert.TaskShowOutput(t, configModel.ResourceReference()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasName(id.Name()). + HasCondition(when). + HasDefinition(statement), + ), + }, + }, + }) +} + +func TestAcc_Task_ExternalChanges(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + currentRole := acc.TestClient().Context.CurrentRole(t) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + statement := "SELECT 1" + basicConfigModel := model.TaskWithId("test", id, false, statement) + + // TODO(SNOW-1736173): New warehouse created, because the common one has lower-case letters that won't work + warehouse, warehouseCleanup := acc.TestClient().Warehouse.CreateWarehouse(t) + t.Cleanup(warehouseCleanup) + + errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.Create(t) + t.Cleanup(errorNotificationIntegrationCleanup) + + taskConfig := `{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}` + comment := random.Comment() + condition := `SYSTEM$STREAM_HAS_DATA('MYSTREAM')` + completeConfigModel := model.TaskWithId("test", id, true, statement). + WithWarehouse(warehouse.ID().Name()). + WithScheduleMinutes(5). + WithConfigValue(configvariable.StringVariable(taskConfig)). + WithAllowOverlappingExecution(r.BooleanTrue). + WithErrorIntegration(errorNotificationIntegration.ID().Name()). + WithComment(comment). + WithWhen(condition) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + // Optionals set { ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), ConfigVariables: config.ConfigVariablesFromModel(t, completeConfigModel), @@ -350,8 +645,132 @@ func TestAcc_Task_Updates(t *testing.T) { HasTaskRelations(sdk.TaskRelations{}), ), }, - // Unset + // External change - unset all optional fields and expect no change { + PreConfig: func() { + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithSuspend(true)) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithUnset(*sdk.NewTaskUnsetRequest(). + WithWarehouse(true). + WithConfig(true). + WithAllowOverlappingExecution(true). + WithErrorIntegration(true). + WithComment(true). + WithSchedule(true), + )) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithRemoveWhen(true)) + }, + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(basicConfigModel.ResourceReference(), plancheck.ResourceActionUpdate), + }, + }, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, completeConfigModel), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, completeConfigModel.ResourceReference()). 
+ HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanTrue). + HasWarehouseString(warehouse.ID().Name()). + HasScheduleMinutes(5). + HasConfigString(taskConfig). + HasAllowOverlappingExecutionString(r.BooleanTrue). + HasErrorIntegrationString(errorNotificationIntegration.ID().Name()). + HasCommentString(comment). + HasFinalizeString(""). + HasAfterIdsInOrder(). + HasWhenString(condition). + HasSqlStatementString(statement), + resourceshowoutputassert.TaskShowOutput(t, completeConfigModel.ResourceReference()). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasIdNotEmpty(). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(currentRole.Name()). + HasWarehouse(warehouse.ID()). + HasComment(comment). + HasScheduleMinutes(5). + HasPredecessors(). + HasState(sdk.TaskStateStarted). + HasDefinition(statement). + HasCondition(condition). + HasAllowOverlappingExecution(true). + HasErrorIntegration(errorNotificationIntegration.ID()). + HasLastCommittedOnNotEmpty(). + HasLastSuspendedOnNotEmpty(). + HasOwnerRoleType("ROLE"). + HasConfig(taskConfig). + HasBudget(""). + HasTaskRelations(sdk.TaskRelations{}), + ), + }, + // Unset optional values + { + Config: config.FromModel(t, basicConfigModel), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, basicConfigModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanFalse). + HasWarehouseString(""). + HasNoScheduleSet(). + HasConfigString(""). + HasAllowOverlappingExecutionString(r.BooleanDefault). + HasErrorIntegrationString(""). + HasCommentString(""). + HasFinalizeString(""). + HasAfterIdsInOrder(). + HasWhenString(""). + HasSqlStatementString(statement), + resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). + HasCreatedOnNotEmpty(). + HasName(id.Name()). + HasIdNotEmpty(). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasOwner(currentRole.Name()). + HasComment(""). + HasWarehouse(sdk.NewAccountObjectIdentifier("")). + HasNoSchedule(). + HasPredecessors(). + HasState(sdk.TaskStateSuspended). + HasDefinition(statement). + HasCondition(""). + HasAllowOverlappingExecution(false). + HasErrorIntegration(sdk.NewAccountObjectIdentifier("")). + HasLastCommittedOnNotEmpty(). + HasLastSuspendedOnNotEmpty(). + HasOwnerRoleType("ROLE"). + HasConfig(""). + HasBudget(""). + HasTaskRelations(sdk.TaskRelations{}), + ), + }, + // External change - set all optional fields and expect no change + { + PreConfig: func() { + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithSuspend(true)) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithSet(*sdk.NewTaskSetRequest(). + WithWarehouse(warehouse.ID()). + WithConfig(taskConfig). + WithAllowOverlappingExecution(true). + WithErrorIntegration(errorNotificationIntegration.ID()). + WithComment(comment). 
+ WithSchedule("5 MINUTE"), + )) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithModifyWhen(condition)) + acc.TestClient().Task.Alter(t, sdk.NewAlterTaskRequest(id).WithModifyAs("SELECT 123")) + }, + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(basicConfigModel.ResourceReference(), plancheck.ResourceActionUpdate), + }, + }, Config: config.FromModel(t, basicConfigModel), Check: assert.AssertThat(t, resourceassert.TaskResource(t, basicConfigModel.ResourceReference()). @@ -398,6 +817,50 @@ func TestAcc_Task_Updates(t *testing.T) { }) } +func TestAcc_Task_CallingProcedure(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + procedure := acc.TestClient().Procedure.Create(t, sdk.DataTypeNumber) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + statement := fmt.Sprintf("call %s(123)", procedure.Name) + configModel := model.TaskWithId("test", id, false, statement).WithUserTaskManagedInitialWarehouseSizeEnum(sdk.WarehouseSizeXSmall) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + { + Config: config.FromModel(t, configModel), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanFalse). + HasUserTaskManagedInitialWarehouseSizeEnum(sdk.WarehouseSizeXSmall). + HasSqlStatementString(statement), + resourceshowoutputassert.TaskShowOutput(t, configModel.ResourceReference()). + HasCreatedOnNotEmpty(). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasName(id.Name()). + HasState(sdk.TaskStateSuspended). + HasDefinition(statement), + resourceparametersassert.TaskResourceParameters(t, configModel.ResourceReference()). + HasUserTaskManagedInitialWarehouseSize(sdk.WarehouseSizeXSmall), + ), + }, + }, + }) +} + func TestAcc_Task_CronAndMinutes(t *testing.T) { _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) acc.TestAccPreCheck(t) @@ -1929,3 +2392,182 @@ resource "snowflake_task" "test" { } `, id.DatabaseName(), id.SchemaName(), id.Name(), errorIntegrationId.Name()) } + +func TestAcc_Task_StateUpgrade(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + statement := "SELECT 1" + condition := "2 < 1" + configModel := model.TaskWithId("test", id, false, statement). + WithScheduleMinutes(5). + WithAllowOverlappingExecution(r.BooleanTrue). + WithSuspendTaskAfterNumFailures(10). + WithWhen(condition). 
+ WithUserTaskManagedInitialWarehouseSizeEnum(sdk.WarehouseSizeXSmall) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + { + ExternalProviders: map[string]resource.ExternalProvider{ + "snowflake": { + VersionConstraint: "=0.98.0", + Source: "Snowflake-Labs/snowflake", + }, + }, + Config: taskBasicConfigV0980(id, condition), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_task.test", "enabled", "false"), + resource.TestCheckResourceAttr("snowflake_task.test", "allow_overlapping_execution", "true"), + resource.TestCheckResourceAttr("snowflake_task.test", "schedule", "5 MINUTES"), + resource.TestCheckResourceAttr("snowflake_task.test", "suspend_task_after_num_failures", "10"), + resource.TestCheckResourceAttr("snowflake_task.test", "when", condition), + resource.TestCheckResourceAttr("snowflake_task.test", "user_task_managed_initial_warehouse_size", "XSMALL"), + ), + }, + { + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModel), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanFalse). + HasScheduleMinutes(5). + HasAllowOverlappingExecutionString(r.BooleanTrue). + HasSuspendTaskAfterNumFailuresString("10"). + HasWhenString(condition). + HasUserTaskManagedInitialWarehouseSizeEnum(sdk.WarehouseSizeXSmall), + ), + }, + }, + }) +} + +func taskBasicConfigV0980(id sdk.SchemaObjectIdentifier, condition string) string { + return fmt.Sprintf(` +resource "snowflake_task" "test" { + database = "%[1]s" + schema = "%[2]s" + name = "%[3]s" + enabled = false + sql_statement = "SELECT 1" + schedule = "5 MINUTES" + allow_overlapping_execution = true + suspend_task_after_num_failures = 10 + when = "%[4]s" + user_task_managed_initial_warehouse_size = "XSMALL" +} +`, id.DatabaseName(), id.SchemaName(), id.Name(), condition) +} + +func TestAcc_Task_StateUpgradeWithAfter(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + rootTask, rootTaskCleanup := acc.TestClient().Task.Create(t) + t.Cleanup(rootTaskCleanup) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + statement := "SELECT 1" + comment := random.Comment() + configModel := model.TaskWithId("test", id, false, statement). + WithUserTaskTimeoutMs(50). + WithWarehouse(acc.TestClient().Ids.WarehouseId().Name()). + WithAfterValue(configvariable.SetVariable(configvariable.StringVariable(rootTask.ID().FullyQualifiedName()))). + WithComment(comment). + WithLogLevelEnum(sdk.LogLevelInfo). + WithAutocommit(false). 
+ WithJsonIndent(4) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + { + ExternalProviders: map[string]resource.ExternalProvider{ + "snowflake": { + VersionConstraint: "=0.98.0", + Source: "Snowflake-Labs/snowflake", + }, + }, + Config: taskCompleteConfigV0980(id, rootTask.ID(), acc.TestClient().Ids.WarehouseId(), 50, comment), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_task.test", "after.#", "1"), + resource.TestCheckResourceAttr("snowflake_task.test", "after.0", rootTask.ID().Name()), + resource.TestCheckResourceAttr("snowflake_task.test", "warehouse", acc.TestClient().Ids.WarehouseId().Name()), + resource.TestCheckResourceAttr("snowflake_task.test", "user_task_timeout_ms", "50"), + resource.TestCheckResourceAttr("snowflake_task.test", "comment", comment), + resource.TestCheckResourceAttr("snowflake_task.test", "session_parameters.LOG_LEVEL", "INFO"), + resource.TestCheckResourceAttr("snowflake_task.test", "session_parameters.AUTOCOMMIT", "false"), + resource.TestCheckResourceAttr("snowflake_task.test", "session_parameters.JSON_INDENT", "4"), + ), + }, + { + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + Config: config.FromModel(t, configModel), + Check: assert.AssertThat(t, + resourceassert.TaskResource(t, configModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanFalse). + HasSqlStatementString(statement). + HasAfterIdsInOrder(rootTask.ID()). + HasWarehouseString(acc.TestClient().Ids.WarehouseId().Name()). + HasUserTaskTimeoutMsString("50"). + HasLogLevelString(string(sdk.LogLevelInfo)). + HasAutocommitString("false"). + HasJsonIndentString("4"). 
+ HasCommentString(comment), + ), + }, + }, + }) +} + +func taskCompleteConfigV0980( + id sdk.SchemaObjectIdentifier, + rootTaskId sdk.SchemaObjectIdentifier, + warehouseId sdk.AccountObjectIdentifier, + userTaskTimeoutMs int, + comment string, +) string { + return fmt.Sprintf(` +resource "snowflake_task" "test" { + database = "%[1]s" + schema = "%[2]s" + name = "%[3]s" + enabled = false + sql_statement = "SELECT 1" + + after = [%[4]s] + warehouse = "%[5]s" + user_task_timeout_ms = %[6]d + comment = "%[7]s" + session_parameters = { + LOG_LEVEL = "INFO", + AUTOCOMMIT = false, + JSON_INDENT = 4, + } +} +`, id.DatabaseName(), id.SchemaName(), id.Name(), + strconv.Quote(rootTaskId.Name()), + warehouseId.Name(), + userTaskTimeoutMs, + comment, + ) +} diff --git a/pkg/resources/task_state_upgraders.go b/pkg/resources/task_state_upgraders.go new file mode 100644 index 0000000000..e02512dd30 --- /dev/null +++ b/pkg/resources/task_state_upgraders.go @@ -0,0 +1,49 @@ +package resources + +import ( + "context" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" +) + +func v098TaskStateUpgrader(ctx context.Context, rawState map[string]any, meta any) (map[string]any, error) { + if rawState == nil { + return rawState, nil + } + + rawState["condition"] = rawState["when"] + rawState["started"] = booleanStringFromBool(rawState["enabled"].(bool)) + rawState["allow_overlapping_execution"] = booleanStringFromBool(rawState["allow_overlapping_execution"].(bool)) + if rawState["after"] != nil { + newAfter := make([]string, len(rawState["after"].([]any))) + for i, name := range rawState["after"].([]any) { + newAfter[i] = sdk.NewSchemaObjectIdentifier(rawState["database"].(string), rawState["schema"].(string), name.(string)).FullyQualifiedName() + } + rawState["after"] = newAfter + } + if rawState["session_parameters"] != nil { + for k, v := range rawState["session_parameters"].(map[string]any) { + rawState[k] = v + } + } + delete(rawState, "session_parameters") + + if rawState["schedule"] != nil && len(rawState["schedule"].(string)) > 0 { + taskSchedule, err := sdk.ParseTaskSchedule(rawState["schedule"].(string)) + scheduleMap := make(map[string]any) + if err != nil { + return nil, err + } + switch { + case len(taskSchedule.Cron) > 0: + scheduleMap["using_cron"] = taskSchedule.Cron + case taskSchedule.Minutes > 0: + scheduleMap["minutes"] = taskSchedule.Minutes + } + rawState["schedule"] = []any{scheduleMap} + } else { + delete(rawState, "schedule") + } + + return migratePipeSeparatedObjectIdentifierResourceIdToFullyQualifiedName(ctx, rawState, meta) +} diff --git a/pkg/sdk/tasks_def.go b/pkg/sdk/tasks_def.go index a05400bcb0..83ff671492 100644 --- a/pkg/sdk/tasks_def.go +++ b/pkg/sdk/tasks_def.go @@ -245,6 +245,7 @@ var TasksDef = g.NewInterface( "Unset", g.NewQueryStruct("TaskUnset"). OptionalSQL("WAREHOUSE"). + OptionalSQL("USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE"). OptionalSQL("SCHEDULE"). OptionalSQL("CONFIG"). OptionalSQL("ALLOW_OVERLAPPING_EXECUTION"). @@ -255,7 +256,7 @@ var TasksDef = g.NewInterface( OptionalSQL("TASK_AUTO_RETRY_ATTEMPTS"). OptionalSQL("USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS"). OptionalSessionParametersUnset(). 
- WithValidation(g.AtLeastOneValueSet, "Warehouse", "Schedule", "Config", "AllowOverlappingExecution", "UserTaskTimeoutMs", "SuspendTaskAfterNumFailures", "ErrorIntegration", "Comment", "SessionParametersUnset", "TaskAutoRetryAttempts", "UserTaskMinimumTriggerIntervalInSeconds"),
+ WithValidation(g.AtLeastOneValueSet, "Warehouse", "UserTaskManagedInitialWarehouseSize", "Schedule", "Config", "AllowOverlappingExecution", "UserTaskTimeoutMs", "SuspendTaskAfterNumFailures", "ErrorIntegration", "Comment", "SessionParametersUnset", "TaskAutoRetryAttempts", "UserTaskMinimumTriggerIntervalInSeconds"),
 g.ListOptions().SQL("UNSET"),
 ).
 OptionalSetTags().
diff --git a/pkg/sdk/tasks_gen.go b/pkg/sdk/tasks_gen.go
index 09da9fa9f5..203e76459b 100644
--- a/pkg/sdk/tasks_gen.go
+++ b/pkg/sdk/tasks_gen.go
@@ -3,6 +3,9 @@ package sdk

 import (
 	"context"
 	"database/sql"
+	"fmt"
+	"strconv"
+	"strings"
 )

 type Tasks interface {
@@ -213,6 +216,33 @@ func (v *Task) IsStarted() bool {
 	return v.State == TaskStateStarted
 }

+type TaskSchedule struct {
+	Minutes int
+	Cron    string
+}
+
+func ParseTaskSchedule(schedule string) (*TaskSchedule, error) {
+	upperSchedule := strings.ToUpper(schedule)
+	switch {
+	case strings.Contains(upperSchedule, "USING CRON"):
+		// We have to do it this way because we want to get rid of the prefix and leave the casing as is (mostly because timezones like America/Los_Angeles are case-sensitive).
+		// That's why the prefix trimming has to be done by slicing rather than using strings.TrimPrefix.
+		cron := schedule[len("USING CRON "):]
+		return &TaskSchedule{Cron: cron}, nil
+	case strings.HasSuffix(upperSchedule, "M") ||
+		strings.HasSuffix(upperSchedule, "MINUTE") ||
+		strings.HasSuffix(upperSchedule, "MINUTES"):
+		minuteParts := strings.Split(upperSchedule, " ")
+		minutes, err := strconv.Atoi(minuteParts[0])
+		if err != nil {
+			return nil, err
+		}
+
+		return &TaskSchedule{Minutes: minutes}, nil
+	}
+	return nil, fmt.Errorf("invalid schedule format: %s", schedule)
+}
+
 // DescribeTaskOptions is based on https://docs.snowflake.com/en/sql-reference/sql/desc-task.
type DescribeTaskOptions struct { describe bool `ddl:"static" sql:"DESCRIBE"` diff --git a/pkg/sdk/tasks_gen_test.go b/pkg/sdk/tasks_gen_test.go index 35ac195b37..0d0c72181f 100644 --- a/pkg/sdk/tasks_gen_test.go +++ b/pkg/sdk/tasks_gen_test.go @@ -2,6 +2,8 @@ package sdk import ( "testing" + + "github.com/stretchr/testify/assert" ) func TestTasks_Create(t *testing.T) { @@ -271,7 +273,7 @@ func TestTasks_Alter(t *testing.T) { t.Run("validation: at least one of the fields [opts.Unset.Warehouse opts.Unset.Schedule opts.Unset.Config opts.Unset.AllowOverlappingExecution opts.Unset.UserTaskTimeoutMs opts.Unset.SuspendTaskAfterNumFailures opts.Unset.ErrorIntegration opts.Unset.Comment opts.Unset.SessionParametersUnset] should be set", func(t *testing.T) { opts := defaultOpts() opts.Unset = &TaskUnset{} - assertOptsInvalidJoinedErrors(t, opts, errAtLeastOneOf("AlterTaskOptions.Unset", "Warehouse", "Schedule", "Config", "AllowOverlappingExecution", "UserTaskTimeoutMs", "SuspendTaskAfterNumFailures", "ErrorIntegration", "Comment", "SessionParametersUnset", "TaskAutoRetryAttempts", "UserTaskMinimumTriggerIntervalInSeconds")) + assertOptsInvalidJoinedErrors(t, opts, errAtLeastOneOf("AlterTaskOptions.Unset", "Warehouse", "UserTaskManagedInitialWarehouseSize", "Schedule", "Config", "AllowOverlappingExecution", "UserTaskTimeoutMs", "SuspendTaskAfterNumFailures", "ErrorIntegration", "Comment", "SessionParametersUnset", "TaskAutoRetryAttempts", "UserTaskMinimumTriggerIntervalInSeconds")) }) t.Run("validation: opts.Unset.SessionParametersUnset.SessionParametersUnset should be valid", func(t *testing.T) { @@ -552,3 +554,66 @@ func TestTasks_Execute(t *testing.T) { assertOptsValidAndSQLEquals(t, opts, "EXECUTE TASK %s RETRY LAST", id.FullyQualifiedName()) }) } + +func TestParseTaskSchedule(t *testing.T) { + testCases := map[string]struct { + Schedule string + ExpectedTaskSchedule *TaskSchedule + Error string + }{ + "valid schedule: m minutes": { + Schedule: "5 m", + ExpectedTaskSchedule: &TaskSchedule{Minutes: 5}, + Error: "", + }, + "valid schedule: M minutes": { + Schedule: "5 m", + ExpectedTaskSchedule: &TaskSchedule{Minutes: 5}, + Error: "", + }, + "valid schedule: MINUTE minutes": { + Schedule: "5 MINUTE", + ExpectedTaskSchedule: &TaskSchedule{Minutes: 5}, + Error: "", + }, + "valid schedule: MINUTES minutes": { + Schedule: "5 MINUTES", + ExpectedTaskSchedule: &TaskSchedule{Minutes: 5}, + Error: "", + }, + "valid schedule: cron": { + Schedule: "USING CRON * * * * * UTC", + ExpectedTaskSchedule: &TaskSchedule{Cron: "* * * * * UTC"}, + Error: "", + }, + "invalid schedule: wrong schedule format": { + Schedule: "SOME SCHEDULE", + ExpectedTaskSchedule: nil, + Error: "invalid schedule format", + }, + "invalid schedule: wrong minutes format": { + Schedule: "a5 MINUTE", + ExpectedTaskSchedule: nil, + Error: `strconv.Atoi: parsing "A5": invalid syntax`, + }, + // currently cron expressions are not validated (they are on Snowflake level) + "invalid schedule: wrong cron format": { + Schedule: "USING CRON some_cron", + ExpectedTaskSchedule: &TaskSchedule{Cron: "some_cron"}, + Error: "", + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + taskSchedule, err := ParseTaskSchedule(tc.Schedule) + if tc.Error != "" { + assert.Nil(t, taskSchedule) + assert.ErrorContains(t, err, tc.Error) + } else { + assert.EqualValues(t, tc.ExpectedTaskSchedule, taskSchedule) + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/sdk/tasks_validations_gen.go b/pkg/sdk/tasks_validations_gen.go index 
501b0f48e4..d5e30aa4cd 100644 --- a/pkg/sdk/tasks_validations_gen.go +++ b/pkg/sdk/tasks_validations_gen.go @@ -104,8 +104,8 @@ func (opts *AlterTaskOptions) validate() error { } } if valueSet(opts.Unset) { - if !anyValueSet(opts.Unset.Warehouse, opts.Unset.Schedule, opts.Unset.Config, opts.Unset.AllowOverlappingExecution, opts.Unset.UserTaskTimeoutMs, opts.Unset.SuspendTaskAfterNumFailures, opts.Unset.ErrorIntegration, opts.Unset.Comment, opts.Unset.SessionParametersUnset, opts.Unset.TaskAutoRetryAttempts, opts.Unset.UserTaskMinimumTriggerIntervalInSeconds) { - errs = append(errs, errAtLeastOneOf("AlterTaskOptions.Unset", "Warehouse", "Schedule", "Config", "AllowOverlappingExecution", "UserTaskTimeoutMs", "SuspendTaskAfterNumFailures", "ErrorIntegration", "Comment", "SessionParametersUnset", "TaskAutoRetryAttempts", "UserTaskMinimumTriggerIntervalInSeconds")) + if !anyValueSet(opts.Unset.Warehouse, opts.Unset.UserTaskManagedInitialWarehouseSize, opts.Unset.Schedule, opts.Unset.Config, opts.Unset.AllowOverlappingExecution, opts.Unset.UserTaskTimeoutMs, opts.Unset.SuspendTaskAfterNumFailures, opts.Unset.ErrorIntegration, opts.Unset.Comment, opts.Unset.SessionParametersUnset, opts.Unset.TaskAutoRetryAttempts, opts.Unset.UserTaskMinimumTriggerIntervalInSeconds) { + errs = append(errs, errAtLeastOneOf("AlterTaskOptions.Unset", "Warehouse", "UserTaskManagedInitialWarehouseSize", "Schedule", "Config", "AllowOverlappingExecution", "UserTaskTimeoutMs", "SuspendTaskAfterNumFailures", "ErrorIntegration", "Comment", "SessionParametersUnset", "TaskAutoRetryAttempts", "UserTaskMinimumTriggerIntervalInSeconds")) } if valueSet(opts.Unset.SessionParametersUnset) { if err := opts.Unset.SessionParametersUnset.validate(); err != nil { diff --git a/pkg/sdk/testint/tasks_gen_integration_test.go b/pkg/sdk/testint/tasks_gen_integration_test.go index b73c8484fd..7b427cbabb 100644 --- a/pkg/sdk/testint/tasks_gen_integration_test.go +++ b/pkg/sdk/testint/tasks_gen_integration_test.go @@ -534,7 +534,7 @@ func TestInt_Tasks(t *testing.T) { err := client.Tasks.CreateOrAlter(ctx, sdk.NewCreateOrAlterTaskRequest(id, sql). WithWarehouse(*sdk.NewCreateTaskWarehouseRequest().WithWarehouse(testClientHelper().Ids.WarehouseId())). WithSchedule("10 MINUTES"). - WithConfig(`$${"output_dir": "/temp/test_directory/", "learning_rate": 0.1}$$`). + WithConfig(`{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}`). WithAllowOverlappingExecution(true). WithUserTaskTimeoutMs(10). WithSessionParameters(sessionParametersSet). @@ -617,7 +617,7 @@ func TestInt_Tasks(t *testing.T) { WithErrorIntegration(errorIntegration.ID()). WithSessionParameters(sessionParametersSet). WithSchedule("10 MINUTE"). - WithConfig(`$${"output_dir": "/temp/test_directory/", "learning_rate": 0.1}$$`). + WithConfig(`{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}`). WithAllowOverlappingExecution(true). WithUserTaskTimeoutMs(1000). WithSuspendTaskAfterNumFailures(100). 
From c03f171dcb2730b11456cb0e548406c403f9cfa4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Cie=C5=9Blak?=
Date: Fri, 22 Nov 2024 15:02:16 +0100
Subject: [PATCH 2/3] changes after review

---
 MIGRATION_GUIDE.md                            |  22 ++-
 docs/resources/task.md                        |  12 +-
 .../assert/resource_assertions.go             |  10 +
 .../resourceassert/task_resource_ext.go       |   7 +-
 .../notification_integration_client.go        |  12 ++
 pkg/resources/task.go                         |  22 +--
 pkg/resources/task_acceptance_test.go         | 174 ++++++++++++------
 pkg/resources/task_state_upgraders.go         |  18 +-
 pkg/sdk/tasks_gen.go                          |   3 +-
 pkg/sdk/tasks_gen_test.go                     |  14 +-
 10 files changed, 188 insertions(+), 106 deletions(-)

diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md
index e78f9ecbae..b0976107d9 100644
--- a/MIGRATION_GUIDE.md
+++ b/MIGRATION_GUIDE.md
@@ -67,7 +67,8 @@ New fields:
 - Added support for finalizer tasks with `finalize` field. It conflicts with `after` and `schedule` (see [finalizer tasks](https://docs.snowflake.com/en/user-guide/tasks-graphs#release-and-cleanup-of-task-graphs)).
 
 Changes:
-- `enabled` field changed to `started` and type changed to string with only boolean values available (see ["empty" values](./v1-preparations/CHANGES_BEFORE_V1.md#empty-values)).
+- `enabled` field changed to `started` and type changed to string with only boolean values available (see ["empty" values](./v1-preparations/CHANGES_BEFORE_V1.md#empty-values)). It is also now a required field, so make sure it's explicitly set (previously it was optional with the default value set to `false`).
+- `allow_overlapping_execution` type was changed to string with only boolean values available (see ["empty" values](./v1-preparations/CHANGES_BEFORE_V1.md#empty-values)). Previously, it had the default set to `false`, which will be migrated. If nothing is set in the configuration, the provider will plan a change to the `default` value. If you want to make sure it's turned off, set it explicitly to `false`.
 
 Before:
 ```terraform
@@ -130,7 +131,24 @@ resource "snowflake_task" "example" {
 }
 ```
 
-- `after` field type was changed from `list` to `set`. No changes in configuration are necessary.
+- `after` field type was changed from `list` to `set` and the values were changed from names to fully qualified names.
+
+Before:
+```terraform
+resource "snowflake_task" "example" {
+  # ...
+  after = ["", snowflake_task.some_task.name]
+  # ...
+}
+```
+After:
+```terraform
+resource "snowflake_task" "example" {
+  # ...
+  after = ["..", snowflake_task.some_task.fully_qualified_name]
+  # ...
+}
+```
 
 ## v0.98.0 ➞ v0.99.0
 
diff --git a/docs/resources/task.md b/docs/resources/task.md
index 642c358454..8343f80763 100644
--- a/docs/resources/task.md
+++ b/docs/resources/task.md
@@ -151,16 +151,16 @@ resource "snowflake_task" "test" {
 
 ### Required
 
-- `database` (String) The database in which to create the task. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"`
-- `name` (String) Specifies the identifier for the task; must be unique for the database and schema in which the task is created. 
Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` -- `schema` (String) The schema in which to create the task. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` +- `database` (String) The database in which to create the task. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `"` +- `name` (String) Specifies the identifier for the task; must be unique for the database and schema in which the task is created. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `"` +- `schema` (String) The schema in which to create the task. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `"` - `sql_statement` (String) Any single SQL statement, or a call to a stored procedure, executed when the task runs. - `started` (Boolean) Specifies if the task should be started or suspended. ### Optional - `abort_detached_query` (Boolean) Specifies the action that Snowflake performs for in-progress queries if connectivity is lost due to abrupt termination of a session (e.g. network outage, browser termination, service interruption). For more information, check [ABORT_DETACHED_QUERY docs](https://docs.snowflake.com/en/sql-reference/parameters#abort-detached-query). -- `after` (Set of String) Specifies one or more predecessor tasks for the current task. Use this option to [create a DAG](https://docs.snowflake.com/en/user-guide/tasks-graphs.html#label-task-dag) of tasks or add this task to an existing DAG. A DAG is a series of tasks that starts with a scheduled root task and is linked together by dependencies. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` +- `after` (Set of String) Specifies one or more predecessor tasks for the current task. Use this option to [create a DAG](https://docs.snowflake.com/en/user-guide/tasks-graphs.html#label-task-dag) of tasks or add this task to an existing DAG. A DAG is a series of tasks that starts with a scheduled root task and is linked together by dependencies. 
Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `"` - `allow_overlapping_execution` (String) By default, Snowflake ensures that only one instance of a particular DAG is allowed to run at a time, setting the parameter value to TRUE permits DAG runs to overlap. Available options are: "true" or "false". When the value is not set in the configuration the provider will put "default" there which means to use the Snowflake default for this value. - `autocommit` (Boolean) Specifies whether autocommit is enabled for the session. Autocommit determines whether a DML statement, when executed without an active transaction, is automatically committed after the statement successfully completes. For more information, see [Transactions](https://docs.snowflake.com/en/sql-reference/transactions). For more information, check [AUTOCOMMIT docs](https://docs.snowflake.com/en/sql-reference/parameters#autocommit). - `binary_input_format` (String) The format of VARCHAR values passed as input to VARCHAR-to-BINARY conversion functions. For more information, see [Binary input and output](https://docs.snowflake.com/en/sql-reference/binary-input-output). For more information, check [BINARY_INPUT_FORMAT docs](https://docs.snowflake.com/en/sql-reference/parameters#binary-input-format). @@ -178,10 +178,10 @@ resource "snowflake_task" "test" { - `date_input_format` (String) Specifies the input format for the DATE data type. For more information, see [Date and time input and output formats](https://docs.snowflake.com/en/sql-reference/date-time-input-output). For more information, check [DATE_INPUT_FORMAT docs](https://docs.snowflake.com/en/sql-reference/parameters#date-input-format). - `date_output_format` (String) Specifies the display format for the DATE data type. For more information, see [Date and time input and output formats](https://docs.snowflake.com/en/sql-reference/date-time-input-output). For more information, check [DATE_OUTPUT_FORMAT docs](https://docs.snowflake.com/en/sql-reference/parameters#date-output-format). - `enable_unload_physical_type_optimization` (Boolean) Specifies whether to set the schema for unloaded Parquet files based on the logical column data types (i.e. the types in the unload SQL query or source table) or on the unloaded column values (i.e. the smallest data types and precision that support the values in the output columns of the unload SQL statement or source table). For more information, check [ENABLE_UNLOAD_PHYSICAL_TYPE_OPTIMIZATION docs](https://docs.snowflake.com/en/sql-reference/parameters#enable-unload-physical-type-optimization). -- `error_integration` (String) Specifies the name of the notification integration used for error notifications. Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` +- `error_integration` (String) Specifies the name of the notification integration used for error notifications. 
Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `"` - `error_on_nondeterministic_merge` (Boolean) Specifies whether to return an error when the [MERGE](https://docs.snowflake.com/en/sql-reference/sql/merge) command is used to update or delete a target row that joins multiple source rows and the system cannot determine the action to perform on the target row. For more information, check [ERROR_ON_NONDETERMINISTIC_MERGE docs](https://docs.snowflake.com/en/sql-reference/parameters#error-on-nondeterministic-merge). - `error_on_nondeterministic_update` (Boolean) Specifies whether to return an error when the [UPDATE](https://docs.snowflake.com/en/sql-reference/sql/update) command is used to update a target row that joins multiple source rows and the system cannot determine the action to perform on the target row. For more information, check [ERROR_ON_NONDETERMINISTIC_UPDATE docs](https://docs.snowflake.com/en/sql-reference/parameters#error-on-nondeterministic-update). -- `finalize` (String) Specifies the name of a root task that the finalizer task is associated with. Finalizer tasks run after all other tasks in the task graph run to completion. You can define the SQL of a finalizer task to handle notifications and the release and cleanup of resources that a task graph uses. For more information, see [Release and cleanup of task graphs](https://docs.snowflake.com/en/user-guide/tasks-graphs.html#label-finalizer-task). Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `(`, `)`, `"` +- `finalize` (String) Specifies the name of a root task that the finalizer task is associated with. Finalizer tasks run after all other tasks in the task graph run to completion. You can define the SQL of a finalizer task to handle notifications and the release and cleanup of resources that a task graph uses. For more information, see [Release and cleanup of task graphs](https://docs.snowflake.com/en/user-guide/tasks-graphs.html#label-finalizer-task). Due to technical limitations (read more [here](https://github.com/Snowflake-Labs/terraform-provider-snowflake/blob/main/docs/technical-documentation/identifiers_rework_design_decisions.md#known-limitations-and-identifier-recommendations)), avoid using the following characters: `|`, `.`, `"` - `geography_output_format` (String) Display format for [GEOGRAPHY values](https://docs.snowflake.com/en/sql-reference/data-types-geospatial.html#label-data-types-geography). For more information, check [GEOGRAPHY_OUTPUT_FORMAT docs](https://docs.snowflake.com/en/sql-reference/parameters#geography-output-format). - `geometry_output_format` (String) Display format for [GEOMETRY values](https://docs.snowflake.com/en/sql-reference/data-types-geospatial.html#label-data-types-geometry). For more information, check [GEOMETRY_OUTPUT_FORMAT docs](https://docs.snowflake.com/en/sql-reference/parameters#geometry-output-format). - `jdbc_treat_timestamp_ntz_as_utc` (Boolean) Specifies how JDBC processes TIMESTAMP_NTZ values. 
For more information, check [JDBC_TREAT_TIMESTAMP_NTZ_AS_UTC docs](https://docs.snowflake.com/en/sql-reference/parameters#jdbc-treat-timestamp-ntz-as-utc). diff --git a/pkg/acceptance/bettertestspoc/assert/resource_assertions.go b/pkg/acceptance/bettertestspoc/assert/resource_assertions.go index 79f4e47ac0..09c8c875cc 100644 --- a/pkg/acceptance/bettertestspoc/assert/resource_assertions.go +++ b/pkg/acceptance/bettertestspoc/assert/resource_assertions.go @@ -62,6 +62,7 @@ const ( resourceAssertionTypeValuePresent = "VALUE_PRESENT" resourceAssertionTypeValueSet = "VALUE_SET" resourceAssertionTypeValueNotSet = "VALUE_NOT_SET" + resourceAssertionTypeSetElem = "SET_ELEM" ) type ResourceAssertion struct { @@ -75,6 +76,10 @@ func (r *ResourceAssert) AddAssertion(assertion ResourceAssertion) { r.assertions = append(r.assertions, assertion) } +func SetElem(fieldName string, expected string) ResourceAssertion { + return ResourceAssertion{fieldName: fieldName, expectedValue: expected, resourceAssertionType: resourceAssertionTypeSetElem} +} + func ValuePresent(fieldName string) ResourceAssertion { return ResourceAssertion{fieldName: fieldName, resourceAssertionType: resourceAssertionTypeValuePresent} } @@ -152,6 +157,11 @@ func (r *ResourceAssert) ToTerraformTestCheckFunc(t *testing.T) resource.TestChe for i, a := range r.assertions { switch a.resourceAssertionType { + case resourceAssertionTypeSetElem: + if err := resource.TestCheckTypeSetElemAttr(r.name, a.fieldName, a.expectedValue)(s); err != nil { + errCut, _ := strings.CutPrefix(err.Error(), fmt.Sprintf("%s: ", r.name)) + result = append(result, fmt.Errorf("%s %s assertion [%d/%d]: failed with error: %s", r.name, r.prefix, i+1, len(r.assertions), errCut)) + } case resourceAssertionTypeValueSet: if err := resource.TestCheckResourceAttr(r.name, a.fieldName, a.expectedValue)(s); err != nil { errCut, _ := strings.CutPrefix(err.Error(), fmt.Sprintf("%s: ", r.name)) diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go index e13c43b5d9..f948594340 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/task_resource_ext.go @@ -1,7 +1,6 @@ package resourceassert import ( - "fmt" "strconv" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" @@ -9,10 +8,10 @@ import ( "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" ) -func (t *TaskResourceAssert) HasAfterIdsInOrder(ids ...sdk.SchemaObjectIdentifier) *TaskResourceAssert { +func (t *TaskResourceAssert) HasAfter(ids ...sdk.SchemaObjectIdentifier) *TaskResourceAssert { t.AddAssertion(assert.ValueSet("after.#", strconv.FormatInt(int64(len(ids)), 10))) - for i, id := range ids { - t.AddAssertion(assert.ValueSet(fmt.Sprintf("after.%d", i), id.FullyQualifiedName())) + for _, id := range ids { + t.AddAssertion(assert.SetElem("after.*", id.FullyQualifiedName())) } return t } diff --git a/pkg/acceptance/helpers/notification_integration_client.go b/pkg/acceptance/helpers/notification_integration_client.go index 7b0717e43c..e8686a234d 100644 --- a/pkg/acceptance/helpers/notification_integration_client.go +++ b/pkg/acceptance/helpers/notification_integration_client.go @@ -8,6 +8,9 @@ import ( "github.com/stretchr/testify/require" ) +// TODO [SNOW-1017580]: replace with real value +const gcpPubsubSubscriptionName = "projects/project-1234/subscriptions/sub2" + type 
NotificationIntegrationClient struct { context *TestClientContext ids *IdsGenerator @@ -24,6 +27,15 @@ func (c *NotificationIntegrationClient) client() sdk.NotificationIntegrations { return c.context.client.NotificationIntegrations } +func (c *NotificationIntegrationClient) CreateWithGcpPubSub(t *testing.T) (*sdk.NotificationIntegration, func()) { + t.Helper() + return c.CreateWithRequest(t, sdk.NewCreateNotificationIntegrationRequest(c.ids.RandomAccountObjectIdentifier(), true). + WithAutomatedDataLoadsParams(sdk.NewAutomatedDataLoadsParamsRequest(). + WithGoogleAutoParams(sdk.NewGoogleAutoParamsRequest(gcpPubsubSubscriptionName)), + ), + ) +} + func (c *NotificationIntegrationClient) Create(t *testing.T) (*sdk.NotificationIntegration, func()) { t.Helper() ctx := context.Background() diff --git a/pkg/resources/task.go b/pkg/resources/task.go index c25d457400..5b1ca6eb2b 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -67,14 +67,6 @@ var taskSchema = map[string]*schema.Schema{ Description: "The warehouse the task will use. Omit this parameter to use Snowflake-managed compute resources for runs of this task. Due to Snowflake limitations warehouse identifier can consist of only upper-cased letters. (Conflicts with user_task_managed_initial_warehouse_size)", ConflictsWith: []string{"user_task_managed_initial_warehouse_size"}, }, - "user_task_managed_initial_warehouse_size": { - Type: schema.TypeString, - Optional: true, - ValidateDiagFunc: sdkValidation(sdk.ToWarehouseSize), - DiffSuppressFunc: NormalizeAndCompare(sdk.ToWarehouseSize), - Description: "Specifies the size of the compute resources to provision for the first run of the task, before a task history is available for Snowflake to determine an ideal size. Once a task has successfully completed a few runs, Snowflake ignores this parameter setting. (Conflicts with warehouse)", - ConflictsWith: []string{"warehouse"}, - }, "schedule": { Type: schema.TypeList, Optional: true, @@ -187,7 +179,7 @@ func Task() *schema.Resource { DeleteContext: DeleteTask, Description: "Resource used to manage task objects. 
For more information, check [task documentation](https://docs.snowflake.com/en/user-guide/tasks-intro).", - Schema: helpers.MergeMaps(taskSchema, taskParametersSchema), + Schema: collections.MergeMaps(taskSchema, taskParametersSchema), Importer: &schema.ResourceImporter{ StateContext: ImportTask, }, @@ -594,18 +586,6 @@ func ReadTask(withExternalChangesMarking bool) schema.ReadContextFunc { attributeMappedValueReadOrDefault(d, "warehouse", task.Warehouse, func(warehouse *sdk.AccountObjectIdentifier) (string, error) { return warehouse.Name(), nil }, nil), - func() error { - managedInitialWarehouseSizeIdx := slices.IndexFunc(taskParameters, func(p *sdk.Parameter) bool { - return p != nil && p.Key == string(sdk.TaskParameterUserTaskManagedInitialWarehouseSize) - }) - if managedInitialWarehouseSizeIdx == -1 { - return fmt.Errorf("unable to find user_task_managed_initial_warehouse_size parameter") - } - if taskParameters[managedInitialWarehouseSizeIdx].Level == sdk.ParameterTypeTask { - return d.Set("user_task_managed_initial_warehouse_size", taskParameters[managedInitialWarehouseSizeIdx].Value) - } - return nil - }(), func() error { if len(task.Schedule) > 0 { taskSchedule, err := sdk.ParseTaskSchedule(task.Schedule) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 9592dcb58d..64eb896cde 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -28,6 +28,8 @@ import ( "github.com/hashicorp/terraform-plugin-testing/tfversion" ) +// TODO(SNOW-1822118): Create more complicated tests for task + func TestAcc_Task_Basic(t *testing.T) { _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) acc.TestAccPreCheck(t) @@ -62,7 +64,7 @@ func TestAcc_Task_Basic(t *testing.T) { HasErrorIntegrationString(""). HasCommentString(""). HasFinalizeString(""). - HasAfterIdsInOrder(). + HasAfter(). HasWhenString(""). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, configModel.ResourceReference()). @@ -123,7 +125,7 @@ func TestAcc_Task_Complete(t *testing.T) { currentRole := acc.TestClient().Context.CurrentRole(t) - errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.Create(t) + errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.CreateWithGcpPubSub(t) t.Cleanup(errorNotificationIntegrationCleanup) id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() @@ -237,7 +239,7 @@ func TestAcc_Task_Updates(t *testing.T) { warehouse, warehouseCleanup := acc.TestClient().Warehouse.CreateWarehouse(t) t.Cleanup(warehouseCleanup) - errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.Create(t) + errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.CreateWithGcpPubSub(t) t.Cleanup(errorNotificationIntegrationCleanup) taskConfig := `{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}` @@ -276,7 +278,7 @@ func TestAcc_Task_Updates(t *testing.T) { HasErrorIntegrationString(""). HasCommentString(""). HasFinalizeString(""). - HasAfterIdsInOrder(). + HasAfter(). HasWhenString(""). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). @@ -326,7 +328,7 @@ func TestAcc_Task_Updates(t *testing.T) { HasErrorIntegrationString(errorNotificationIntegration.ID().Name()). HasCommentString(comment). HasFinalizeString(""). 
- HasAfterIdsInOrder(). + HasAfter(). HasWhenString(condition). HasSqlStatementString(newStatement), resourceshowoutputassert.TaskShowOutput(t, completeConfigModel.ResourceReference()). @@ -375,7 +377,7 @@ func TestAcc_Task_Updates(t *testing.T) { HasErrorIntegrationString(""). HasCommentString(""). HasFinalizeString(""). - HasAfterIdsInOrder(). + HasAfter(). HasWhenString(""). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). @@ -441,12 +443,11 @@ func TestAcc_Task_UpdatesInComplexDAG(t *testing.T) { )) comment := random.Comment() - basicConfigModelAfterUpdate := model.TaskWithId("test", child3Id, true, "SELECT 1"). + basicConfigModelAfterUpdate := model.TaskWithId("test", child3Id, true, "SELECT 123"). WithAfterValue(configvariable.SetVariable( configvariable.StringVariable(child1.ID().FullyQualifiedName()), configvariable.StringVariable(child2.ID().FullyQualifiedName()), )). - WithSqlStatement("SELECT 123"). // Overrides sql_statement WithComment(comment) resource.Test(t, resource.TestCase{ @@ -466,6 +467,7 @@ func TestAcc_Task_UpdatesInComplexDAG(t *testing.T) { HasSchemaString(child3Id.SchemaName()). HasNameString(child3Id.Name()). HasStartedString(r.BooleanTrue). + HasAfter(child1.ID(), child2.ID()). HasSqlStatementString("SELECT 1"), resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). HasCreatedOnNotEmpty(). @@ -487,6 +489,7 @@ func TestAcc_Task_UpdatesInComplexDAG(t *testing.T) { HasNameString(child3Id.Name()). HasStartedString(r.BooleanTrue). HasCommentString(comment). + HasAfter(child1.ID(), child2.ID()). HasSqlStatementString("SELECT 123"), resourceshowoutputassert.TaskShowOutput(t, basicConfigModelAfterUpdate.ResourceReference()). HasCreatedOnNotEmpty(). @@ -577,7 +580,7 @@ func TestAcc_Task_ExternalChanges(t *testing.T) { warehouse, warehouseCleanup := acc.TestClient().Warehouse.CreateWarehouse(t) t.Cleanup(warehouseCleanup) - errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.Create(t) + errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.CreateWithGcpPubSub(t) t.Cleanup(errorNotificationIntegrationCleanup) taskConfig := `{"output_dir": "/temp/test_directory/", "learning_rate": 0.1}` @@ -618,7 +621,7 @@ func TestAcc_Task_ExternalChanges(t *testing.T) { HasErrorIntegrationString(errorNotificationIntegration.ID().Name()). HasCommentString(comment). HasFinalizeString(""). - HasAfterIdsInOrder(). + HasAfter(). HasWhenString(condition). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, completeConfigModel.ResourceReference()). @@ -680,7 +683,7 @@ func TestAcc_Task_ExternalChanges(t *testing.T) { HasErrorIntegrationString(errorNotificationIntegration.ID().Name()). HasCommentString(comment). HasFinalizeString(""). - HasAfterIdsInOrder(). + HasAfter(). HasWhenString(condition). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, completeConfigModel.ResourceReference()). @@ -724,7 +727,7 @@ func TestAcc_Task_ExternalChanges(t *testing.T) { HasErrorIntegrationString(""). HasCommentString(""). HasFinalizeString(""). - HasAfterIdsInOrder(). + HasAfter(). HasWhenString(""). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). @@ -786,7 +789,7 @@ func TestAcc_Task_ExternalChanges(t *testing.T) { HasErrorIntegrationString(""). HasCommentString(""). 
HasFinalizeString(""). - HasAfterIdsInOrder(). + HasAfter(). HasWhenString(""). HasSqlStatementString(statement), resourceshowoutputassert.TaskShowOutput(t, basicConfigModel.ResourceReference()). @@ -1622,7 +1625,7 @@ func TestAcc_Task_ConvertStandaloneTaskToSubtask(t *testing.T) { HasScheduleMinutes(5). HasState(sdk.TaskStateStarted), resourceassert.TaskResource(t, childTaskModel.ResourceReference()). - HasAfterIdsInOrder(id). + HasAfter(id). HasStartedString(r.BooleanTrue), resourceshowoutputassert.TaskShowOutput(t, childTaskModel.ResourceReference()). HasPredecessors(id). @@ -1800,7 +1803,7 @@ func TestAcc_Task_SwitchScheduledWithAfter(t *testing.T) { resourceassert.TaskResource(t, childTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). HasScheduleMinutes(schedule). - HasAfterIdsInOrder(). + HasAfter(). HasSuspendTaskAfterNumFailuresString("10"), ), }, @@ -1815,7 +1818,7 @@ func TestAcc_Task_SwitchScheduledWithAfter(t *testing.T) { resourceassert.TaskResource(t, childTaskConfigModelWithAfter.ResourceReference()). HasStartedString(r.BooleanTrue). HasNoScheduleSet(). - HasAfterIdsInOrder(rootId). + HasAfter(rootId). HasSuspendTaskAfterNumFailuresString("10"), ), }, @@ -1830,7 +1833,7 @@ func TestAcc_Task_SwitchScheduledWithAfter(t *testing.T) { resourceassert.TaskResource(t, childTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). HasScheduleMinutes(schedule). - HasAfterIdsInOrder(). + HasAfter(). HasSuspendTaskAfterNumFailuresString("10"), ), }, @@ -1845,7 +1848,7 @@ func TestAcc_Task_SwitchScheduledWithAfter(t *testing.T) { resourceassert.TaskResource(t, childTaskConfigModelDisabled.ResourceReference()). HasStartedString(r.BooleanFalse). HasScheduleMinutes(schedule). - HasAfterIdsInOrder(). + HasAfter(). HasSuspendTaskAfterNumFailuresString("10"), ), }, @@ -1894,7 +1897,7 @@ func TestAcc_Task_WithAfter(t *testing.T) { HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModelWithAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIdsInOrder(rootId), + HasAfter(rootId), ), }, { @@ -1906,7 +1909,7 @@ func TestAcc_Task_WithAfter(t *testing.T) { HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIdsInOrder(), + HasAfter(), ), }, }, @@ -2142,7 +2145,7 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIdsInOrder(), + HasAfter(), resourceshowoutputassert.TaskShowOutput(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasState(sdk.TaskStateStarted). HasTaskRelations(sdk.TaskRelations{}), @@ -2155,7 +2158,7 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIdsInOrder(rootId), + HasAfter(rootId), resourceshowoutputassert.TaskShowOutput(t, childTaskConfigModelWithAfter.ResourceReference()). HasState(sdk.TaskStateStarted). HasTaskRelations(sdk.TaskRelations{Predecessors: []sdk.SchemaObjectIdentifier{rootId}}), @@ -2178,7 +2181,7 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithAfter.ResourceReference()). HasStartedString(r.BooleanTrue). 
- HasAfterIdsInOrder(rootId), + HasAfter(rootId), resourceshowoutputassert.TaskShowOutput(t, childTaskConfigModelWithAfter.ResourceReference()). HasState(sdk.TaskStateStarted). HasTaskRelations(sdk.TaskRelations{Predecessors: []sdk.SchemaObjectIdentifier{rootId}}), @@ -2191,7 +2194,7 @@ func TestAcc_Task_UpdateAfterExternally(t *testing.T) { Check: assert.AssertThat(t, resourceassert.TaskResource(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIdsInOrder(), + HasAfter(), resourceshowoutputassert.TaskShowOutput(t, childTaskConfigModelWithoutAfter.ResourceReference()). HasState(sdk.TaskStateStarted). HasTaskRelations(sdk.TaskRelations{}), @@ -2244,7 +2247,7 @@ func TestAcc_Task_issue2207(t *testing.T) { HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModel.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIdsInOrder(rootId). + HasAfter(rootId). HasCommentString("abc"), ), }, @@ -2263,7 +2266,7 @@ func TestAcc_Task_issue2207(t *testing.T) { HasScheduleMinutes(schedule), resourceassert.TaskResource(t, childTaskConfigModelWithDifferentComment.ResourceReference()). HasStartedString(r.BooleanTrue). - HasAfterIdsInOrder(rootId). + HasAfter(rootId). HasCommentString("def"), ), }, @@ -2335,7 +2338,7 @@ func TestAcc_Task_issue3113(t *testing.T) { _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) acc.TestAccPreCheck(t) - errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.Create(t) + errorNotificationIntegration, errorNotificationIntegrationCleanup := acc.TestClient().NotificationIntegration.CreateWithGcpPubSub(t) t.Cleanup(errorNotificationIntegrationCleanup) id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() @@ -2379,18 +2382,50 @@ func TestAcc_Task_issue3113(t *testing.T) { }) } -func taskConfigWithErrorIntegration(id sdk.SchemaObjectIdentifier, errorIntegrationId sdk.AccountObjectIdentifier) string { - return fmt.Sprintf(` -resource "snowflake_task" "test" { - database = "%[1]s" - schema = "%[2]s" - name = "%[3]s" - schedule = "5 MINUTES" - sql_statement = "SELECT 1" - enabled = true - error_integration = "%[4]s" -} -`, id.DatabaseName(), id.SchemaName(), id.Name(), errorIntegrationId.Name()) +func TestAcc_Task_StateUpgrade_NoOptionalFields(t *testing.T) { + _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) + acc.TestAccPreCheck(t) + + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + statement := "SELECT 1" + configModel := model.TaskWithId("test", id, false, statement) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acc.TestAccPreCheck(t) }, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.Task), + Steps: []resource.TestStep{ + { + ExternalProviders: map[string]resource.ExternalProvider{ + "snowflake": { + VersionConstraint: "=0.98.0", + Source: "Snowflake-Labs/snowflake", + }, + }, + Config: taskNoOptionalFieldsConfigV0980(id), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr("snowflake_task.test", "enabled", "false"), + resource.TestCheckResourceAttr("snowflake_task.test", "allow_overlapping_execution", "false"), + ), + }, + { + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + ConfigDirectory: acc.ConfigurationDirectory("TestAcc_Task/basic"), + ConfigVariables: config.ConfigVariablesFromModel(t, configModel), + Check: assert.AssertThat(t, + 
resourceassert.TaskResource(t, configModel.ResourceReference()). + HasFullyQualifiedNameString(id.FullyQualifiedName()). + HasDatabaseString(id.DatabaseName()). + HasSchemaString(id.SchemaName()). + HasNameString(id.Name()). + HasStartedString(r.BooleanFalse). + HasAllowOverlappingExecutionString(r.BooleanDefault), + ), + }, + }, + }) } func TestAcc_Task_StateUpgrade(t *testing.T) { @@ -2453,23 +2488,6 @@ func TestAcc_Task_StateUpgrade(t *testing.T) { }) } -func taskBasicConfigV0980(id sdk.SchemaObjectIdentifier, condition string) string { - return fmt.Sprintf(` -resource "snowflake_task" "test" { - database = "%[1]s" - schema = "%[2]s" - name = "%[3]s" - enabled = false - sql_statement = "SELECT 1" - schedule = "5 MINUTES" - allow_overlapping_execution = true - suspend_task_after_num_failures = 10 - when = "%[4]s" - user_task_managed_initial_warehouse_size = "XSMALL" -} -`, id.DatabaseName(), id.SchemaName(), id.Name(), condition) -} - func TestAcc_Task_StateUpgradeWithAfter(t *testing.T) { _ = testenvs.GetOrSkipTest(t, testenvs.EnableAcceptance) acc.TestAccPreCheck(t) @@ -2526,7 +2544,7 @@ func TestAcc_Task_StateUpgradeWithAfter(t *testing.T) { HasNameString(id.Name()). HasStartedString(r.BooleanFalse). HasSqlStatementString(statement). - HasAfterIdsInOrder(rootTask.ID()). + HasAfter(rootTask.ID()). HasWarehouseString(acc.TestClient().Ids.WarehouseId().Name()). HasUserTaskTimeoutMsString("50"). HasLogLevelString(string(sdk.LogLevelInfo)). @@ -2539,6 +2557,48 @@ func TestAcc_Task_StateUpgradeWithAfter(t *testing.T) { }) } +func taskNoOptionalFieldsConfigV0980(id sdk.SchemaObjectIdentifier) string { + return fmt.Sprintf(` +resource "snowflake_task" "test" { + database = "%[1]s" + schema = "%[2]s" + name = "%[3]s" + sql_statement = "SELECT 1" +} +`, id.DatabaseName(), id.SchemaName(), id.Name()) +} + +func taskConfigWithErrorIntegration(id sdk.SchemaObjectIdentifier, errorIntegrationId sdk.AccountObjectIdentifier) string { + return fmt.Sprintf(` +resource "snowflake_task" "test" { + database = "%[1]s" + schema = "%[2]s" + name = "%[3]s" + schedule = "5 MINUTES" + sql_statement = "SELECT 1" + enabled = true + error_integration = "%[4]s" +} +`, id.DatabaseName(), id.SchemaName(), id.Name(), errorIntegrationId.Name()) +} + +func taskBasicConfigV0980(id sdk.SchemaObjectIdentifier, condition string) string { + return fmt.Sprintf(` +resource "snowflake_task" "test" { + database = "%[1]s" + schema = "%[2]s" + name = "%[3]s" + enabled = false + sql_statement = "SELECT 1" + schedule = "5 MINUTES" + allow_overlapping_execution = true + suspend_task_after_num_failures = 10 + when = "%[4]s" + user_task_managed_initial_warehouse_size = "XSMALL" +} +`, id.DatabaseName(), id.SchemaName(), id.Name(), condition) +} + func taskCompleteConfigV0980( id sdk.SchemaObjectIdentifier, rootTaskId sdk.SchemaObjectIdentifier, diff --git a/pkg/resources/task_state_upgraders.go b/pkg/resources/task_state_upgraders.go index e02512dd30..1a77858405 100644 --- a/pkg/resources/task_state_upgraders.go +++ b/pkg/resources/task_state_upgraders.go @@ -12,18 +12,22 @@ func v098TaskStateUpgrader(ctx context.Context, rawState map[string]any, meta an } rawState["condition"] = rawState["when"] - rawState["started"] = booleanStringFromBool(rawState["enabled"].(bool)) + rawState["started"] = rawState["enabled"].(bool) rawState["allow_overlapping_execution"] = booleanStringFromBool(rawState["allow_overlapping_execution"].(bool)) if rawState["after"] != nil { - newAfter := make([]string, len(rawState["after"].([]any))) - for i, name := 
range rawState["after"].([]any) { - newAfter[i] = sdk.NewSchemaObjectIdentifier(rawState["database"].(string), rawState["schema"].(string), name.(string)).FullyQualifiedName() + if afterSlice, okType := rawState["after"].([]any); okType { + newAfter := make([]string, len(afterSlice)) + for i, name := range afterSlice { + newAfter[i] = sdk.NewSchemaObjectIdentifier(rawState["database"].(string), rawState["schema"].(string), name.(string)).FullyQualifiedName() + } + rawState["after"] = newAfter } - rawState["after"] = newAfter } if rawState["session_parameters"] != nil { - for k, v := range rawState["session_parameters"].(map[string]any) { - rawState[k] = v + if sessionParamsMap, okType := rawState["session_parameters"].(map[string]any); okType { + for k, v := range sessionParamsMap { + rawState[k] = v + } } } delete(rawState, "session_parameters") diff --git a/pkg/sdk/tasks_gen.go b/pkg/sdk/tasks_gen.go index 203e76459b..3d6bf112bb 100644 --- a/pkg/sdk/tasks_gen.go +++ b/pkg/sdk/tasks_gen.go @@ -239,8 +239,9 @@ func ParseTaskSchedule(schedule string) (*TaskSchedule, error) { } return &TaskSchedule{Minutes: minutes}, nil + default: + return nil, fmt.Errorf("invalid schedule format: %s", schedule) } - return nil, fmt.Errorf("invalid schedule format: %s", schedule) } // DescribeTaskOptions is based on https://docs.snowflake.com/en/sql-reference/sql/desc-task. diff --git a/pkg/sdk/tasks_gen_test.go b/pkg/sdk/tasks_gen_test.go index 0d0c72181f..9422d73824 100644 --- a/pkg/sdk/tasks_gen_test.go +++ b/pkg/sdk/tasks_gen_test.go @@ -564,27 +564,26 @@ func TestParseTaskSchedule(t *testing.T) { "valid schedule: m minutes": { Schedule: "5 m", ExpectedTaskSchedule: &TaskSchedule{Minutes: 5}, - Error: "", }, "valid schedule: M minutes": { - Schedule: "5 m", + Schedule: "5 M", ExpectedTaskSchedule: &TaskSchedule{Minutes: 5}, - Error: "", }, "valid schedule: MINUTE minutes": { Schedule: "5 MINUTE", ExpectedTaskSchedule: &TaskSchedule{Minutes: 5}, - Error: "", }, "valid schedule: MINUTES minutes": { Schedule: "5 MINUTES", ExpectedTaskSchedule: &TaskSchedule{Minutes: 5}, - Error: "", }, "valid schedule: cron": { Schedule: "USING CRON * * * * * UTC", ExpectedTaskSchedule: &TaskSchedule{Cron: "* * * * * UTC"}, - Error: "", + }, + "valid schedule: cron with case sensitive location": { + Schedule: "USING CRON * * * * * America/Loc_Angeles", + ExpectedTaskSchedule: &TaskSchedule{Cron: "* * * * * America/Loc_Angeles"}, }, "invalid schedule: wrong schedule format": { Schedule: "SOME SCHEDULE", @@ -596,11 +595,10 @@ func TestParseTaskSchedule(t *testing.T) { ExpectedTaskSchedule: nil, Error: `strconv.Atoi: parsing "A5": invalid syntax`, }, - // currently cron expressions are not validated (they are on Snowflake level) + // currently, cron expressions are not validated (they are on Snowflake level) "invalid schedule: wrong cron format": { Schedule: "USING CRON some_cron", ExpectedTaskSchedule: &TaskSchedule{Cron: "some_cron"}, - Error: "", }, } From 02245594c3997fe1e14fba76537526ee456dc60c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Cie=C5=9Blak?= Date: Mon, 25 Nov 2024 10:17:35 +0100 Subject: [PATCH 3/3] Fix tests --- pkg/sdk/testint/tasks_gen_integration_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/sdk/testint/tasks_gen_integration_test.go b/pkg/sdk/testint/tasks_gen_integration_test.go index 7b427cbabb..b47c7c9139 100644 --- a/pkg/sdk/testint/tasks_gen_integration_test.go +++ b/pkg/sdk/testint/tasks_gen_integration_test.go @@ -2,6 +2,7 @@ package testint import ( 
"fmt" + "strings" "testing" "time" @@ -21,7 +22,7 @@ func TestInt_Tasks(t *testing.T) { ctx := testContext(t) sql := "SELECT CURRENT_TIMESTAMP" - errorIntegration, ErrorIntegrationCleanup := testClientHelper().NotificationIntegration.Create(t) + errorIntegration, ErrorIntegrationCleanup := testClientHelper().NotificationIntegration.CreateWithGcpPubSub(t) t.Cleanup(ErrorIntegrationCleanup) assertTask := func(t *testing.T, task *sdk.Task, id sdk.SchemaObjectIdentifier, warehouseId *sdk.AccountObjectIdentifier) { @@ -899,8 +900,11 @@ func TestInt_Tasks(t *testing.T) { err := client.Tasks.Execute(ctx, sdk.NewExecuteTaskRequest(task.ID())) require.NoError(t, err) - err = client.Tasks.Execute(ctx, sdk.NewExecuteTaskRequest(task.ID()).WithRetryLast(true)) - require.ErrorContains(t, err, fmt.Sprintf("Cannot perform retry: no suitable run of graph with root task %s to retry.", task.ID().Name())) + require.Eventually(t, func() bool { + err = client.Tasks.Execute(ctx, sdk.NewExecuteTaskRequest(task.ID()).WithRetryLast(true)) + return strings.Contains(err.Error(), fmt.Sprintf("Cannot perform retry: no suitable run of graph with root task %s to retry.", task.ID().Name())) || + strings.Contains(err.Error(), fmt.Sprintf("graph with root task %s had no failures.", task.ID().Name())) + }, 2*time.Second, time.Millisecond*300) }) t.Run("execute task: retry last after failed last task", func(t *testing.T) {