From 9f864504f9307b14fd6e5c4490579526c78e3f14 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 5 Nov 2024 14:31:09 +0100 Subject: [PATCH] [java] Add Optional coder --- .../apache/beam/sdk/coders/OptionalCoder.java | 169 ++++++++++++++++++ .../beam/sdk/coders/OptionalCoderTest.java | 153 ++++++++++++++++ 2 files changed, 322 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/coders/OptionalCoder.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/coders/OptionalCoderTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/OptionalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/OptionalCoder.java new file mode 100644 index 000000000000..4c0dfce6b652 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/OptionalCoder.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Optional; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +/** + * A {@link OptionalCoder} encodes optional values of type {@code T} using a nested {@code + * Coder}. {@link OptionalCoder} uses exactly 1 byte per entry to indicate whether the value is + * empty, then adds the encoding of the inner coder for non-empty values. + * + * @param the type of the values being transcoded + */ +public class OptionalCoder extends StructuredCoder> { + public static OptionalCoder of(Coder valueCoder) { + return new OptionalCoder<>(valueCoder); + } + + ///////////////////////////////////////////////////////////////////////////// + + private final Coder valueCoder; + private static final int ENCODE_EMPTY = 0; + private static final int ENCODE_PRESENT = 1; + + private OptionalCoder(Coder valueCoder) { + this.valueCoder = valueCoder; + } + + /** Returns the inner {@link Coder} wrapped by this {@link OptionalCoder} instance. */ + public Coder getValueCoder() { + return valueCoder; + } + + @Override + public void encode(Optional value, OutputStream outStream) throws IOException, CoderException { + if (!value.isPresent()) { + outStream.write(ENCODE_EMPTY); + } else { + outStream.write(ENCODE_PRESENT); + valueCoder.encode(value.get(), outStream); + } + } + + @Override + public Optional decode(InputStream inStream) throws IOException, CoderException { + int b = inStream.read(); + if (b == ENCODE_EMPTY) { + return Optional.empty(); + } else if (b != ENCODE_PRESENT) { + throw new CoderException( + String.format( + "OptionalCoder expects either a byte valued %s (empty) or %s (present), got %s", + ENCODE_EMPTY, ENCODE_PRESENT, b)); + } + T value = checkNotNull(valueCoder.decode(inStream), "cannot decode a null"); + return Optional.of(value); + } + + @Override + public List> getCoderArguments() { + return ImmutableList.of(valueCoder); + } + + /** + * {@code OptionalCoder} is deterministic if the nested {@code Coder} is. + * + *

{@inheritDoc} + */ + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic(this, "Value coder must be deterministic", valueCoder); + } + + /** + * {@code OptionalCoder} is consistent with equals if the nested {@code Coder} is. + * + *

{@inheritDoc} + */ + @Override + public boolean consistentWithEquals() { + return valueCoder.consistentWithEquals(); + } + + @Override + public Object structuralValue(Optional value) { + return value.map(valueCoder::structuralValue); + } + + /** + * Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and + * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise + * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}. + * + *

{@inheritDoc} + */ + @Override + public void registerByteSizeObserver(Optional value, ElementByteSizeObserver observer) + throws Exception { + observer.update(1); + if (value.isPresent()) { + valueCoder.registerByteSizeObserver(value.get(), observer); + } + } + + /** + * Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and + * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise + * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}. + * + *

{@inheritDoc} + */ + @Override + protected long getEncodedElementByteSize(Optional value) throws Exception { + if (!value.isPresent()) { + return 1; + } + + if (valueCoder instanceof StructuredCoder) { + // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of + // the value, adding 1 byte to count the null indicator. + return 1 + valueCoder.getEncodedElementByteSize(value.get()); + } + + // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior + // of encoding and counting the bytes. The encoding will include the null indicator byte. + return super.getEncodedElementByteSize(value); + } + + /** + * {@code OptionalCoder} is cheap if {@code valueCoder} is cheap. + * + *

{@inheritDoc} + */ + @Override + public boolean isRegisterByteSizeObserverCheap(Optional value) { + return value.map(valueCoder::isRegisterByteSizeObserverCheap).orElse(true); + } + + @Override + public TypeDescriptor> getEncodedTypeDescriptor() { + return new TypeDescriptor>() {}.where( + new TypeParameter() {}, valueCoder.getEncodedTypeDescriptor()); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/OptionalCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/OptionalCoderTest.java new file mode 100644 index 000000000000..ca41cc0cc286 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/OptionalCoderTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link OptionalCoder}. */ +@RunWith(JUnit4.class) +public class OptionalCoderTest { + + private static final Coder> TEST_CODER = OptionalCoder.of(StringUtf8Coder.of()); + + private static final List> TEST_VALUES = + Arrays.asList( + Optional.of(""), + Optional.of("a"), + Optional.of("13"), + Optional.of("hello"), + Optional.empty(), + Optional.of("a longer string with spaces and all that"), + Optional.of("a string with a \n newline"), + Optional.of("スタリング")); + + @Test + public void testDecodeEncodeContentsInSameOrder() throws Exception { + for (Optional value : TEST_VALUES) { + CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value); + } + } + + @Test + public void testCoderSerializable() throws Exception { + CoderProperties.coderSerializable(TEST_CODER); + } + + @Test + public void testCoderIsSerializableWithWellKnownCoderType() throws Exception { + CoderProperties.coderSerializable(OptionalCoder.of(GlobalWindow.Coder.INSTANCE)); + } + + /** + * Generated data to check that the wire format has not changed. To regenerate, see {@code + * PrintBase64Encodings}. + * + * @see PrintBase64Encodings + */ + private static final List TEST_ENCODINGS = + Arrays.asList( + "AQA", + "AQFh", + "AQIxMw", + "AQVoZWxsbw", + "AA", + "AShhIGxvbmdlciBzdHJpbmcgd2l0aCBzcGFjZXMgYW5kIGFsbCB0aGF0", + "ARlhIHN0cmluZyB3aXRoIGEgCiBuZXdsaW5l", + "AQ_jgrnjgr_jg6rjg7PjgrA"); + + @Test + public void testWireFormatEncode() throws Exception { + CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS); + } + + @Test + public void testEncodedSize() throws Exception { + OptionalCoder coder = OptionalCoder.of(DoubleCoder.of()); + assertEquals(1, coder.getEncodedElementByteSize(Optional.empty())); + assertEquals(9, coder.getEncodedElementByteSize(Optional.of(5.0))); + } + + @Test + public void testEncodedSizeNested() throws Exception { + OptionalCoder varLenCoder = OptionalCoder.of(StringUtf8Coder.of()); + assertEquals(1, varLenCoder.getEncodedElementByteSize(Optional.empty())); + assertEquals(6, varLenCoder.getEncodedElementByteSize(Optional.of("spam"))); + } + + @Test + public void testObserverIsCheap() throws Exception { + OptionalCoder coder = OptionalCoder.of(DoubleCoder.of()); + assertTrue(coder.isRegisterByteSizeObserverCheap(Optional.of(5.0))); + } + + @Test + public void testObserverIsNotCheap() throws Exception { + OptionalCoder> coder = OptionalCoder.of(ListCoder.of(StringUtf8Coder.of())); + assertFalse(coder.isRegisterByteSizeObserverCheap(Optional.of(ImmutableList.of("hi", "test")))); + } + + @Test + public void testObserverIsAlwaysCheapForEmptyValues() throws Exception { + OptionalCoder> coder = OptionalCoder.of(ListCoder.of(StringUtf8Coder.of())); + assertTrue(coder.isRegisterByteSizeObserverCheap(Optional.empty())); + } + + @Test + public void testStructuralValueConsistentWithEquals() throws Exception { + CoderProperties.structuralValueConsistentWithEquals( + TEST_CODER, Optional.empty(), Optional.empty()); + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testDecodingError() throws Exception { + thrown.expect(CoderException.class); + thrown.expectMessage( + equalTo("OptionalCoder expects either a byte valued 0 (empty) " + "or 1 (present), got 5")); + + InputStream input = new ByteArrayInputStream(new byte[] {5}); + TEST_CODER.decode(input); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor> expectedTypeDescriptor = + new TypeDescriptor>() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(expectedTypeDescriptor)); + } +}