From dfa2db621c358941bc965b6019f12ebb369edec4 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Mon, 25 Sep 2023 17:16:20 -0700 Subject: [PATCH] Add tests Signed-off-by: Andrew Dye --- .../transformers/task_execution.go | 19 +- .../transformers/task_execution_test.go | 295 +++++++++++++++++- 2 files changed, 304 insertions(+), 10 deletions(-) diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index 228c84eef..b12a86694 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -153,19 +153,27 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode CreatedAt: input.Request.Event.OccurredAt, Logs: input.Request.Event.Logs, CustomInfo: input.Request.Event.CustomInfo, - Reason: input.Request.Event.Reason, TaskType: input.Request.Event.TaskType, Metadata: metadata, EventVersion: input.Request.Event.EventVersion, } - if len(input.Request.Event.Reason) > 0 { + if len(input.Request.Event.Reasons) > 0 { + for _, reason := range input.Request.Event.Reasons { + closure.Reasons = append(closure.Reasons, &admin.Reason{ + OccurredAt: reason.OccurredAt, + Message: reason.Reason, + }) + } + closure.Reason = input.Request.Event.Reasons[len(input.Request.Event.Reasons)-1].Reason + } else if len(input.Request.Event.Reason) > 0 { closure.Reasons = []*admin.Reason{ - &admin.Reason{ + { OccurredAt: input.Request.Event.OccurredAt, Message: input.Request.Event.Reason, }, } + closure.Reason = input.Request.Event.Reason } eventPhase := input.Request.Event.Phase @@ -388,15 +396,14 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) if len(request.Event.Reasons) > 0 { for _, reason := range request.Event.Reasons { - taskExecutionClosure.Reasons = append( // TODO: this is where to unpack batch + taskExecutionClosure.Reasons = append( taskExecutionClosure.Reasons, &admin.Reason{ OccurredAt: reason.OccurredAt, Message: reason.Reason, }) } - // TODO: avoid dupes? - // taskExecutionClosure.Reason = request.Event.Reason + taskExecutionClosure.Reason = request.Event.Reasons[len(request.Event.Reasons)-1].Reason } else if len(request.Event.Reason) > 0 { if taskExecutionClosure.Reason != request.Event.Reason { // by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index e3155b12f..235fbfcd9 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -285,7 +285,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { UpdatedAt: taskEventOccurredAtProto, Reason: "Task was scheduled", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, @@ -406,6 +406,93 @@ func TestCreateTaskExecutionModelRunning(t *testing.T) { }, taskExecutionModel) } +func TestCreateTaskExecutionModelSingleEvents(t *testing.T) { + taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{ + Request: &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + PhaseVersion: uint32(2), + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", + }, + }, + }) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task event 1", + Reasons: []*admin.Reason{ + {OccurredAt: taskEventOccurredAtProto, Message: "Task event 1"}, + }, + } + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, taskExecutionModel.Closure) +} + +func TestCreateTaskExecutionModelBatchedEvents(t *testing.T) { + secondTaskEventOccurredAt := taskEventOccurredAt.Add(time.Second) + secondTaskEventOccurredAtProto, _ := ptypes.TimestampProto(secondTaskEventOccurredAt) + taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{ + Request: &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + PhaseVersion: uint32(2), + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", // Here for backwards compatibility + Reasons: []*event.BatchedReason{ + { + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", + }, + { + OccurredAt: secondTaskEventOccurredAtProto, + Reason: "Task event 2", + }, + }, + }, + }, + }) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task event 2", + Reasons: []*admin.Reason{ + {OccurredAt: taskEventOccurredAtProto, Message: "Task event 1"}, + {OccurredAt: secondTaskEventOccurredAtProto, Message: "Task event 2"}, + }, + } + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, taskExecutionModel.Closure) +} + func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { existingClosure := &admin.TaskExecutionClosure{ Phase: core.TaskExecution_RUNNING, @@ -425,7 +512,7 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { }), Reason: "Task was scheduled", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, @@ -526,11 +613,11 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { }), Reason: "task failed", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, - &admin.Reason{ + { OccurredAt: occuredAtProto, Message: "task failed", }, @@ -569,6 +656,206 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { } +func TestUpdateTaskExecutionModelSingleEvents(t *testing.T) { + existingClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task was scheduled", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, + } + + closureBytes, err := proto.Marshal(existingClosure) + assert.Nil(t, err) + + existingTaskExecution := models.TaskExecution{ + TaskExecutionKey: models.TaskExecutionKey{ + TaskKey: models.TaskKey{ + Project: sampleTaskID.Project, + Domain: sampleTaskID.Domain, + Name: sampleTaskID.Name, + Version: sampleTaskID.Version, + }, + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: sampleNodeExecID.NodeId, + ExecutionKey: models.ExecutionKey{ + Project: sampleNodeExecID.ExecutionId.Project, + Domain: sampleNodeExecID.ExecutionId.Domain, + Name: sampleNodeExecID.ExecutionId.Name, + }, + }, + RetryAttempt: &retryAttemptValue, + }, + Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING", + InputURI: "input uri", + Closure: closureBytes, + StartedAt: &taskEventOccurredAt, + TaskExecutionCreatedAt: &taskEventOccurredAt, + TaskExecutionUpdatedAt: &taskEventOccurredAt, + } + + occuredAt := taskEventOccurredAt.Add(time.Minute) + occuredAtProto, err := ptypes.TimestampProto(occuredAt) + assert.Nil(t, err) + + taskEventRequest := &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: occuredAtProto, + Reason: "update 1", + }, + } + + err = UpdateTaskExecutionModel(context.TODO(), taskEventRequest, &existingTaskExecution, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + UpdatedAt: occuredAtProto, + CreatedAt: taskEventOccurredAtProto, + Reason: "update 1", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + { + OccurredAt: occuredAtProto, + Message: "update 1", + }, + }, + } + + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, existingTaskExecution.Closure) +} + +func TestUpdateTaskExecutionModelBatchedEvents(t *testing.T) { + existingClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task was scheduled", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, + } + + closureBytes, err := proto.Marshal(existingClosure) + assert.Nil(t, err) + + existingTaskExecution := models.TaskExecution{ + TaskExecutionKey: models.TaskExecutionKey{ + TaskKey: models.TaskKey{ + Project: sampleTaskID.Project, + Domain: sampleTaskID.Domain, + Name: sampleTaskID.Name, + Version: sampleTaskID.Version, + }, + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: sampleNodeExecID.NodeId, + ExecutionKey: models.ExecutionKey{ + Project: sampleNodeExecID.ExecutionId.Project, + Domain: sampleNodeExecID.ExecutionId.Domain, + Name: sampleNodeExecID.ExecutionId.Name, + }, + }, + RetryAttempt: &retryAttemptValue, + }, + Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING", + InputURI: "input uri", + Closure: closureBytes, + StartedAt: &taskEventOccurredAt, + TaskExecutionCreatedAt: &taskEventOccurredAt, + TaskExecutionUpdatedAt: &taskEventOccurredAt, + } + + occuredAt := taskEventOccurredAt.Add(time.Minute) + occuredAtProto, err := ptypes.TimestampProto(occuredAt) + assert.Nil(t, err) + secondOccuredAt := taskEventOccurredAt.Add(time.Minute * 2) + secondOccuredAtProto, err := ptypes.TimestampProto(secondOccuredAt) + + taskEventRequest := &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: occuredAtProto, + Reason: "update 1", // Here for backwards compatibility + Reasons: []*event.BatchedReason{ + &event.BatchedReason{ + OccurredAt: occuredAtProto, + Reason: "update 1", + }, + &event.BatchedReason{ + OccurredAt: secondOccuredAtProto, + Reason: "update 2", + }, + }, + }, + } + + err = UpdateTaskExecutionModel(context.TODO(), taskEventRequest, &existingTaskExecution, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + UpdatedAt: occuredAtProto, + CreatedAt: taskEventOccurredAtProto, + Reason: "update 2", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + { + OccurredAt: occuredAtProto, + Message: "update 1", + }, + { + OccurredAt: secondOccuredAtProto, + Message: "update 2", + }, + }, + } + + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, existingTaskExecution.Closure) +} + func TestFromTaskExecutionModel(t *testing.T) { taskClosure := &admin.TaskExecutionClosure{ Phase: core.TaskExecution_RUNNING,