From 5ef917232afcd1dc1b8fa61d27f3924b32c8fa59 Mon Sep 17 00:00:00 2001 From: Lake Mossman Date: Thu, 22 Feb 2024 09:37:40 -0800 Subject: [PATCH] Show actionable error for large builder responses (#11328) --- .../server/errors/ClientExceptionHandler.java | 37 +++++++++++++++++++ .../VersionedAirbyteStreamFactory.java | 4 +- .../command_runner/ProcessOutputParser.java | 8 ++++ .../config/ApplicationBeanFactory.java | 2 +- .../UnprocessableEntityException.java | 29 +++++++++++++++ .../UnprocessableEntityExceptionHandler.java | 29 +++++++++++++++ 6 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 airbyte-commons-server/src/main/java/io/airbyte/commons/server/errors/ClientExceptionHandler.java create mode 100644 airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/exceptions/UnprocessableEntityException.java create mode 100644 airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/exceptions/UnprocessableEntityExceptionHandler.java diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/errors/ClientExceptionHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/errors/ClientExceptionHandler.java new file mode 100644 index 00000000000..5f21b21763c --- /dev/null +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/errors/ClientExceptionHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.server.errors; + +import io.airbyte.commons.json.Jsons; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.MediaType; +import io.micronaut.http.annotation.Produces; +import io.micronaut.http.server.exceptions.ExceptionHandler; +import jakarta.inject.Singleton; +import org.openapitools.client.infrastructure.ClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles unprocessable content exceptions. + */ +@Produces +@Singleton +public class ClientExceptionHandler implements ExceptionHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(ClientExceptionHandler.class); + + @Override + public HttpResponse handle(final HttpRequest request, final ClientException exception) { + return HttpResponse.status(HttpStatus.valueOf(exception.getStatusCode())) + .body(Jsons.serialize(new MessageObject(exception.getMessage()))) + .contentType(MediaType.APPLICATION_JSON); + } + + private record MessageObject(String message) {} + +} diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index c25e6a6605a..2a18a02a859 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -61,6 +61,8 @@ public class VersionedAirbyteStreamFactory implements AirbyteStreamFactory { public record InvalidLineFailureConfiguration(boolean failTooLongRecords, boolean failMissingPks, boolean printLongRecordPks) {} + public static final String RECORD_TOO_LONG = "Record is too long, the size is: "; + private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteStreamFactory.class); private static final double MAX_SIZE_RATIO = 0.8; private static final long DEFAULT_MEMORY_LIMIT = Runtime.getRuntime().maxMemory(); @@ -410,7 +412,7 @@ private void handleCannotDeserialize(final String line) { if (exceptionClass.isPresent()) { throwExceptionClass("One record is too big and can't be processed, the sync will be failed"); } else { - throw new IllegalStateException("Record is too long, the size is: " + line.length()); + throw new IllegalStateException(RECORD_TOO_LONG + line.length()); } } } diff --git a/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java index 070f6a2a7e1..e9c7c5028b7 100644 --- a/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java +++ b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java @@ -4,12 +4,15 @@ package io.airbyte.connector_builder.command_runner; +import static io.airbyte.workers.internal.VersionedAirbyteStreamFactory.RECORD_TOO_LONG; + import datadog.trace.api.Trace; import io.airbyte.commons.io.IOs; import io.airbyte.connector_builder.TracingHelper; import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException; import io.airbyte.connector_builder.exceptions.CdkProcessException; import io.airbyte.connector_builder.exceptions.CdkUnknownException; +import io.airbyte.connector_builder.exceptions.UnprocessableEntityException; import io.airbyte.connector_builder.requester.AirbyteCdkRequesterImpl; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -48,6 +51,11 @@ AirbyteRecordMessage parse( messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, timeOut); } catch (final NullPointerException exc) { throwCdkException(process, cdkCommand); + } catch (final IllegalStateException e) { + if (e.getMessage().contains(RECORD_TOO_LONG)) { + throw new UnprocessableEntityException("API response is too large. Reduce the size by requesting smaller pages or time intervals.", e); + } + throw e; } if (messagesByType == null || messagesByType.isEmpty()) { diff --git a/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/config/ApplicationBeanFactory.java b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/config/ApplicationBeanFactory.java index f027076ece8..18c17a48906 100644 --- a/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/config/ApplicationBeanFactory.java +++ b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/config/ApplicationBeanFactory.java @@ -43,7 +43,7 @@ public SynchronousCdkCommandRunner synchronousPythonCdkCommandRunner() { return new SynchronousPythonCdkCommandRunner( new AirbyteFileWriterImpl(), // This should eventually be constructed via DI. - VersionedAirbyteStreamFactory.noMigrationVersionedAirbyteStreamFactory(false), + VersionedAirbyteStreamFactory.noMigrationVersionedAirbyteStreamFactory(true), this.getPython(), this.getCdkEntrypoint()); } diff --git a/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/exceptions/UnprocessableEntityException.java b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/exceptions/UnprocessableEntityException.java new file mode 100644 index 00000000000..eedec16463c --- /dev/null +++ b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/exceptions/UnprocessableEntityException.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.connector_builder.exceptions; + +import io.airbyte.commons.server.errors.KnownException; +import io.micronaut.http.HttpStatus; + +/** + * Thrown when the server understands the content type of the request entity, and the syntax of the + * request entity is correct, but it was unable to process the contained instructions. + */ +public class UnprocessableEntityException extends KnownException { + + public UnprocessableEntityException(final String msg) { + super(msg); + } + + public UnprocessableEntityException(final String msg, final Throwable t) { + super(msg, t); + } + + @Override + public int getHttpCode() { + return HttpStatus.UNPROCESSABLE_ENTITY.getCode(); + } + +} diff --git a/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/exceptions/UnprocessableEntityExceptionHandler.java b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/exceptions/UnprocessableEntityExceptionHandler.java new file mode 100644 index 00000000000..7587dc7e565 --- /dev/null +++ b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/exceptions/UnprocessableEntityExceptionHandler.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.connector_builder.exceptions; + +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.MediaType; +import io.micronaut.http.annotation.Produces; +import io.micronaut.http.server.exceptions.ExceptionHandler; +import jakarta.inject.Singleton; + +/** + * Handles unprocessable content exceptions. + */ +@Produces +@Singleton +public class UnprocessableEntityExceptionHandler implements ExceptionHandler { + + @Override + public HttpResponse handle(final HttpRequest request, final UnprocessableEntityException exception) { + return HttpResponse.status(HttpStatus.valueOf(exception.getHttpCode())) + .body(exception.getMessage()) + .contentType(MediaType.TEXT_PLAIN); + } + +}