From a6f3efbb35174e5037072302e3b72dd04e775c83 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 13 Jun 2024 10:46:45 -0400 Subject: [PATCH] Fix StreamConstraintsException introduced in jackson 2.15 (#31580) * Fix StreamConstraintsException introduced in jackson 2.15 * Fix spotless * Fix checkstyle * Add changes.md --- CHANGES.md | 3 ++ .../apache/beam/sdk/util/RowJsonUtils.java | 32 +++++++++++++++++++ .../io/gcp/bigquery/TableRowJsonCoder.java | 24 +++++++++----- .../gcp/bigquery/TableRowJsonCoderTest.java | 8 +++++ 4 files changed, 59 insertions(+), 8 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 1aee8283bcb7..bc522aea9115 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -97,6 +97,9 @@ * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform * All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381) should be updated to use new snake_case parameter names. +* Upgraded Jackson Databind to 2.15.4 (Java) ([#26743](https://github.com/apache/beam/issues/26743)). + jackson-2.15 has known breaking changes. An important one is it imposed a buffer limit for parser. + If your custom PTransform/DoFn are affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation. ## Deprecations diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java index 6538a1459290..408143fb1ebe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java @@ -34,6 +34,38 @@ @Internal public class RowJsonUtils { + // + private static int defaultBufferLimit; + + /** + * Increase the default jackson-databind stream read constraint. + * + *

StreamReadConstraints was introduced in jackson 2.15 causing string > 20MB (5MB in 2.15.0) + * parsing failure. This has caused regressions in its dependencies include Beam. Here we + * overwrite the default buffer size limit to 100 MB, and exposes this interface for higher limit. + * If needed, call this method during pipeline run time, e.g. in DoFn.setup. + */ + public static void increaseDefaultStreamReadConstraints(int newLimit) { + if (newLimit <= defaultBufferLimit) { + return; + } + try { + Class unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints"); + + com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints( + com.fasterxml.jackson.core.StreamReadConstraints.builder() + .maxStringLength(newLimit) + .build()); + } catch (ClassNotFoundException e) { + // <2.15, do nothing + } + defaultBufferLimit = newLimit; + } + + static { + increaseDefaultStreamReadConstraints(100 * 1024 * 1024); + } + public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) { SimpleModule module = new SimpleModule("rowDeserializationModule"); module.addDeserializer(Row.class, deserializer); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java index 9b80c0b552b6..8cf3eeb479c0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.util.RowJsonUtils; import org.apache.beam.sdk.values.TypeDescriptor; /** A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. */ @@ -69,15 +70,22 @@ public long getEncodedElementByteSize(TableRow value) throws Exception { // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in // TableRow. - private static final ObjectMapper MAPPER = - new ObjectMapper() - .registerModule(new JavaTimeModule()) - .registerModule(new JodaModule()) - .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) - .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + private static final ObjectMapper MAPPER;; + private static final TableRowJsonCoder INSTANCE; + private static final TypeDescriptor TYPE_DESCRIPTOR; - private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder(); - private static final TypeDescriptor TYPE_DESCRIPTOR = new TypeDescriptor() {}; + static { + RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024); + + MAPPER = + new ObjectMapper() + .registerModule(new JavaTimeModule()) + .registerModule(new JodaModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + INSTANCE = new TableRowJsonCoder(); + TYPE_DESCRIPTOR = new TypeDescriptor() {}; + } private TableRowJsonCoder() {} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java index b09832085a7d..9e767be48d81 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.commons.lang3.StringUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -67,6 +68,13 @@ public void testDecodeEncodeEqual() throws Exception { } } + @Test + public void testLargeRow() throws Exception { + String val = StringUtils.repeat("BEAM", 10 * 1024 * 1024); // 40 MB + TableRow testValue = new TableRowBuilder().set("a", val).set("b", "1").build(); + CoderProperties.coderDecodeEncodeEqual(TEST_CODER, testValue); + } + /** * Generated data to check that the wire format has not changed. To regenerate, see {@link * org.apache.beam.sdk.coders.PrintBase64Encodings}.