Skip to content

Commit

Permalink
Show actionable error for large builder responses (#11328)
Browse files Browse the repository at this point in the history
  • Loading branch information
lmossman committed Feb 22, 2024
1 parent 5f620f7 commit 5ef9172
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.server.errors;

import io.airbyte.commons.json.Jsons;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.server.exceptions.ExceptionHandler;
import jakarta.inject.Singleton;
import org.openapitools.client.infrastructure.ClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Handles unprocessable content exceptions.
*/
@Produces
@Singleton
public class ClientExceptionHandler implements ExceptionHandler<ClientException, HttpResponse> {

private static final Logger LOGGER = LoggerFactory.getLogger(ClientExceptionHandler.class);

@Override
public HttpResponse handle(final HttpRequest request, final ClientException exception) {
return HttpResponse.status(HttpStatus.valueOf(exception.getStatusCode()))
.body(Jsons.serialize(new MessageObject(exception.getMessage())))
.contentType(MediaType.APPLICATION_JSON);
}

private record MessageObject(String message) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class VersionedAirbyteStreamFactory<T> implements AirbyteStreamFactory {

public record InvalidLineFailureConfiguration(boolean failTooLongRecords, boolean failMissingPks, boolean printLongRecordPks) {}

public static final String RECORD_TOO_LONG = "Record is too long, the size is: ";

private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteStreamFactory.class);
private static final double MAX_SIZE_RATIO = 0.8;
private static final long DEFAULT_MEMORY_LIMIT = Runtime.getRuntime().maxMemory();
Expand Down Expand Up @@ -410,7 +412,7 @@ private void handleCannotDeserialize(final String line) {
if (exceptionClass.isPresent()) {
throwExceptionClass("One record is too big and can't be processed, the sync will be failed");
} else {
throw new IllegalStateException("Record is too long, the size is: " + line.length());
throw new IllegalStateException(RECORD_TOO_LONG + line.length());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

package io.airbyte.connector_builder.command_runner;

import static io.airbyte.workers.internal.VersionedAirbyteStreamFactory.RECORD_TOO_LONG;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.connector_builder.TracingHelper;
import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException;
import io.airbyte.connector_builder.exceptions.CdkProcessException;
import io.airbyte.connector_builder.exceptions.CdkUnknownException;
import io.airbyte.connector_builder.exceptions.UnprocessableEntityException;
import io.airbyte.connector_builder.requester.AirbyteCdkRequesterImpl;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -48,6 +51,11 @@ AirbyteRecordMessage parse(
messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, timeOut);
} catch (final NullPointerException exc) {
throwCdkException(process, cdkCommand);
} catch (final IllegalStateException e) {
if (e.getMessage().contains(RECORD_TOO_LONG)) {
throw new UnprocessableEntityException("API response is too large. Reduce the size by requesting smaller pages or time intervals.", e);
}
throw e;
}

if (messagesByType == null || messagesByType.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public SynchronousCdkCommandRunner synchronousPythonCdkCommandRunner() {
return new SynchronousPythonCdkCommandRunner(
new AirbyteFileWriterImpl(),
// This should eventually be constructed via DI.
VersionedAirbyteStreamFactory.noMigrationVersionedAirbyteStreamFactory(false),
VersionedAirbyteStreamFactory.noMigrationVersionedAirbyteStreamFactory(true),
this.getPython(),
this.getCdkEntrypoint());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.connector_builder.exceptions;

import io.airbyte.commons.server.errors.KnownException;
import io.micronaut.http.HttpStatus;

/**
* Thrown when the server understands the content type of the request entity, and the syntax of the
* request entity is correct, but it was unable to process the contained instructions.
*/
public class UnprocessableEntityException extends KnownException {

public UnprocessableEntityException(final String msg) {
super(msg);
}

public UnprocessableEntityException(final String msg, final Throwable t) {
super(msg, t);
}

@Override
public int getHttpCode() {
return HttpStatus.UNPROCESSABLE_ENTITY.getCode();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.connector_builder.exceptions;

import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.server.exceptions.ExceptionHandler;
import jakarta.inject.Singleton;

/**
* Handles unprocessable content exceptions.
*/
@Produces
@Singleton
public class UnprocessableEntityExceptionHandler implements ExceptionHandler<UnprocessableEntityException, HttpResponse> {

@Override
public HttpResponse handle(final HttpRequest request, final UnprocessableEntityException exception) {
return HttpResponse.status(HttpStatus.valueOf(exception.getHttpCode()))
.body(exception.getMessage())
.contentType(MediaType.TEXT_PLAIN);
}

}

0 comments on commit 5ef9172

Please sign in to comment.