Skip to content

Commit

Permalink
feat(core): flow SLA
Browse files Browse the repository at this point in the history
Fixes #5857
  • Loading branch information
loicmathieu committed Nov 22, 2024
1 parent 4431d1f commit 14ceeb6
Show file tree
Hide file tree
Showing 23 changed files with 590 additions and 9 deletions.
6 changes: 5 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -116,7 +117,10 @@ public List<PluginDefault> getTaskDefaults() {
List<Output> outputs;

@Valid
protected AbstractRetry retry;
AbstractRetry retry;

List<SLA> sla;


public Logger logger() {
return LoggerFactory.getLogger("flow." + this.id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public Flow toFlow() {
.deleted(this.deleted)
.concurrency(this.concurrency)
.retry(this.retry)
.sla(this.sla)
.build();
}

Expand Down Expand Up @@ -89,6 +90,7 @@ public static FlowWithSource of(Flow flow, String source) {
.source(source)
.concurrency(flow.concurrency)
.retry(flow.retry)
.sla(flow.sla)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.kestra.core.models.flows.sla;

/**
* Marker interface to denote an SLA as evaluating on execution change.
* ExecutionChangedSLA will be evaluated on each execution change, a.k.a. at the beginning of the processing of the execution queue.
*/
public interface ExecutionChangedSLA {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.kestra.core.models.flows.sla;

import java.time.Duration;

/**
* Marker interface to denote an SLA as evaluating using an {@link SLAMonitor}.
* ExecutionMonitoringSLA will be evaluated on a deadline defined by the monitor;
* the monitor is created when the execution is created.
*/
public interface ExecutionMonitoringSLA {
Duration getDuration();
}
57 changes: 57 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/sla/SLA.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.kestra.core.models.flows.sla;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.sla.types.ExecutionConditionSLA;
import io.kestra.core.models.flows.sla.types.MaxDurationSLA;
import io.kestra.core.runners.RunContext;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.util.Map;
import java.util.Optional;

@SuperBuilder
@Getter
@NoArgsConstructor
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = MaxDurationSLA.class, name = "MAX_DURATION"),
@JsonSubTypes.Type(value = ExecutionConditionSLA.class, name = "EXECUTION_CONDITION"),
})
public abstract class SLA {
@NotNull
@NotEmpty
private String id;

@NotNull
private SLA.Type type;

@NotNull
private Behavior behavior;

// TODO prevent system labels
private Map<String, Object> labels;

/**
* Evaluate a flow SLA on an execution.
* In case the SLA is exceeded, a violation will be returned.
*/
public abstract Optional<Violation> evaluate(RunContext runContext, Execution execution) throws InternalException;

public enum Type {
MAX_DURATION,
EXECUTION_CONDITION,
}

public enum Behavior {
FAIL,
CANCEL,
NONE
}
}
21 changes: 21 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/sla/SLAMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.kestra.core.models.flows.sla;

import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.Builder;
import lombok.Getter;

import java.time.Instant;

@Builder
@Getter
public class SLAMonitor implements HasUID {
String executionId;
String slaId;
Instant deadline;

@Override
public String uid() {
return IdUtils.fromParts(executionId, slaId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.kestra.core.models.flows.sla;

import java.time.Instant;
import java.util.function.Consumer;

public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);

void purge(String executionId);

void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.kestra.core.models.flows.sla;

import java.util.Map;

public record Violation(String slaId, SLA.Behavior behavior, Map<String, Object> labels, String reason) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.kestra.core.models.flows.sla.types;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.sla.ExecutionChangedSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.TruthUtils;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.util.Optional;

@SuperBuilder
@Getter
@NoArgsConstructor
public class ExecutionConditionSLA extends SLA implements ExecutionChangedSLA {
@NotNull
@NotEmpty
private String condition;

@Override
public Optional<Violation> evaluate(RunContext runContext, Execution execution) throws InternalException {
String result = runContext.render(this.getCondition());
if (TruthUtils.isTruthy(result)) {
String reason = "execution condition violation: " + this.getCondition() + ".";
return Optional.of(new Violation(this.getId(), this.getBehavior(), this.getLabels(), reason));
}

return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.kestra.core.models.flows.sla.types;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.sla.ExecutionMonitoringSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.runners.RunContext;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

@SuperBuilder
@Getter
@NoArgsConstructor
public class MaxDurationSLA extends SLA implements ExecutionMonitoringSLA {
@NotNull
private Duration duration;

@Override
public Optional<Violation> evaluate(RunContext runContext, Execution execution) throws InternalException {
Duration executionDuration = Duration.between(execution.getState().getStartDate(), Instant.now());
if (executionDuration.compareTo(this.getDuration()) > 0) {
String reason = "execution duration of " + executionDuration.truncatedTo(ChronoUnit.MILLIS) + " exceed the maximum duration of " + this.getDuration() + ".";
return Optional.of(new Violation(this.getId(), this.getBehavior(), this.getLabels(), reason));
}

return Optional.empty();
}
}
94 changes: 90 additions & 4 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.ExecutionUpdatableTask;
import io.kestra.core.models.tasks.FlowableTask;
Expand All @@ -15,17 +17,19 @@
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.services.*;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.plugin.core.flow.Subflow;
import io.kestra.plugin.core.flow.WaitFor;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
Expand Down Expand Up @@ -75,6 +79,13 @@ public class ExecutorService {
@Inject
private WorkerGroupService workerGroupService;

@Inject
private SLAService slaService;

@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;

protected FlowExecutorInterface flowExecutorInterface() {
// bean is injected late, so we need to wait
if (this.flowExecutorInterface == null) {
Expand Down Expand Up @@ -1036,4 +1047,79 @@ public void log(Logger log, Boolean in, ExecutionKilledExecution value) {
value
);
}

/**
* Handle flow ExecutionChangedSLA on an executor.
* If there are SLA violations, it will take care of updating the execution based on the SLA behavior.
* @see #processViolation(RunContext, Executor, Violation)
* <p>
* WARNING: ATM, only the first violation will update the execution.
*/
public Executor handleExecutionChangedSLA(Executor executor) throws QueueException {
if (ListUtils.isEmpty(executor.getFlow().getSla()) || executor.getExecution().getState().isTerminated()) {
return executor;
}

RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
List<Violation> violations = slaService.evaluateExecutionChangedSLA(runContext, executor.getFlow(), executor.getExecution());
if (!violations.isEmpty()) {
// For now, we only consider the first violation to be capable of updating the execution.
// Other violations would only be logged.
Violation violation = violations.getFirst();
return processViolation(runContext, executor, violation);
}

return executor;
}

/**
* Process an SLA violation on an executor:
* - If behavior is FAIL or CANCEL: kill the execution, then return it with the new state.
* - If behavior is NONE: do nothing and return an unmodified executor.
* <p>
* Then, if there are labels, they are added to the SLA (modifying the executor)
*/
public Executor processViolation(RunContext runContext, Executor executor, Violation violation) throws QueueException {
Execution newExecution = switch (violation.behavior()) {
case FAIL -> {
runContext.logger().error("Flow fail due to SLA '{}' violated: {}", violation.slaId(), violation.reason());
yield markAs(executor.getExecution(), State.Type.FAILED);
}
case CANCEL -> markAs(executor.getExecution(), State.Type.CANCELLED);
case NONE -> executor.getExecution();
};

if (!MapUtils.isEmpty(violation.labels())) {
List<Label> labels = new ArrayList<>(newExecution.getLabels());
violation.labels().forEach((key, value) -> labels.add(new Label(key, String.valueOf(value))));
newExecution = newExecution.withLabels(labels);
}

return executor.withExecution(newExecution, "SLAViolation");
}

private Execution markAs(Execution execution, State.Type state) throws QueueException {
Execution newExecution = execution.findLastNotTerminated()
.map(taskRun -> {
try {
return execution.withTaskRun(taskRun.withState(state));
} catch (InternalException e) {
// in case we cannot update the last not terminated task run, we ignore it
return execution;
}
})
.orElse(execution)
.withState(state);

killQueue.emit(ExecutionKilledExecution
.builder()
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution.getId())
.isOnKillCascade(false) // TODO we may offer the choice to the user here
.tenantId(execution.getTenantId())
.build()
);

return newExecution;
}
}
Loading

0 comments on commit 14ceeb6

Please sign in to comment.