Skip to content

Commit

Permalink
feat: Use tasks from the SDK followup (#2153)
Browse files Browse the repository at this point in the history
* Support cycles when getting root tasks

* Test setting parameter

* Use set parameter method

* Add missing parameter

* Fix linter

* Fix session parameters handling for task

* Try to fix failing test

* Fix tests

* Fix typo
  • Loading branch information
sfc-gh-asawicki authored Oct 30, 2023
1 parent b86c4c3 commit 82c3c13
Show file tree
Hide file tree
Showing 11 changed files with 368 additions and 366 deletions.
1 change: 1 addition & 0 deletions docs/resources/task.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ resource "snowflake_task" "test_task" {
- `error_integration` (String) Specifies the name of the notification integration used for error notifications.
- `schedule` (String) The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflict with after)
- `session_parameters` (Map of String) Specifies session parameters to set for the session when the task runs. A task supports all session parameters.
- `suspend_task_after_num_failures` (Number) Specifies the number of consecutive failed task runs after which the current task is suspended automatically. The default is 0 (no automatic suspension).
- `user_task_managed_initial_warehouse_size` (String) Specifies the size of the compute resources to provision for the first run of the task, before a task history is available for Snowflake to determine an ideal size. Once a task has successfully completed a few runs, Snowflake ignores this parameter setting. (Conflicts with warehouse)
- `user_task_timeout_ms` (Number) Specifies the time limit on a single run of the task before it times out (in milliseconds).
- `warehouse` (String) The warehouse the task will use. Omit this parameter to use Snowflake-managed compute resources for runs of this task. (Conflicts with user_task_managed_initial_warehouse_size)
Expand Down
64 changes: 59 additions & 5 deletions pkg/resources/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"golang.org/x/exp/slices"
)

