diff --git a/core/src/main/java/io/kestra/core/models/flows/Flow.java b/core/src/main/java/io/kestra/core/models/flows/Flow.java index 04c01d45a8..4fb798347f 100644 --- a/core/src/main/java/io/kestra/core/models/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/models/flows/Flow.java @@ -13,6 +13,7 @@ import io.kestra.core.models.Label; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.sla.SLA; import io.kestra.core.models.listeners.Listener; import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.Task; @@ -116,7 +117,10 @@ public List getTaskDefaults() { List outputs; @Valid - protected AbstractRetry retry; + AbstractRetry retry; + + List sla; + public Logger logger() { return LoggerFactory.getLogger("flow." + this.id); diff --git a/core/src/main/java/io/kestra/core/models/flows/FlowWithSource.java b/core/src/main/java/io/kestra/core/models/flows/FlowWithSource.java index 8b4d796348..efa583b135 100644 --- a/core/src/main/java/io/kestra/core/models/flows/FlowWithSource.java +++ b/core/src/main/java/io/kestra/core/models/flows/FlowWithSource.java @@ -36,6 +36,7 @@ public Flow toFlow() { .deleted(this.deleted) .concurrency(this.concurrency) .retry(this.retry) + .sla(this.sla) .build(); } @@ -89,6 +90,7 @@ public static FlowWithSource of(Flow flow, String source) { .source(source) .concurrency(flow.concurrency) .retry(flow.retry) + .sla(flow.sla) .build(); } } diff --git a/core/src/main/java/io/kestra/core/models/flows/sla/ExecutionChangedSLA.java b/core/src/main/java/io/kestra/core/models/flows/sla/ExecutionChangedSLA.java new file mode 100644 index 0000000000..520847be12 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/sla/ExecutionChangedSLA.java @@ -0,0 +1,8 @@ +package io.kestra.core.models.flows.sla; + +/** + * Marker interface to denote an SLA as evaluating on execution change. + * ExecutionChangedSLA will be evaluated on each execution change, a.k.a. at the beginning of the processing of the execution queue. + */ +public interface ExecutionChangedSLA { +} diff --git a/core/src/main/java/io/kestra/core/models/flows/sla/ExecutionMonitoringSLA.java b/core/src/main/java/io/kestra/core/models/flows/sla/ExecutionMonitoringSLA.java new file mode 100644 index 0000000000..da98fe0257 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/sla/ExecutionMonitoringSLA.java @@ -0,0 +1,12 @@ +package io.kestra.core.models.flows.sla; + +import java.time.Duration; + +/** + * Marker interface to denote an SLA as evaluating using an {@link SLAMonitor}. + * ExecutionMonitoringSLA will be evaluated on a deadline defined by the monitor; + * the monitor is created when the execution is created. + */ +public interface ExecutionMonitoringSLA { + Duration getDuration(); +} diff --git a/core/src/main/java/io/kestra/core/models/flows/sla/SLA.java b/core/src/main/java/io/kestra/core/models/flows/sla/SLA.java new file mode 100644 index 0000000000..c61003fbd8 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/sla/SLA.java @@ -0,0 +1,57 @@ +package io.kestra.core.models.flows.sla; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.kestra.core.exceptions.InternalException; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.sla.types.ExecutionConditionSLA; +import io.kestra.core.models.flows.sla.types.MaxDurationSLA; +import io.kestra.core.runners.RunContext; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.util.Map; +import java.util.Optional; + +@SuperBuilder +@Getter +@NoArgsConstructor +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY) +@JsonSubTypes({ + @JsonSubTypes.Type(value = MaxDurationSLA.class, name = "MAX_DURATION"), + @JsonSubTypes.Type(value = ExecutionConditionSLA.class, name = "EXECUTION_CONDITION"), +}) +public abstract class SLA { + @NotNull + @NotEmpty + private String id; + + @NotNull + private SLA.Type type; + + @NotNull + private Behavior behavior; + + // TODO prevent system labels + private Map labels; + + /** + * Evaluate a flow SLA on an execution. + * In case the SLA is exceeded, a violation will be returned. + */ + public abstract Optional evaluate(RunContext runContext, Execution execution) throws InternalException; + + public enum Type { + MAX_DURATION, + EXECUTION_CONDITION, + } + + public enum Behavior { + FAIL, + CANCEL, + NONE + } +} diff --git a/core/src/main/java/io/kestra/core/models/flows/sla/SLAMonitor.java b/core/src/main/java/io/kestra/core/models/flows/sla/SLAMonitor.java new file mode 100644 index 0000000000..884f563252 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/sla/SLAMonitor.java @@ -0,0 +1,21 @@ +package io.kestra.core.models.flows.sla; + +import io.kestra.core.models.HasUID; +import io.kestra.core.utils.IdUtils; +import lombok.Builder; +import lombok.Getter; + +import java.time.Instant; + +@Builder +@Getter +public class SLAMonitor implements HasUID { + String executionId; + String slaId; + Instant deadline; + + @Override + public String uid() { + return IdUtils.fromParts(executionId, slaId); + } +} diff --git a/core/src/main/java/io/kestra/core/models/flows/sla/SLAMonitorStorage.java b/core/src/main/java/io/kestra/core/models/flows/sla/SLAMonitorStorage.java new file mode 100644 index 0000000000..12adeb18ec --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/sla/SLAMonitorStorage.java @@ -0,0 +1,12 @@ +package io.kestra.core.models.flows.sla; + +import java.time.Instant; +import java.util.function.Consumer; + +public interface SLAMonitorStorage { + void save(SLAMonitor slaMonitor); + + void purge(String executionId); + + void processExpired(Instant now, Consumer consumer); +} diff --git a/core/src/main/java/io/kestra/core/models/flows/sla/Violation.java b/core/src/main/java/io/kestra/core/models/flows/sla/Violation.java new file mode 100644 index 0000000000..5330baebbc --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/sla/Violation.java @@ -0,0 +1,6 @@ +package io.kestra.core.models.flows.sla; + +import java.util.Map; + +public record Violation(String slaId, SLA.Behavior behavior, Map labels, String reason) { +} diff --git a/core/src/main/java/io/kestra/core/models/flows/sla/types/ExecutionConditionSLA.java b/core/src/main/java/io/kestra/core/models/flows/sla/types/ExecutionConditionSLA.java new file mode 100644 index 0000000000..19b2ca7817 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/sla/types/ExecutionConditionSLA.java @@ -0,0 +1,36 @@ +package io.kestra.core.models.flows.sla.types; + +import io.kestra.core.exceptions.InternalException; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.sla.ExecutionChangedSLA; +import io.kestra.core.models.flows.sla.SLA; +import io.kestra.core.models.flows.sla.Violation; +import io.kestra.core.runners.RunContext; +import io.kestra.core.utils.TruthUtils; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.util.Optional; + +@SuperBuilder +@Getter +@NoArgsConstructor +public class ExecutionConditionSLA extends SLA implements ExecutionChangedSLA { + @NotNull + @NotEmpty + private String condition; + + @Override + public Optional evaluate(RunContext runContext, Execution execution) throws InternalException { + String result = runContext.render(this.getCondition()); + if (TruthUtils.isTruthy(result)) { + String reason = "execution condition violation: " + this.getCondition() + "."; + return Optional.of(new Violation(this.getId(), this.getBehavior(), this.getLabels(), reason)); + } + + return Optional.empty(); + } +} diff --git a/core/src/main/java/io/kestra/core/models/flows/sla/types/MaxDurationSLA.java b/core/src/main/java/io/kestra/core/models/flows/sla/types/MaxDurationSLA.java new file mode 100644 index 0000000000..ae1f00a6a5 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/sla/types/MaxDurationSLA.java @@ -0,0 +1,36 @@ +package io.kestra.core.models.flows.sla.types; + +import io.kestra.core.exceptions.InternalException; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.sla.ExecutionMonitoringSLA; +import io.kestra.core.models.flows.sla.SLA; +import io.kestra.core.models.flows.sla.Violation; +import io.kestra.core.runners.RunContext; +import jakarta.validation.constraints.NotNull; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Optional; + +@SuperBuilder +@Getter +@NoArgsConstructor +public class MaxDurationSLA extends SLA implements ExecutionMonitoringSLA { + @NotNull + private Duration duration; + + @Override + public Optional evaluate(RunContext runContext, Execution execution) throws InternalException { + Duration executionDuration = Duration.between(execution.getState().getStartDate(), Instant.now()); + if (executionDuration.compareTo(this.getDuration()) > 0) { + String reason = "execution duration of " + executionDuration.truncatedTo(ChronoUnit.MILLIS) + " exceed the maximum duration of " + this.getDuration() + "."; + return Optional.of(new Violation(this.getId(), this.getBehavior(), this.getLabels(), reason)); + } + + return Optional.empty(); + } +} diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index b096f98999..c1c7e3b5e4 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -3,9 +3,11 @@ import com.google.common.collect.ImmutableMap; import io.kestra.core.exceptions.InternalException; import io.kestra.core.metrics.MetricRegistry; +import io.kestra.core.models.Label; import io.kestra.core.models.executions.*; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.flows.sla.Violation; import io.kestra.core.models.tasks.ExecutableTask; import io.kestra.core.models.tasks.ExecutionUpdatableTask; import io.kestra.core.models.tasks.FlowableTask; @@ -15,17 +17,19 @@ import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.WorkerGroup; import io.kestra.core.models.tasks.retrys.AbstractRetry; -import io.kestra.core.services.ConditionService; -import io.kestra.core.services.ExecutionService; -import io.kestra.core.services.LogService; -import io.kestra.core.services.WorkerGroupService; +import io.kestra.core.queues.QueueException; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.services.*; import io.kestra.core.utils.ListUtils; +import io.kestra.core.utils.MapUtils; import io.kestra.plugin.core.flow.Pause; import io.kestra.plugin.core.flow.Subflow; import io.kestra.plugin.core.flow.WaitFor; import io.kestra.plugin.core.flow.WorkingDirectory; import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; +import jakarta.inject.Named; import jakarta.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -75,6 +79,13 @@ public class ExecutorService { @Inject private WorkerGroupService workerGroupService; + @Inject + private SLAService slaService; + + @Inject + @Named(QueueFactoryInterface.KILL_NAMED) + protected QueueInterface killQueue; + protected FlowExecutorInterface flowExecutorInterface() { // bean is injected late, so we need to wait if (this.flowExecutorInterface == null) { @@ -1036,4 +1047,79 @@ public void log(Logger log, Boolean in, ExecutionKilledExecution value) { value ); } + + /** + * Handle flow ExecutionChangedSLA on an executor. + * If there are SLA violations, it will take care of updating the execution based on the SLA behavior. + * @see #processViolation(RunContext, Executor, Violation) + *

+ * WARNING: ATM, only the first violation will update the execution. + */ + public Executor handleExecutionChangedSLA(Executor executor) throws QueueException { + if (ListUtils.isEmpty(executor.getFlow().getSla()) || executor.getExecution().getState().isTerminated()) { + return executor; + } + + RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution()); + List violations = slaService.evaluateExecutionChangedSLA(runContext, executor.getFlow(), executor.getExecution()); + if (!violations.isEmpty()) { + // For now, we only consider the first violation to be capable of updating the execution. + // Other violations would only be logged. + Violation violation = violations.getFirst(); + return processViolation(runContext, executor, violation); + } + + return executor; + } + + /** + * Process an SLA violation on an executor: + * - If behavior is FAIL or CANCEL: kill the execution, then return it with the new state. + * - If behavior is NONE: do nothing and return an unmodified executor. + *

+ * Then, if there are labels, they are added to the SLA (modifying the executor) + */ + public Executor processViolation(RunContext runContext, Executor executor, Violation violation) throws QueueException { + Execution newExecution = switch (violation.behavior()) { + case FAIL -> { + runContext.logger().error("Flow fail due to SLA '{}' violated: {}", violation.slaId(), violation.reason()); + yield markAs(executor.getExecution(), State.Type.FAILED); + } + case CANCEL -> markAs(executor.getExecution(), State.Type.CANCELLED); + case NONE -> executor.getExecution(); + }; + + if (!MapUtils.isEmpty(violation.labels())) { + List