Skip to content

Commit

Permalink
Improve Refresh Schema error messages (#12131)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Apr 16, 2024
1 parent 704fb95 commit a1bef33
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 56 deletions.
1 change: 1 addition & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9637,6 +9637,7 @@ components:
- refresh_schema
- heartbeat_timeout
- destination_timeout
- transient_error
AttemptStatus:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.airbyte.workers.temporal

import io.airbyte.commons.temporal.utils.ActivityFailureClassifier
import io.airbyte.config.ActorType
import io.airbyte.config.FailureReason
import org.apache.commons.lang3.exception.ExceptionUtils
import org.slf4j.LoggerFactory
import java.lang.String
import kotlin.time.Duration
import kotlin.time.toKotlinDuration

/**
 * Converts an exception raised by a command into a [FailureReason].
 *
 * The exception is classified with [ActivityFailureClassifier.classifyException] and the
 * classification decides the failure origin, the failure type (when one applies) and the
 * external/internal messages. The stack trace of the exception is always attached.
 */
class FailureConverter {
  /**
   * Overload accepting a [java.time.Duration] timeout; it converts the timeout to a Kotlin
   * [Duration] and delegates to the other overload.
   *
   * @param commandName name of the command that failed, used as a prefix in the messages.
   * @param actorType actor the command was running for; selects the default failure origin.
   * @param e exception to convert.
   * @param timeout timeout that applied to the command, or null when unknown.
   * @return the populated [FailureReason].
   */
  @JvmOverloads
  fun getFailureReason(
    commandName: String,
    actorType: ActorType,
    e: Exception,
    timeout: java.time.Duration? = null,
  ): FailureReason = getFailureReason(commandName, actorType, e, timeout?.toKotlinDuration())

  /**
   * Builds the [FailureReason] for [e].
   *
   * The origin starts as SOURCE or DESTINATION depending on [actorType] and is overridden with
   * AIRBYTE_PLATFORM for every classification except OPERATION_TIMEOUT. A failure type is only set
   * for HEARTBEAT (SYSTEM_ERROR) and SCHEDULER_OVERLOADED (TRANSIENT_ERROR); other classifications
   * leave it unset.
   *
   * @param commandName name of the command that failed, used as a prefix in the messages.
   * @param actorType actor the command was running for; selects the default failure origin.
   * @param e exception to convert.
   * @param timeout timeout that applied to the command; when non-null its whole minutes are
   *   reported in the internal message of an OPERATION_TIMEOUT failure.
   * @return the populated [FailureReason].
   */
  fun getFailureReason(
    commandName: String,
    actorType: ActorType,
    e: Exception,
    timeout: Duration?,
  ): FailureReason {
    val failureReason =
      FailureReason()
        .withFailureOrigin(if (actorType == ActorType.SOURCE) FailureReason.FailureOrigin.SOURCE else FailureReason.FailureOrigin.DESTINATION)
        .withStacktrace(ExceptionUtils.getStackTrace(e))
    val classifiedExc = ActivityFailureClassifier.classifyException(e)
    logger.error("exception classified as $classifiedExc")
    // The with* calls are used for their effect on failureReason; the chain results are not needed.
    when (classifiedExc) {
      ActivityFailureClassifier.TemporalFailureReason.HEARTBEAT ->
        // NOTE(review): "$commandName connection failed" reads oddly for commands other than
        // "Check"; left unchanged because it is user-facing.
        failureReason
          .withFailureOrigin(FailureReason.FailureOrigin.AIRBYTE_PLATFORM)
          .withFailureType(FailureReason.FailureType.SYSTEM_ERROR)
          .withExternalMessage("$commandName connection failed because of an internal error.")
          .withInternalMessage("$commandName pod failed to heartbeat, verify resource and health of the worker/check pods.")

      ActivityFailureClassifier.TemporalFailureReason.SCHEDULER_OVERLOADED ->
        failureReason
          .withFailureOrigin(FailureReason.FailureOrigin.AIRBYTE_PLATFORM)
          .withFailureType(FailureReason.FailureType.TRANSIENT_ERROR)
          .withExternalMessage("Airbyte Platform is experiencing a higher than usual load, please try again later.")
          .withInternalMessage("$commandName wasn't able to start within the expected time, verify scheduler and worker load.")

      ActivityFailureClassifier.TemporalFailureReason.OPERATION_TIMEOUT -> {
        // The timeout value is only mentioned when the caller supplied one.
        val timeoutSuffix = timeout?.let { " of ${it.inWholeMinutes} minutes" }.orEmpty()
        failureReason
          .withExternalMessage("$commandName took too long.")
          .withInternalMessage("$commandName exceeded the timeout$timeoutSuffix.")
      }

      // UNKNOWN, NOT_A_TIMEOUT and any other classification are reported the same way.
      else ->
        failureReason
          .withFailureOrigin(FailureReason.FailureOrigin.AIRBYTE_PLATFORM)
          .withExternalMessage("$commandName failed because of an internal error")
          .withInternalMessage("$commandName failed because of an internal error")
    }
    return failureReason
  }

  companion object {
    // One logger named after the class, instead of a per-call logger named "test".
    private val logger = LoggerFactory.getLogger(FailureConverter::class.java)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,18 @@
import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.scheduling.CheckConnectionWorkflow;
import io.airbyte.commons.temporal.utils.ActivityFailureClassifier;
import io.airbyte.config.ActorType;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.models.CheckConnectionInput;
import io.airbyte.workers.temporal.FailureConverter;
import io.temporal.workflow.Workflow;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.exception.ExceptionUtils;

/**
* Check connection temporal workflow implementation.
Expand Down Expand Up @@ -64,7 +60,12 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
new StandardCheckConnectionOutput()
.withStatus(StandardCheckConnectionOutput.Status.FAILED)
.withMessage("The check connection failed."))
.withFailureReason(getFailureReason(connectionConfiguration, e));
.withFailureReason(
new FailureConverter().getFailureReason(
"Check",
connectionConfiguration.getActorType(),
e,
activity.getCheckConnectionTimeout()));
}

return result;
Expand All @@ -81,42 +82,4 @@ private boolean checkUseWorkloadApiFlag(final UUID workspaceId) {
return activity.shouldUseWorkload(workspaceId);
}

/**
 * Translates an exception thrown by the check connection into a {@link FailureReason}.
 *
 * The failure origin starts as SOURCE or DESTINATION according to the actor type of the input and
 * is replaced by AIRBYTE_PLATFORM for every classification except OPERATION_TIMEOUT. The stack
 * trace of the exception is always attached.
 *
 * @param connectionConfiguration input of the check, used for its actor type
 * @param e exception to translate
 * @return the populated failure reason
 */
private FailureReason getFailureReason(final StandardCheckConnectionInput connectionConfiguration, final Exception e) {
  final FailureOrigin actorOrigin;
  if (connectionConfiguration.getActorType() == ActorType.SOURCE) {
    actorOrigin = FailureOrigin.SOURCE;
  } else {
    actorOrigin = FailureOrigin.DESTINATION;
  }
  final FailureReason reason = new FailureReason()
      .withFailureOrigin(actorOrigin)
      .withStacktrace(ExceptionUtils.getStackTrace(e));

  switch (ActivityFailureClassifier.classifyException(e)) {
    case HEARTBEAT:
      reason
          .withFailureOrigin(FailureOrigin.AIRBYTE_PLATFORM)
          .withExternalMessage("Check connection failed because of an internal error.")
          .withInternalMessage("Check Pod failed to heartbeat, verify resource and heath of the worker/check pods.");
      break;
    case SCHEDULER_OVERLOADED:
      reason
          .withFailureOrigin(FailureOrigin.AIRBYTE_PLATFORM)
          .withExternalMessage("Airbyte Platform is experiencing a higher than usual load, please try again later.")
          .withInternalMessage("Check wasn't able to start within the expected time, check scheduler and worker load.");
      break;
    case OPERATION_TIMEOUT:
      final FailureReason timedOut = reason.withExternalMessage("The check connection took too long.");
      // The timeout is read once for the null check and once more for the value, in that order.
      final String timeoutDetail;
      if (activity.getCheckConnectionTimeout() != null) {
        timeoutDetail = String.format("Check connection exceeded the timeout of %s minutes", activity.getCheckConnectionTimeout().toMinutes());
      } else {
        timeoutDetail = "The check connection took too long.";
      }
      timedOut.withInternalMessage(timeoutDetail);
      break;
    default:
      // Also reached for UNKNOWN and NOT_A_TIMEOUT.
      reason
          .withFailureOrigin(FailureOrigin.AIRBYTE_PLATFORM)
          .withExternalMessage("The check connection failed because of an internal error")
          .withInternalMessage("The check connection failed because of an internal error");
      break;
  }
  return reason;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

package io.airbyte.workers.temporal.sync;

import io.airbyte.config.FailureReason;
import io.airbyte.config.ActorType;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.workers.temporal.FailureConverter;
import java.util.List;

/**
Expand Down Expand Up @@ -35,13 +37,12 @@ public class SyncOutputProvider {
* @return sync output
*/
public static StandardSyncOutput getRefreshSchemaFailure(final Exception e) {
  // Let the shared converter classify the exception and pick the origin and messages.
  final var failure = new FailureConverter().getFailureReason("Refresh Schema", ActorType.SOURCE, e);
  // Keep a failure type assigned by the converter; otherwise mark the failure as REFRESH_SCHEMA.
  if (failure.getFailureType() == null) {
    failure.setFailureType(FailureType.REFRESH_SCHEMA);
  }
  // The converted failure is the only one reported. A hand-built FailureReason passed to an
  // earlier withFailures call was superseded by this one (assuming withFailures replaces the
  // list), so it is no longer constructed.
  return new StandardSyncOutput()
      .withFailures(List.of(failure))
      .withStandardSyncSummary(EMPTY_FAILED_SYNC);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import io.airbyte.commons.temporal.TemporalConstants;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.ConnectionContext;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorWebhook;
Expand Down Expand Up @@ -422,10 +423,10 @@ void testGetProperFailureIfRefreshFails() throws Exception {
doThrow(new RuntimeException())
.when(refreshSchemaActivity).refreshSchemaV2(any());
final StandardSyncOutput output = execute();
assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.FAILED);
assertEquals(output.getFailures().size(), 1);
assertEquals(output.getFailures().get(0).getFailureOrigin(), FailureReason.FailureOrigin.SOURCE);
assertEquals(output.getFailures().get(0).getFailureType(), FailureReason.FailureType.REFRESH_SCHEMA);
assertEquals(ReplicationStatus.FAILED, output.getStandardSyncSummary().getStatus());
assertEquals(1, output.getFailures().size());
assertEquals(FailureOrigin.AIRBYTE_PLATFORM, output.getFailures().get(0).getFailureOrigin());
assertEquals(FailureType.REFRESH_SCHEMA, output.getFailures().get(0).getFailureType());
}

@SuppressWarnings("ResultOfMethodCallIgnored")
Expand Down

0 comments on commit a1bef33

Please sign in to comment.