diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java index 335f9c4303..a3f2a53836 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java @@ -119,6 +119,20 @@ public PipelineExecution restartStage(@Nonnull String executionId, @Nonnull Stri return execution; } + /** + * Requests that the failure of a single pipeline stage be ignored. + * + * <p>The execution is always retrieved as {@code ExecutionType.PIPELINE}. When this instance's + * repository handles the execution's partition the request is handed to the runner; otherwise it + * is logged and passed to {@code repository.ignoreStageFailure} instead. + * + * @param executionId id of the pipeline execution that owns the stage + * @param stageId id of the stage whose failure should be ignored + * @param reason explanation forwarded unchanged with the request; not annotated {@code @Nonnull} + * @return the execution exactly as it was retrieved, before the request was dispatched + */ + public PipelineExecution ignoreStageFailure( + @Nonnull String executionId, @Nonnull String stageId, String reason) { + PipelineExecution execution = repository.retrieve(ExecutionType.PIPELINE, executionId); + if (repository.handlesPartition(execution.getPartition())) { + runner.ignoreFailure(execution, stageId, reason); + } else { + log.info( + "Not pushing queue message action='ignoreFailure' for execution with foreign partition='{}'", + execution.getPartition()); + repository.ignoreStageFailure(executionId, stageId, reason); + } + return execution; + } + private PipelineExecution doInternal( Consumer runnerAction, Runnable repositoryAction, diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java index b6b2a7a76c..55be5178c0 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java @@ -27,6 +27,11 @@ default void restart(@Nonnull PipelineExecution execution, @Nonnull String stage throw new UnsupportedOperationException(); } + /** + * Ignores the failure of the stage {@code stageId} of {@code execution}. + * + * <p>This default implementation is unsupported and always throws + * {@link UnsupportedOperationException}. + */ + default void ignoreFailure( + @Nonnull PipelineExecution execution, @Nonnull String stageId, String reason) { + throw new UnsupportedOperationException(); + } + default void reschedule(@Nonnull PipelineExecution execution) { throw new 
UnsupportedOperationException(); } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index e9e1c5976a..12592de813 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -186,6 +186,8 @@ default boolean handlesPartition(@Nullable String partitionOfExecution) { // foreign executions default void restartStage(String executionId, String stageId) {} + /** Repository-level hook for ignoring a stage failure; this default implementation does nothing. */ + default void ignoreStageFailure(String executionId, String stageId, String reason) {} + final class ExecutionCriteria { private int pageSize = 3500; private Collection statuses = new ArrayList<>(); diff --git a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java new file mode 100644 index 0000000000..b36bd5b365 --- /dev/null +++ b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java @@ -0,0 +1,66 @@ +/* + * Copyright 2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netflix.spinnaker.orca.interlink.events; + +import static com.netflix.spinnaker.orca.interlink.events.InterlinkEvent.EventType.IGNORE_FAILURE; + +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType; +import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * This event is published on the interlink as a result of a user IGNORING THE FAILURE of a stage on + * an orca instance that can't handle the partition for the given execution. + * + * <p>The event is then handled by an orca instance (listening on interlink) whose partition matches + * that of the execution. The resulting repository mutations of this event will then be peered by + * the PeeringAgent. + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class IgnoreStageFailureInterlinkEvent implements InterlinkEvent { + final EventType eventType = IGNORE_FAILURE; + @Nullable String partition; + @Nonnull ExecutionType executionType; + @Nonnull String executionId; + @Nonnull String stageId; + String reason; + + /** + * Creates an event for the given stage. {@code partition} is not assigned by this constructor. + * + * @param executionType type of the execution that owns the stage + * @param executionId id of the execution that owns the stage + * @param stageId id of the stage whose failure should be ignored + * @param reason explanation carried along with the event; not annotated {@code @Nonnull} + */ + public IgnoreStageFailureInterlinkEvent( + @Nonnull ExecutionType executionType, + @Nonnull String executionId, + @Nonnull String stageId, + String reason) { + // for the moment, only ExecutionType.PIPELINE can have ignored stages + // but since we are defining the protocol on the wire here, let's be a bit future proof and + // accept potentially different execution types + this.executionType = executionType; + this.executionId = executionId; + this.stageId = stageId; + this.reason = reason; + } + + /** + * Asks {@code executionOperator} to ignore the failure of the stage this event refers to. Only + * the execution id, stage id and reason are forwarded; {@code executionType} is not. + */ + @Override + public void applyTo(CompoundExecutionOperator executionOperator) { + executionOperator.ignoreStageFailure(executionId, stageId, reason); + } +} diff --git a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java index 8469422002..0ce62a71e5 100644 --- a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java +++ b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java @@ -44,7 +44,8 @@ @JsonSubTypes.Type(value = ResumeInterlinkEvent.class, name = "RESUME"), @JsonSubTypes.Type(value = DeleteInterlinkEvent.class, name = "DELETE"), @JsonSubTypes.Type(value = PatchStageInterlinkEvent.class, name = "PATCH"), - @JsonSubTypes.Type(value = RestartStageInterlinkEvent.class, name = "RESTART") + @JsonSubTypes.Type(value = 
RestartStageInterlinkEvent.class, name = "RESTART"), + @JsonSubTypes.Type(value = IgnoreStageFailureInterlinkEvent.class, name = "IGNORE_FAILURE") }) public interface InterlinkEvent { enum EventType { @@ -53,7 +54,8 @@ enum EventType { DELETE, RESUME, PATCH, - RESTART + RESTART, + IGNORE_FAILURE } @JsonIgnore diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt index c53c176f36..e1a8e69a55 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt @@ -38,6 +38,10 @@ class QueueExecutionRunner( queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null))) } + /** Pushes an [IgnoreStageFailure] message carrying the current Spinnaker user (or null) and the given reason. */ + override fun ignoreFailure(execution: PipelineExecution, stageId: String, reason: String?) { + queue.push(IgnoreStageFailure(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null), reason)) + } + override fun unpause(execution: PipelineExecution) { queue.push(ResumeExecution(execution)) } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt new file mode 100644 index 0000000000..a9823c183f --- /dev/null +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt @@ -0,0 +1,83 @@ +/* + * Copyright 2017 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.q.handler + +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.* +import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution +import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.IgnoreStageFailure +import com.netflix.spinnaker.orca.q.pending.PendingExecutionService +import com.netflix.spinnaker.q.Queue +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component +import java.time.Clock + +/** + * Handles [IgnoreStageFailure] messages: a halted stage is stored as `FAILED_CONTINUE`, the + * execution is put back to `RUNNING` and the stages that follow are started. + */ +@Component +class IgnoreStageFailureHandler( + override val queue: Queue, + override val repository: ExecutionRepository, + override val stageDefinitionBuilderFactory: StageDefinitionBuilderFactory, + private val pendingExecutionService: PendingExecutionService, + private val clock: Clock +) : OrcaMessageHandler<IgnoreStageFailure>, StageBuilderAware { + + override val messageType = IgnoreStageFailure::class.java + + private val log: Logger get() = LoggerFactory.getLogger(javaClass) + + /** + * Processes one request to ignore a stage failure. + * + * - A stage whose status is not a halt status is left untouched; only a warning is logged. + * - If `shouldQueue()` is true for the execution, the message is handed to + * [pendingExecutionService] keyed by the pipeline config id (nothing is enqueued when that id + * is null). + * - Otherwise the stage is stored as `FAILED_CONTINUE` with the ignore details attached, a + * distinct top-level stage and the execution are set to `RUNNING`, and `startNext()` is called + * on the stage. + */ + override fun handle(message: IgnoreStageFailure) { + message.withStage { stage -> + + if (!stage.status.isHalt) { + log.warn("Attempting to ignore the failure of stage $stage which is not halted. 
Will ignore") + } else if (stage.execution.shouldQueue()) { + // this pipeline is already running and has limitConcurrent = true + stage.execution.pipelineConfigId?.let { + log.info("Queueing IgnoreStageFailure of {} {} {}", stage.execution.application, stage.execution.name, stage.execution.id) + pendingExecutionService.enqueue(it, message) + } + } else { + stage.status = FAILED_CONTINUE + stage.addIgnoreFailureDetails(message.user, message.reason) + repository.storeStage(stage) + + val topLevelStage = stage.topLevelStage + if (topLevelStage != stage) { + topLevelStage.status = RUNNING + repository.storeStage(topLevelStage) + } + + val execution = topLevelStage.execution + // update the same execution instance that is persisted on the next line + execution.updateStatus(RUNNING) + repository.updateStatus(execution) + + stage.startNext() + } + } + } + + /** + * Records who ignored the failure, why and when under the `ignoreFailureDetails` context key, and + * moves any existing `exception` context entry into it as `previousException`. Missing user and + * reason are recorded as "anonymous" and "unspecified". + */ + private fun StageExecution.addIgnoreFailureDetails(user: String?, reason: String?) { + context["ignoreFailureDetails"] = mapOf( + "by" to (user ?: "anonymous"), + "reason" to (reason ?: "unspecified"), + "time" to clock.millis(), + "previousException" to context.remove("exception") + ) + } +} diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt index 3100a1ac76..ae71303fe6 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt @@ -250,6 +250,22 @@ data class RestartStage( this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user) } +/** + * Queue message asking for the failure of the stage identified by [stageId] to be ignored. + * [user] and [reason] are optional and carried along as given. + */ +@JsonTypeName("ignoreStageFailure") +data class IgnoreStageFailure( + override val executionType: ExecutionType, + override val executionId: String, + override val application: String, + override val stageId: String, + val user: String?, + val reason: String? +) : Message(), StageLevel { + constructor(source: PipelineExecution, stageId: String, user: String?, reason: String?) 
: + this(source.type, source.id, source.application, stageId, user, reason) + + constructor(stage: StageExecution, user: String?, reason: String?) : + this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user, reason) +} + @JsonTypeName("resumeStage") data class ResumeStage( override val executionType: ExecutionType, diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt index fd73fbb098..a54ba469f6 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt @@ -33,6 +33,7 @@ internal val orcaToKeikoTypes = mapOf( ".PauseStage" to "pauseStage", ".RestartStage" to "restartStage", ".ResumeStage" to "resumeStage", + ".IgnoreStageFailure" to "ignoreStageFailure", ".CancelStage" to "cancelStage", ".StartExecution" to "startExecution", ".RescheduleExecution" to "rescheduleExecution", diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt new file mode 100644 index 0000000000..2aff4e6e42 --- /dev/null +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt @@ -0,0 +1,313 @@ +/* + * Copyright 2021 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.q.handler + +import com.netflix.spinnaker.orca.DefaultStageResolver +import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.NOT_STARTED +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.RUNNING +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.TERMINAL +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.FAILED_CONTINUE +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE +import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution +import com.netflix.spinnaker.orca.api.test.pipeline +import com.netflix.spinnaker.orca.api.test.stage +import com.netflix.spinnaker.orca.pipeline.DefaultStageDefinitionBuilderFactory +import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.* +import com.netflix.spinnaker.orca.q.pending.PendingExecutionService +import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.time.fixedClock +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.argForWhich +import com.nhaarman.mockito_kotlin.argumentCaptor +import com.nhaarman.mockito_kotlin.atLeast +import com.nhaarman.mockito_kotlin.check +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq +import 
com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.reset +import com.nhaarman.mockito_kotlin.times +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.whenever +import java.time.temporal.ChronoUnit.HOURS +import java.time.temporal.ChronoUnit.MINUTES +import kotlin.collections.contains +import kotlin.collections.filter +import kotlin.collections.first +import kotlin.collections.flatMap +import kotlin.collections.forEach +import kotlin.collections.listOf +import kotlin.collections.map +import kotlin.collections.mapOf +import kotlin.collections.set +import kotlin.collections.setOf +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP +import org.jetbrains.spek.subject.SubjectSpek + +/** + * Spek specification for [IgnoreStageFailureHandler], run against a mocked queue, repository and + * pending-execution service with a fixed clock. + */ +object IgnoreStageFailureHandlerTest : SubjectSpek<IgnoreStageFailureHandler>({ + val queue: Queue = mock() + val repository: ExecutionRepository = mock() + val pendingExecutionService: PendingExecutionService = mock() + val clock = fixedClock() + + subject(GROUP) { + IgnoreStageFailureHandler( + queue, + repository, + DefaultStageDefinitionBuilderFactory( + DefaultStageResolver( + StageDefinitionBuildersProvider( + listOf( + singleTaskStage, + stageWithSyntheticBefore, + stageWithNestedSynthetics + ) + ) + ) + ), + pendingExecutionService, + clock + ) + } + + fun resetMocks() = reset(queue, repository) + + ExecutionStatus + .values() + .filter { !it.isHalt } + .forEach { unhaltedStatus -> + describe("trying to ignore the failure of a $unhaltedStatus stage") { + val pipeline = pipeline { + application = "foo" + status = RUNNING + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + stage { + refId = "1" + singleTaskStage.plan(this) + status = unhaltedStatus + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + } + stage { + refId = "2" + requisiteStageRefIds = listOf("1") + 
singleTaskStage.plan(this) + status = NOT_STARTED + } + } + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null) + + beforeGroup { + whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + it("does not modify the stage status") { + // the handler persists stages through storeStage, so that is the call that must not happen + verify(repository, never()).storeStage(any()) + } + + it("does not run any stages") { + verify(queue, never()).push(any()) + } + } + } + + describe("ignoring the failure of a failed stage with no downstream stages") { + val pipeline = pipeline { + application = "foo" + id = "1234" + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(30, MINUTES).toEpochMilli() + stage { + refId = "1" + singleTaskStage.plan(this) + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(59, MINUTES).toEpochMilli() + } + } + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null) + + beforeGroup { + whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + it("changes the stage's status to FAILED_CONTINUE") { + verify(repository).storeStage( + check { + assertThat(it.id).isEqualTo(message.stageId) + assertThat(it.status).isEqualTo(FAILED_CONTINUE) + } + ) + } + + it("recommences the pipeline") { + verify(repository).updateStatus( + check { + assertThat(it.id).isEqualTo(pipeline.id) + assertThat(it.status).isEqualTo(RUNNING) + } + ) + } + + it("attempts to start the next downstream stage, but finds none, so sends a CompleteExecution message") { + verify(queue).push(any()) + } + } + + describe("ignoring the failure of a failed 
stage with downstream stages") { + val pipeline = pipeline { + application = "foo" + id = "1234" + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(30, MINUTES).toEpochMilli() + stage { + refId = "1" + singleTaskStage.plan(this) + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(59, MINUTES).toEpochMilli() + } + stage { + refId = "2" + requisiteStageRefIds = listOf("1") + singleTaskStage.plan(this) + status = NOT_STARTED + } + stage { + refId = "3" + requisiteStageRefIds = listOf("1") + singleTaskStage.plan(this) + status = NOT_STARTED + } + stage { + refId = "4" + requisiteStageRefIds = listOf("2") + singleTaskStage.plan(this) + status = NOT_STARTED + } + } + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null) + + beforeGroup { + whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + it("changes the stage's status to FAILED_CONTINUE") { + verify(repository).storeStage( + check { + assertThat(it.id).isEqualTo(message.stageId) + assertThat(it.status).isEqualTo(FAILED_CONTINUE) + } + ) + } + + it("recommences the pipeline") { + verify(repository).updateStatus( + check { + assertThat(it.id).isEqualTo(pipeline.id) + assertThat(it.status).isEqualTo(RUNNING) + } + ) + } + + it("sends a StartStage message for all downstream stages") { + argumentCaptor<StartStage>().apply { + verify(queue, times(2)).push(capture()) + assertThat(allValues.map { it.stageId }.toSet()).isEqualTo(pipeline.stages.filter{"1" in it.requisiteStageRefIds}.map { it.id }.toSet()) + } + } + } + + describe("ignoring the failure of a failed stage in a running pipeline") { + val pipeline = pipeline { + application = "foo" + id = "1234" + status = RUNNING + startTime = 
clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(30, MINUTES).toEpochMilli() + stage { + refId = "1" + singleTaskStage.plan(this) + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(59, MINUTES).toEpochMilli() + } + stage { + refId = "2" + requisiteStageRefIds = listOf("1") + singleTaskStage.plan(this) + status = NOT_STARTED + } + stage { + refId = "3" + singleTaskStage.plan(this) + status = RUNNING + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + } + } + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null) + + beforeGroup { + whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + it("changes the stage's status to FAILED_CONTINUE") { + verify(repository).storeStage( + check { + assertThat(it.id).isEqualTo(message.stageId) + assertThat(it.status).isEqualTo(FAILED_CONTINUE) + } + ) + } + + it("sends a StartStage message for all downstream stages") { + verify(queue).push(any()) + } + } +}) diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 07dc3e99a1..36ab1380b2 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -33,13 +33,7 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution import com.netflix.spinnaker.orca.api.pipeline.persistence.ExecutionRepositoryListener import com.netflix.spinnaker.orca.interlink.Interlink 
-import com.netflix.spinnaker.orca.interlink.events.CancelInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.DeleteInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.InterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.PatchStageInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.PauseInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.RestartStageInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.ResumeInterlinkEvent +import com.netflix.spinnaker.orca.interlink.events.* import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator @@ -260,6 +254,12 @@ class SqlExecutionRepository( } } + /** + * Passes an [IgnoreStageFailureInterlinkEvent] (always built with type `PIPELINE`) to + * `doForeignAware`. The callback supplied here only writes a debug log line, so this method + * changes nothing itself for local executions. + * NOTE(review): presumably `doForeignAware` publishes the event on the interlink when the + * execution is foreign - confirm against its definition. + */ + override fun ignoreStageFailure(executionId: String, stageId: String, reason: String?) { + doForeignAware(IgnoreStageFailureInterlinkEvent(PIPELINE, executionId, stageId, reason)) { + _, _ -> log.debug("ignoreStageFailure is a no-op for local executions") + } + } + override fun updateStatus(execution: PipelineExecution) { withPool(poolName) { jooq.transactional { diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index bf47e136df..0bd712e0fa 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -505,6 +505,18 @@ class TaskController { return executionOperator.restartStage(id, stageId) } + /** + * PUT endpoint that asks for the failure of one pipeline stage to be ignored. + * + * The request is rejected with CannotUpdateExecutionStage unless the stage's context has + * "allowIgnoreFailure" set (read with getCurrentOnly, defaulting to false). The "reason" entry + * of the request body is forwarded to the execution operator. + * + * NOTE(review): the (boolean) cast applies Groovy truth, so a non-empty String such as "false" + * would count as true - confirm the context value is always a real boolean. + */ + @PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'EXECUTE')") + @RequestMapping(value = "/pipelines/{id}/stages/{stageId}/ignoreFailure", method = RequestMethod.PUT) + PipelineExecution ignoreFailureOfPipelineStage( + 
@PathVariable String id, @PathVariable String stageId, @RequestBody Map details) { + def pipeline = executionRepository.retrieve(PIPELINE, id) + def stage = pipeline.stageById(stageId) + if (!(boolean) stage.context.getCurrentOnly("allowIgnoreFailure", false)) { + throw new CannotUpdateExecutionStage("Stage does not allow ignoreFailure action") + } + return executionOperator.ignoreStageFailure(id, stageId, details["reason"]) + } + @PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'READ')") @RequestMapping(value = "/pipelines/{id}/evaluateExpression", method = RequestMethod.GET) Map evaluateExpressionForExecution(@PathVariable("id") String id,