Skip to content

Commit

Permalink
Merge pull request #31589: [Release-2.57.0] Cherry-pick #31580 into r…
Browse files Browse the repository at this point in the history
…elease branch
  • Loading branch information
kennknowles authored Jun 13, 2024
2 parents d64f8ca + a6f3efb commit 6df1214
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@
* Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform
* All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
should be updated to use new snake_case parameter names.
* Upgraded Jackson Databind to 2.15.4 (Java) ([#26743](https://github.com/apache/beam/issues/26743)).
Jackson 2.15 has known breaking changes. An important one is that it imposes a buffer limit on the parser.
If your custom PTransforms/DoFns are affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation.

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,38 @@
@Internal
public class RowJsonUtils {

// Most recent limit accepted by increaseDefaultStreamReadConstraints; requests that are not
// strictly greater than this value are ignored. Implicitly starts at 0.
private static int defaultBufferLimit;

/**
 * Increases the default jackson-databind stream read constraint.
 *
 * <p>{@code StreamReadConstraints} was introduced in Jackson 2.15 and causes parsing of strings
 * larger than 20 MB (5 MB in 2.15.0) to fail. This has caused regressions in its dependents,
 * including Beam. Here we overwrite the default buffer size limit to 100 MB, and expose this
 * method for callers that need a higher limit. If needed, call this method during pipeline run
 * time, e.g. in {@code DoFn.setup}.
 *
 * <p>The limit is only ever raised: a call whose {@code newLimit} is not greater than the
 * previously accepted value returns without doing anything. The method is {@code synchronized}
 * so that concurrent callers cannot interleave the check and the override, which could otherwise
 * leave the smaller of two requested limits in effect.
 *
 * @param newLimit the new limit, forwarded to {@code
 *     StreamReadConstraints.Builder#maxStringLength}
 */
public static synchronized void increaseDefaultStreamReadConstraints(int newLimit) {
  if (newLimit <= defaultBufferLimit) {
    return;
  }
  try {
    // Probe by name first so that a classpath without StreamReadConstraints ends up in the
    // catch block below instead of reaching the direct reference.
    Class<?> unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints");

    com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints(
        com.fasterxml.jackson.core.StreamReadConstraints.builder()
            .maxStringLength(newLimit)
            .build());
  } catch (ClassNotFoundException ignored) {
    // <2.15, do nothing
  }
  // Recorded even when the class is absent, so later calls with the same or a smaller value
  // keep returning early.
  defaultBufferLimit = newLimit;
}

static {
increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
}

public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {
SimpleModule module = new SimpleModule("rowDeserializationModule");
module.addDeserializer(Row.class, deserializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.TypeDescriptor;

/** A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. */
Expand Down Expand Up @@ -69,15 +70,22 @@ public long getEncodedElementByteSize(TableRow value) throws Exception {

// FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
// TableRow.
private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(new JodaModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
// Assigned in the static initializer below, after the Jackson stream read constraints are raised.
private static final ObjectMapper MAPPER;
private static final TableRowJsonCoder INSTANCE;
private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR;

private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
static {
RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024);

MAPPER =
new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(new JodaModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
INSTANCE = new TableRowJsonCoder();
TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
}

private TableRowJsonCoder() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -67,6 +68,13 @@ public void testDecodeEncodeEqual() throws Exception {
}
}

@Test
public void testLargeRow() throws Exception {
  // "BEAM" is 4 characters; repeated 10 * 1024 * 1024 times it yields a 40 MB value.
  final int repetitions = 10 * 1024 * 1024;
  String largeValue = StringUtils.repeat("BEAM", repetitions);

  // Round-trip a row holding the large value next to a small one through the coder.
  TableRow largeRow = new TableRowBuilder().set("a", largeValue).set("b", "1").build();
  CoderProperties.coderDecodeEncodeEqual(TEST_CODER, largeRow);
}

/**
* Generated data to check that the wire format has not changed. To regenerate, see {@link
* org.apache.beam.sdk.coders.PrintBase64Encodings}.
Expand Down

0 comments on commit 6df1214

Please sign in to comment.