From 1d0e09a6d57bf9a77c05d0a9437c9e28438bf004 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 9 Sep 2024 12:53:11 -0400 Subject: [PATCH] Support Zstd codec in SerializableAvroCodecFactory (#32352) * Support Zstd codec in SerializableAvroCodecFactory * Test AvroIO.write * Make tests compilable on Avro 1.8 * format * Update CHANGES.md * Support negative levels for zstd --- CHANGES.md | 1 + .../avro/io/SerializableAvroCodecFactory.java | 12 ++++- .../io/SerializableAvroCodecFactoryTest.java | 48 ++++++++++++++++++- 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e9f6113a181a..47f1831b446e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,6 +67,7 @@ ## New Features / Improvements * Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)). +* Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349)) * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/SerializableAvroCodecFactory.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/SerializableAvroCodecFactory.java index 2dc247969da6..215d3b4dd5fc 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/SerializableAvroCodecFactory.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/SerializableAvroCodecFactory.java @@ -50,6 +50,9 @@ class SerializableAvroCodecFactory implements Externalizable { private static final Pattern deflatePattern = Pattern.compile(DEFLATE_CODEC + "-(?-?\\d)"); private static final Pattern xzPattern = Pattern.compile(XZ_CODEC + "-(?\\d)"); + // Don't reference `DataFileConstants.ZSTANDARD_CODEC` directly for Avro 1.8 compat + private static final Pattern zstdPattern = Pattern.compile("zstandard\\[(?-?\\d+)\\]"); + private @Nullable CodecFactory codecFactory; // For java.io.Externalizable @@ -65,7 +68,8 @@ private boolean checkIsSupportedCodec(CodecFactory codecFactory) { final String codecStr = codecFactory.toString(); return noOptAvroCodecs.contains(codecStr) || deflatePattern.matcher(codecStr).matches() - || xzPattern.matcher(codecStr).matches(); + || xzPattern.matcher(codecStr).matches() + || zstdPattern.matcher(codecStr).matches(); } @Override @@ -97,6 +101,12 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept return; } + Matcher zstdMatcher = zstdPattern.matcher(codecStr); + if (zstdMatcher.find()) { + codecFactory = CodecFactory.zstandardCodec(Integer.parseInt(zstdMatcher.group("level"))); + return; + } + throw new IllegalStateException(codecStr + " is not supported"); } diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/SerializableAvroCodecFactoryTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/SerializableAvroCodecFactoryTest.java index 241ad11635a8..52f7a6700275 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/SerializableAvroCodecFactoryTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/SerializableAvroCodecFactoryTest.java @@ -24,6 +24,11 @@ import static org.apache.avro.file.DataFileConstants.XZ_CODEC; import static org.junit.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.avro.file.CodecFactory; @@ -35,8 +40,20 @@ /** Tests of SerializableAvroCodecFactory. */ @RunWith(JUnit4.class) public class SerializableAvroCodecFactoryTest { - private final List avroCodecs = - Arrays.asList(NULL_CODEC, SNAPPY_CODEC, DEFLATE_CODEC, XZ_CODEC, BZIP2_CODEC); + private static final String VERSION_AVRO = + org.apache.avro.Schema.class.getPackage().getImplementationVersion(); + + private static final List avroCodecs = new ArrayList<>(); + + static { + avroCodecs.addAll( + Arrays.asList(NULL_CODEC, SNAPPY_CODEC, DEFLATE_CODEC, XZ_CODEC, BZIP2_CODEC)); + + // Zstd codec not available until Avro 1.9 + if (!VERSION_AVRO.startsWith("1.8.")) { + avroCodecs.add("zstandard"); + } + } @Test public void testDefaultCodecsIn() throws Exception { @@ -84,6 +101,33 @@ public void testXZCodecSerDeWithLevels() throws Exception { } } + @Test + public void testZstdCodecSerDeWithLevels() throws Exception { + if (VERSION_AVRO.startsWith("1.8.")) { + // Skip, zstd only supported for Avro 1.9+ + return; + } + + for (int i = -7; i <= 22; i++) { + SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(); + + // Deserialize a ZStandardCodec instance from bytes; we can't reference the class directly + // since it won't compile for Avro 1.8 + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ObjectOutputStream os = new ObjectOutputStream(baos); + os.writeUTF("zstandard[" + i + "]"); + os.flush(); + codecFactory.readExternal( + new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))); + + assertEquals("zstandard[" + i + "]", codecFactory.getCodec().toString()); + + // Test cloning behavior + SerializableAvroCodecFactory clone = SerializableUtils.clone(codecFactory); + assertEquals(codecFactory.getCodec().toString(), clone.getCodec().toString()); + } + } + @Test(expected = NullPointerException.class) public void testNullCodecToString() throws Exception { // use default CTR (available cause Serializable)