// TODO [SNOW-884987]: add missing SUSPEND_TASK_AFTER_NUM_FAILURES attribute.
var taskSchema = map[string]*schema.Schema{
"enabled": {
Type: schema.TypeBool,
Expand Down Expand Up @@ -67,6 +66,13 @@ var taskSchema = map[string]*schema.Schema{
ValidateFunc: validation.IntBetween(0, 86400000),
Description: "Specifies the time limit on a single run of the task before it times out (in milliseconds).",
},
"suspend_task_after_num_failures": {
Type: schema.TypeInt,
Optional: true,
Default: 0,
ValidateFunc: validation.IntAtLeast(0),
Description: "Specifies the number of consecutive failed task runs after which the current task is suspended automatically. The default is 0 (no automatic suspension).",
},
"comment": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -124,6 +130,19 @@ func difference(a, b map[string]any) map[string]any {
return diff
}

// differentValue find keys present both in 'a' and 'b' but having different values.
func differentValue(a, b map[string]any) map[string]any {
diff := make(map[string]any)
for k, va := range a {
if vb, ok := b[k]; ok {
if vb != va {
diff[k] = vb
}
}
}
return diff
}

// Task returns a pointer to the resource representing a task.
func Task() *schema.Resource {
return &schema.Resource{
Expand Down Expand Up @@ -214,7 +233,7 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error {
}

if len(params) > 0 {
sessionParameters := map[string]interface{}{}
sessionParameters := make(map[string]any)
fieldParameters := map[string]interface{}{
"user_task_managed_initial_warehouse_size": "",
}
Expand All @@ -233,6 +252,13 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error {
}

fieldParameters["user_task_timeout_ms"] = timeout
case "SUSPEND_TASK_AFTER_NUM_FAILURES":
num, err := strconv.ParseInt(param.Value, 10, 64)
if err != nil {
return err
}

fieldParameters["suspend_task_after_num_failures"] = num
default:
sessionParameters[param.Key] = param.Value
}
Expand Down Expand Up @@ -299,6 +325,10 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error {
createRequest.WithUserTaskTimeoutMs(sdk.Int(v.(int)))
}

if v, ok := d.GetOk("suspend_task_after_num_failures"); ok {
createRequest.WithSuspendTaskAfterNumFailures(sdk.Int(v.(int)))
}

if v, ok := d.GetOk("comment"); ok {
createRequest.WithComment(sdk.String(v.(string)))
}
Expand Down Expand Up @@ -558,6 +588,20 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {
}
}

if d.HasChange("suspend_task_after_num_failures") {
o, n := d.GetChange("suspend_task_after_num_failures")
alterRequest := sdk.NewAlterTaskRequest(taskId)
if o.(int) > 0 && n.(int) == 0 {
alterRequest.WithUnset(sdk.NewTaskUnsetRequest().WithSuspendTaskAfterNumFailures(sdk.Bool(true)))
} else {
alterRequest.WithSet(sdk.NewTaskSetRequest().WithSuspendTaskAfterNumFailures(sdk.Int(n.(int))))
}
err := client.Tasks.Alter(ctx, alterRequest)
if err != nil {
return fmt.Errorf("error updating suspend task after num failures on task %s", taskId.FullyQualifiedName())
}
}

if d.HasChange("comment") {
newComment := d.Get("comment")
alterRequest := sdk.NewAlterTaskRequest(taskId)
Expand Down Expand Up @@ -586,7 +630,6 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {
}
}

// TODO [SNOW-884987]: old implementation does not handle changing parameter value correctly (only finds for parameters to add od remove, not change)
if d.HasChange("session_parameters") {
o, n := d.GetChange("session_parameters")

Expand All @@ -601,14 +644,15 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {

remove := difference(os, ns)
add := difference(ns, os)
change := differentValue(os, ns)

if len(remove) > 0 {
sessionParametersUnset, err := sdk.GetSessionParametersUnsetFrom(remove)
if err != nil {
return err
}
if err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(taskId).WithUnset(sdk.NewTaskUnsetRequest().WithSessionParametersUnset(sessionParametersUnset))); err != nil {
return fmt.Errorf("error removing session_parameters on task %v", d.Id())
return fmt.Errorf("error removing session_parameters on task %v err = %w", d.Id(), err)
}
}

Expand All @@ -618,7 +662,17 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {
return err
}
if err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(taskId).WithSet(sdk.NewTaskSetRequest().WithSessionParameters(sessionParameters))); err != nil {
return fmt.Errorf("error adding session_parameters to task %v", d.Id())
return fmt.Errorf("error adding session_parameters to task %v err = %w", d.Id(), err)
}
}

if len(change) > 0 {
sessionParameters, err := sdk.GetSessionParametersFrom(change)
if err != nil {
return err
}
if err := client.Tasks.Alter(ctx, sdk.NewAlterTaskRequest(taskId).WithSet(sdk.NewTaskSetRequest().WithSessionParameters(sessionParameters))); err != nil {
return fmt.Errorf("error updating session_parameters in task %v err = %w", d.Id(), err)
}
}
}
Expand Down
79 changes: 59 additions & 20 deletions pkg/resources/task_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"text/template"

acc "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance"
"github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk"
"github.com/hashicorp/terraform-plugin-testing/helper/acctest"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/terraform"
Expand Down Expand Up @@ -50,6 +51,10 @@ var (
Enabled: true,
Schedule: "5 MINUTE",
UserTaskTimeoutMs: 1800000,
SessionParams: map[string]string{
string(sdk.SessionParameterLockTimeout): "1000",
string(sdk.SessionParameterStrictJSONOutput): "true",
},
},

ChildTask: &TaskSettings{
Expand Down Expand Up @@ -79,6 +84,10 @@ var (
Enabled: true,
Schedule: "5 MINUTE",
UserTaskTimeoutMs: 1800000,
SessionParams: map[string]string{
string(sdk.SessionParameterLockTimeout): "1000",
string(sdk.SessionParameterStrictJSONOutput): "true",
},
},

ChildTask: &TaskSettings{
Expand All @@ -95,7 +104,7 @@ var (
When: "TRUE",
Enabled: true,
SessionParams: map[string]string{
"TIMESTAMP_INPUT_FORMAT": "YYYY-MM-DD HH24",
string(sdk.SessionParameterTimestampInputFormat): "YYYY-MM-DD HH24",
},
Schedule: "5 MINUTE",
UserTaskTimeoutMs: 1800000,
Expand All @@ -113,6 +122,10 @@ var (
Enabled: true,
Schedule: "15 MINUTE",
UserTaskTimeoutMs: 1800000,
SessionParams: map[string]string{
string(sdk.SessionParameterLockTimeout): "1000",
string(sdk.SessionParameterStrictJSONOutput): "true",
},
},

ChildTask: &TaskSettings{
Expand Down Expand Up @@ -144,6 +157,11 @@ var (
Enabled: false,
Schedule: "5 MINUTE",
UserTaskTimeoutMs: 1800000,
// Changes session params: one is updated, one is removed, one is added
SessionParams: map[string]string{
string(sdk.SessionParameterLockTimeout): "2000",
string(sdk.SessionParameterMultiStatementCount): "5",
},
},

ChildTask: &TaskSettings{
Expand All @@ -160,7 +178,7 @@ var (
When: "TRUE",
Enabled: true,
SessionParams: map[string]string{
"TIMESTAMP_INPUT_FORMAT": "YYYY-MM-DD HH24",
string(sdk.SessionParameterTimestampInputFormat): "YYYY-MM-DD HH24",
},
Schedule: "5 MINUTE",
UserTaskTimeoutMs: 0,
Expand Down Expand Up @@ -193,6 +211,9 @@ func TestAcc_Task(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_task.child_task", "schedule", initialState.ChildTask.Schedule),
checkInt64("snowflake_task.root_task", "user_task_timeout_ms", initialState.RootTask.UserTaskTimeoutMs),
resource.TestCheckNoResourceAttr("snowflake_task.solo_task", "user_task_timeout_ms"),
checkInt64("snowflake_task.root_task", "session_parameters.LOCK_TIMEOUT", 1000),
checkBool("snowflake_task.root_task", "session_parameters.STRICT_JSON_OUTPUT", true),
resource.TestCheckNoResourceAttr("snowflake_task.root_task", "session_parameters.MULTI_STATEMENT_COUNT"),
),
},
{
Expand All @@ -213,6 +234,9 @@ func TestAcc_Task(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_task.child_task", "schedule", stepOne.ChildTask.Schedule),
checkInt64("snowflake_task.root_task", "user_task_timeout_ms", stepOne.RootTask.UserTaskTimeoutMs),
checkInt64("snowflake_task.solo_task", "user_task_timeout_ms", stepOne.SoloTask.UserTaskTimeoutMs),
checkInt64("snowflake_task.root_task", "session_parameters.LOCK_TIMEOUT", 1000),
checkBool("snowflake_task.root_task", "session_parameters.STRICT_JSON_OUTPUT", true),
resource.TestCheckNoResourceAttr("snowflake_task.root_task", "session_parameters.MULTI_STATEMENT_COUNT"),
),
},
{
Expand All @@ -233,6 +257,9 @@ func TestAcc_Task(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_task.child_task", "schedule", stepTwo.ChildTask.Schedule),
checkInt64("snowflake_task.root_task", "user_task_timeout_ms", stepTwo.RootTask.UserTaskTimeoutMs),
checkInt64("snowflake_task.solo_task", "user_task_timeout_ms", stepTwo.SoloTask.UserTaskTimeoutMs),
checkInt64("snowflake_task.root_task", "session_parameters.LOCK_TIMEOUT", 1000),
checkBool("snowflake_task.root_task", "session_parameters.STRICT_JSON_OUTPUT", true),
resource.TestCheckNoResourceAttr("snowflake_task.root_task", "session_parameters.MULTI_STATEMENT_COUNT"),
),
},
{
Expand All @@ -253,6 +280,9 @@ func TestAcc_Task(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_task.child_task", "schedule", stepThree.ChildTask.Schedule),
checkInt64("snowflake_task.root_task", "user_task_timeout_ms", stepThree.RootTask.UserTaskTimeoutMs),
checkInt64("snowflake_task.solo_task", "user_task_timeout_ms", stepThree.SoloTask.UserTaskTimeoutMs),
checkInt64("snowflake_task.root_task", "session_parameters.LOCK_TIMEOUT", 2000),
resource.TestCheckNoResourceAttr("snowflake_task.root_task", "session_parameters.STRICT_JSON_OUTPUT"),
checkInt64("snowflake_task.root_task", "session_parameters.MULTI_STATEMENT_COUNT", 5),
),
},
{
Expand All @@ -279,6 +309,9 @@ func TestAcc_Task(t *testing.T) {
// `user_task_timeout_ms` by unsetting the
// USER_TASK_TIMEOUT_MS session variable.
checkInt64("snowflake_task.solo_task", "user_task_timeout_ms", initialState.ChildTask.UserTaskTimeoutMs),
checkInt64("snowflake_task.root_task", "session_parameters.LOCK_TIMEOUT", 1000),
checkBool("snowflake_task.root_task", "session_parameters.STRICT_JSON_OUTPUT", true),
resource.TestCheckNoResourceAttr("snowflake_task.root_task", "session_parameters.MULTI_STATEMENT_COUNT"),
),
},
},
Expand All @@ -302,12 +335,12 @@ resource "snowflake_task" "root_task" {
user_task_timeout_ms = {{ .RootTask.UserTaskTimeoutMs }}
{{- end }}
{{ if .ChildTask.SessionParams }}
{{ if .RootTask.SessionParams }}
session_parameters = {
{{ range $key, $value := .RootTask.SessionParams}}
{{ range $key, $value := .RootTask.SessionParams}}
{{ $key }} = "{{ $value }}",
}
{{- end }}
}
{{- end }}
}
resource "snowflake_task" "child_task" {
Expand All @@ -325,10 +358,10 @@ resource "snowflake_task" "child_task" {
{{ if .ChildTask.SessionParams }}
session_parameters = {
{{ range $key, $value := .ChildTask.SessionParams}}
{{ range $key, $value := .ChildTask.SessionParams}}
{{ $key }} = "{{ $value }}",
}
{{- end }}
}
{{- end }}
}
resource "snowflake_task" "solo_task" {
Expand All @@ -351,8 +384,8 @@ resource "snowflake_task" "solo_task" {
session_parameters = {
{{ range $key, $value := .SoloTask.SessionParams}}
{{ $key }} = "{{ $value }}",
}
{{- end }}
}
{{- end }}
}
`)
Expand Down Expand Up @@ -519,6 +552,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", acc.TestSchemaName),
resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"),
resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", "5 MINUTE"),
resource.TestCheckResourceAttr("snowflake_task.test_task_root", "suspend_task_after_num_failures", "1"),
),
},
{
Expand All @@ -529,6 +563,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", acc.TestSchemaName),
resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"),
resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", ""),
resource.TestCheckResourceAttr("snowflake_task.test_task_root", "suspend_task_after_num_failures", "2"),
),
},
{
Expand All @@ -539,6 +574,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", acc.TestSchemaName),
resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"),
resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", "5 MINUTE"),
resource.TestCheckResourceAttr("snowflake_task.test_task_root", "suspend_task_after_num_failures", "1"),
),
},
{
Expand All @@ -549,6 +585,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", acc.TestSchemaName),
resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"),
resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", ""),
resource.TestCheckResourceAttr("snowflake_task.test_task_root", "suspend_task_after_num_failures", "0"),
),
},
},
Expand All @@ -558,12 +595,13 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) {
func taskConfigManagedScheduled(name string, taskRootName string, databaseName string, schemaName string) string {
s := `
resource "snowflake_task" "test_task_root" {
name = "%s"
database = "%s"
schema = "%s"
sql_statement = "SELECT 1"
enabled = true
schedule = "5 MINUTE"
name = "%s"
database = "%s"
schema = "%s"
sql_statement = "SELECT 1"
enabled = true
schedule = "5 MINUTE"
suspend_task_after_num_failures = 1
}
resource "snowflake_task" "test_task" {
Expand All @@ -581,12 +619,13 @@ resource "snowflake_task" "test_task" {
func taskConfigManagedScheduled2(name string, taskRootName string, databaseName string, schemaName string) string {
s := `
resource "snowflake_task" "test_task_root" {
name = "%s"
database = "%s"
schema = "%s"
sql_statement = "SELECT 1"
enabled = true
schedule = "5 MINUTE"
name = "%s"
database = "%s"
schema = "%s"
sql_statement = "SELECT 1"
enabled = true
schedule = "5 MINUTE"
suspend_task_after_num_failures = 2
}
resource "snowflake_task" "test_task" {
Expand Down
Loading

0 comments on commit 82c3c13

Please sign in to comment.