diff --git a/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java b/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java index b67ffa6352..5919343bb4 100644 --- a/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java +++ b/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java @@ -86,6 +86,11 @@ public interface StageExecution { void setContext(@Nonnull Map context); + @Nonnull + Map getOthers(); + + void setOthers(@Nonnull Map others); + /** TODO(rz): getOutputs(Class)? */ @Nonnull Map getOutputs(); diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java index 7a1c9d377f..19d528055c 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java @@ -248,6 +248,16 @@ public void setOutputs(@Nonnull Map outputs) { this.outputs = outputs; } + private Map others = new HashMap<>(); + + public @Nonnull Map getOthers() { + return others; + } + + public void setOthers(@Nonnull Map others) { + this.others = others; + } + /** * Returns the tasks that are associated with this stage. Tasks are the most granular unit of work * in a stage. Because tasks can be dynamically composed, this list is open updated during a diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index b6362e4929..6bbec8a221 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -119,6 +119,14 @@ class DualExecutionRepository( select(stage.execution).updateStageContext(stage) } + override fun updateStageOthers(stage: StageExecution) { + select(stage.execution).updateStageOthers(stage) + } + + override fun deleteStageOthers(stage: StageExecution) { + select(stage.execution).deleteStageOthers(stage) + } + override fun removeStage(execution: PipelineExecution, stageId: String) { select(execution).removeStage(execution, stageId) } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index e9e1c5976a..2968c12b6a 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -35,6 +35,10 @@ public interface ExecutionRepository { void updateStageContext(@Nonnull StageExecution stage); + void updateStageOthers(@Nonnull StageExecution stage); + + void deleteStageOthers(@Nonnull StageExecution stage); + void removeStage(@Nonnull PipelineExecution execution, @Nonnull String stageId); void addStage(@Nonnull StageExecution stage); diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 4395c9f2bf..7799e34861 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -183,6 +183,14 @@ class InMemoryExecutionRepository : ExecutionRepository { // Do nothing } + override fun updateStageOthers(stage: StageExecution) { + // Do nothing + } + + override fun deleteStageOthers(stage: StageExecution) { + // Do nothing + } + override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution { return retrieveByCorrelationId(PIPELINE, correlationId) } diff --git a/orca-echo/src/main/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStage.groovy b/orca-echo/src/main/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStage.groovy index 5f61847868..14f3b29190 100644 --- a/orca-echo/src/main/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStage.groovy +++ b/orca-echo/src/main/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStage.groovy @@ -25,7 +25,9 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution import com.netflix.spinnaker.orca.api.pipeline.TaskResult import com.netflix.spinnaker.orca.echo.util.ManualJudgmentAuthorization -import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import org.springframework.beans.factory.annotation.Value import javax.annotation.Nonnull import java.util.concurrent.TimeUnit @@ -73,14 +75,20 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage final long backoffPeriod = 15000 final long timeout = TimeUnit.DAYS.toMillis(3) + @Value('${spinnaker.manual-judgment-navigation:false}') + boolean manualJudgmentNavigation + private final EchoService echoService private final ManualJudgmentAuthorization manualJudgmentAuthorization + private final ExecutionRepository executionRepository @Autowired WaitForManualJudgmentTask(Optional echoService, - ManualJudgmentAuthorization manualJudgmentAuthorization) { + ManualJudgmentAuthorization manualJudgmentAuthorization, + ExecutionRepository executionRepository) { this.echoService = echoService.orElse(null) this.manualJudgmentAuthorization = manualJudgmentAuthorization + this.executionRepository = executionRepository } @Override @@ -89,14 +97,24 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage String notificationState ExecutionStatus executionStatus + if (manualJudgmentNavigation) { + checkForAnyParentExecutions(stage) + } + switch (stageData.state) { case StageData.State.CONTINUE: notificationState = "manualJudgmentContinue" executionStatus = ExecutionStatus.SUCCEEDED + if (manualJudgmentNavigation) { + deleteLeafnodeAttributesFromTheParentExecutions(stage) + } break case StageData.State.STOP: notificationState = "manualJudgmentStop" executionStatus = ExecutionStatus.TERMINAL + if (manualJudgmentNavigation) { + deleteLeafnodeAttributesFromTheParentExecutions(stage) + } break default: notificationState = "manualJudgment" @@ -120,6 +138,69 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage return TaskResult.builder(executionStatus).context(outputs).build() } + /** + * This method checks if this manual judgment stage is triggered by any other pipeline(parent execution). + * If yes, it fetches all the parent executions, which triggered this stage and sets the current + * running stage(manual judgment stage execution id and application name) to leafnode execution id and + * application name. + * + * p1 --> p2 --> p3 --> p4 (running manual judgment stage & waiting for judgment) + * + * p1 leafnodeExecutionId : p4 execution id + * p1 leafnodeApplicationName : p4 application name + * + * p2 leafnodeExecutionId : p4 execution id + * p2 leafnodeApplicationName : p4 application name + * + * p3 leafnodeExecutionId : p4 execution id + * p3 leafnodeApplicationName : p4 application name + * + * @param stage + */ + void checkForAnyParentExecutions(StageExecution stage) { + + def status = stage?.execution?.status + def trigger = stage?.execution?.trigger + def appName = stage?.execution?.application + def executionId = stage?.execution?.id + def stageId = stage?.execution?.id + while (ExecutionStatus.RUNNING.equals(status) && trigger && trigger.hasProperty("parentExecution")) { + PipelineExecution parentExecution = trigger?.parentExecution + parentExecution = executionRepository.retrieve(ExecutionType.PIPELINE, parentExecution.id) + parentExecution.getStages().each { + if (("pipeline").equals(it.getType()) && (ExecutionStatus.RUNNING.equals(it.getStatus()))) { + if (it.context && stageId.equals(it.context.executionId)) { + def others = [leafnodePipelineExecutionId: executionId, leafnodeApplicationName: appName] + it.setOthers(others) + stageId = it.execution.getId() + executionRepository.updateStageOthers(it) + } + } + } + trigger = parentExecution?.trigger + } + } + + /** + * This method deletes the leafnode attributes from all the parent stage executions. + * @param stage + */ + void deleteLeafnodeAttributesFromTheParentExecutions(StageExecution stage) { + + def status = stage?.execution?.status + def trigger = stage?.execution?.trigger + while (ExecutionStatus.RUNNING.equals(status) && trigger && trigger.hasProperty("parentExecution")) { + PipelineExecution parentExecution = trigger?.parentExecution + PipelineExecution execution = executionRepository.retrieve(ExecutionType.PIPELINE, parentExecution.id) + execution.getStages().each { + if (ExecutionStatus.RUNNING.equals(it.getStatus())) { + executionRepository.deleteStageOthers(it) + } + } + trigger = parentExecution?.trigger + } + } + Map processNotifications(StageExecution stage, StageData stageData, String notificationState) { if (echoService) { // sendNotifications will be true if using the new scheme for configuration notifications. diff --git a/orca-echo/src/test/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStageSpec.groovy b/orca-echo/src/test/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStageSpec.groovy index a64325789e..665240498f 100644 --- a/orca-echo/src/test/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStageSpec.groovy +++ b/orca-echo/src/test/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStageSpec.groovy @@ -26,6 +26,7 @@ import com.netflix.spinnaker.orca.echo.EchoService import com.netflix.spinnaker.orca.echo.util.ManualJudgmentAuthorization import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import spock.lang.Specification import spock.lang.Unroll import static com.netflix.spinnaker.orca.echo.pipeline.ManualJudgmentStage.Notification @@ -33,6 +34,7 @@ import static com.netflix.spinnaker.orca.echo.pipeline.ManualJudgmentStage.WaitF class ManualJudgmentStageSpec extends Specification { EchoService echoService = Mock(EchoService) + ExecutionRepository executionRepository = Mock(ExecutionRepository) FiatPermissionEvaluator fiatPermissionEvaluator = Mock(FiatPermissionEvaluator) @@ -48,7 +50,7 @@ class ManualJudgmentStageSpec extends Specification { @Unroll void "should return execution status based on judgmentStatus"() { given: - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) when: def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", context)) @@ -73,7 +75,7 @@ class ManualJudgmentStageSpec extends Specification { new UserPermission().addResources([new Role('foo')]).setAdmin(isAdmin).view } - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) when: def stage = new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", context) @@ -97,7 +99,7 @@ class ManualJudgmentStageSpec extends Specification { void "should only send notifications for supported types"() { given: - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) when: def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [notifications: [ @@ -118,7 +120,7 @@ class ManualJudgmentStageSpec extends Specification { @Unroll void "if deprecated notification configuration is in use, only send notifications for awaiting judgment state"() { given: - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) when: def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [ @@ -199,7 +201,7 @@ class ManualJudgmentStageSpec extends Specification { @Unroll void "should retain unknown fields in the notification context"() { given: - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) def slackNotification = new Notification(type: "slack") slackNotification.setOther("customMessage", "hello slack") diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index 815f335e24..fe099db328 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -159,6 +159,44 @@ public void updateStageContext(@Nonnull StageExecution stage) { }); } + @Override + public void updateStageOthers(@Nonnull StageExecution stage) { + RedisClientDelegate delegate = getRedisDelegate(stage); + String key = executionKey(stage); + String contextKey = format("stage.%s.others", stage.getId()); + delegate.withCommandsClient( + c -> { + try { + c.hset(key, contextKey, mapper.writeValueAsString(stage.getOthers())); + } catch (JsonProcessingException e) { + throw new StageSerializationException( + format( + "Failed serializing stage, executionId: %s, stageId: %s", + stage.getExecution().getId(), stage.getId()), + e); + } + }); + } + + @Override + public void deleteStageOthers(@Nonnull StageExecution stage) { + RedisClientDelegate delegate = getRedisDelegate(stage); + String key = executionKey(stage); + String contextKey = format("stage.%s.others", stage.getId()); + delegate.withCommandsClient( + c -> { + try { + c.hdel(key, contextKey, mapper.writeValueAsString(stage.getOthers())); + } catch (JsonProcessingException e) { + throw new StageSerializationException( + format( + "Failed serializing stage, executionId: %s, stageId: %s", + stage.getExecution().getId(), stage.getId()), + e); + } + }); + } + @Override public void removeStage(@Nonnull PipelineExecution execution, @Nonnull String stageId) { RedisClientDelegate delegate = getRedisDelegate(execution); @@ -936,6 +974,11 @@ protected PipelineExecution buildExecution( } else { stage.setOutputs(emptyMap()); } + if (map.get(prefix + "others") != null) { + stage.setOthers(mapper.readValue(map.get(prefix + "others"), MAP_STRING_TO_OBJECT)); + } else { + stage.setOthers(emptyMap()); + } if (map.get(prefix + "tasks") != null) { stage.setTasks(mapper.readValue(map.get(prefix + "tasks"), LIST_OF_TASKS)); } else { diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 07dc3e99a1..69a6c55ea5 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -146,6 +146,16 @@ class SqlExecutionRepository( storeStage(stage) } + override fun updateStageOthers(stage: StageExecution) { + storeStage(stage) + } + + override fun deleteStageOthers(stage: StageExecution) { + val others = mapOf() + stage.others = others; + storeStage(stage) + } + override fun removeStage(execution: PipelineExecution, stageId: String) { validateHandledPartitionOrThrow(execution)