-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
0f77934
commit 9f86450
Showing
2 changed files
with
322 additions
and
0 deletions.
There are no files selected for viewing
169 changes: 169 additions & 0 deletions
169
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/OptionalCoder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.coders; | ||
|
||
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import org.apache.beam.sdk.util.common.ElementByteSizeObserver; | ||
import org.apache.beam.sdk.values.TypeDescriptor; | ||
import org.apache.beam.sdk.values.TypeParameter; | ||
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; | ||
|
||
/** | ||
* A {@link OptionalCoder} encodes optional values of type {@code T} using a nested {@code | ||
* Coder<T>}. {@link OptionalCoder} uses exactly 1 byte per entry to indicate whether the value is | ||
* empty, then adds the encoding of the inner coder for non-empty values. | ||
* | ||
* @param <T> the type of the values being transcoded | ||
*/ | ||
public class OptionalCoder<T> extends StructuredCoder<Optional<T>> { | ||
public static <T> OptionalCoder<T> of(Coder<T> valueCoder) { | ||
return new OptionalCoder<>(valueCoder); | ||
} | ||
|
||
///////////////////////////////////////////////////////////////////////////// | ||
|
||
private final Coder<T> valueCoder; | ||
private static final int ENCODE_EMPTY = 0; | ||
private static final int ENCODE_PRESENT = 1; | ||
|
||
private OptionalCoder(Coder<T> valueCoder) { | ||
this.valueCoder = valueCoder; | ||
} | ||
|
||
/** Returns the inner {@link Coder} wrapped by this {@link OptionalCoder} instance. */ | ||
public Coder<T> getValueCoder() { | ||
return valueCoder; | ||
} | ||
|
||
@Override | ||
public void encode(Optional<T> value, OutputStream outStream) throws IOException, CoderException { | ||
if (!value.isPresent()) { | ||
outStream.write(ENCODE_EMPTY); | ||
} else { | ||
outStream.write(ENCODE_PRESENT); | ||
valueCoder.encode(value.get(), outStream); | ||
} | ||
} | ||
|
||
@Override | ||
public Optional<T> decode(InputStream inStream) throws IOException, CoderException { | ||
int b = inStream.read(); | ||
if (b == ENCODE_EMPTY) { | ||
return Optional.empty(); | ||
} else if (b != ENCODE_PRESENT) { | ||
throw new CoderException( | ||
String.format( | ||
"OptionalCoder expects either a byte valued %s (empty) or %s (present), got %s", | ||
ENCODE_EMPTY, ENCODE_PRESENT, b)); | ||
} | ||
T value = checkNotNull(valueCoder.decode(inStream), "cannot decode a null"); | ||
return Optional.of(value); | ||
} | ||
|
||
@Override | ||
public List<Coder<T>> getCoderArguments() { | ||
return ImmutableList.of(valueCoder); | ||
} | ||
|
||
/** | ||
* {@code OptionalCoder} is deterministic if the nested {@code Coder} is. | ||
* | ||
* <p>{@inheritDoc} | ||
*/ | ||
@Override | ||
public void verifyDeterministic() throws NonDeterministicException { | ||
verifyDeterministic(this, "Value coder must be deterministic", valueCoder); | ||
} | ||
|
||
/** | ||
* {@code OptionalCoder} is consistent with equals if the nested {@code Coder} is. | ||
* | ||
* <p>{@inheritDoc} | ||
*/ | ||
@Override | ||
public boolean consistentWithEquals() { | ||
return valueCoder.consistentWithEquals(); | ||
} | ||
|
||
@Override | ||
public Object structuralValue(Optional<T> value) { | ||
return value.map(valueCoder::structuralValue); | ||
} | ||
|
||
/** | ||
* Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and | ||
* counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise | ||
* the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}. | ||
* | ||
* <p>{@inheritDoc} | ||
*/ | ||
@Override | ||
public void registerByteSizeObserver(Optional<T> value, ElementByteSizeObserver observer) | ||
throws Exception { | ||
observer.update(1); | ||
if (value.isPresent()) { | ||
valueCoder.registerByteSizeObserver(value.get(), observer); | ||
} | ||
} | ||
|
||
/** | ||
* Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and | ||
* counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise | ||
* the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}. | ||
* | ||
* <p>{@inheritDoc} | ||
*/ | ||
@Override | ||
protected long getEncodedElementByteSize(Optional<T> value) throws Exception { | ||
if (!value.isPresent()) { | ||
return 1; | ||
} | ||
|
||
if (valueCoder instanceof StructuredCoder) { | ||
// If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of | ||
// the value, adding 1 byte to count the null indicator. | ||
return 1 + valueCoder.getEncodedElementByteSize(value.get()); | ||
} | ||
|
||
// If value is not a StructuredCoder then fall back to the default StructuredCoder behavior | ||
// of encoding and counting the bytes. The encoding will include the null indicator byte. | ||
return super.getEncodedElementByteSize(value); | ||
} | ||
|
||
/** | ||
* {@code OptionalCoder} is cheap if {@code valueCoder} is cheap. | ||
* | ||
* <p>{@inheritDoc} | ||
*/ | ||
@Override | ||
public boolean isRegisterByteSizeObserverCheap(Optional<T> value) { | ||
return value.map(valueCoder::isRegisterByteSizeObserverCheap).orElse(true); | ||
} | ||
|
||
@Override | ||
public TypeDescriptor<Optional<T>> getEncodedTypeDescriptor() { | ||
return new TypeDescriptor<Optional<T>>() {}.where( | ||
new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor()); | ||
} | ||
} |
153 changes: 153 additions & 0 deletions
153
sdks/java/core/src/test/java/org/apache/beam/sdk/coders/OptionalCoderTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.coders; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertFalse; | ||
import static org.junit.Assert.assertTrue; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.InputStream; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import org.apache.beam.sdk.testing.CoderProperties; | ||
import org.apache.beam.sdk.transforms.windowing.GlobalWindow; | ||
import org.apache.beam.sdk.values.TypeDescriptor; | ||
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; | ||
import org.junit.Rule; | ||
import org.junit.Test; | ||
import org.junit.rules.ExpectedException; | ||
import org.junit.runner.RunWith; | ||
import org.junit.runners.JUnit4; | ||
|
||
/** Unit tests for {@link OptionalCoder}. */ | ||
@RunWith(JUnit4.class) | ||
public class OptionalCoderTest { | ||
|
||
private static final Coder<Optional<String>> TEST_CODER = OptionalCoder.of(StringUtf8Coder.of()); | ||
|
||
private static final List<Optional<String>> TEST_VALUES = | ||
Arrays.asList( | ||
Optional.of(""), | ||
Optional.of("a"), | ||
Optional.of("13"), | ||
Optional.of("hello"), | ||
Optional.empty(), | ||
Optional.of("a longer string with spaces and all that"), | ||
Optional.of("a string with a \n newline"), | ||
Optional.of("スタリング")); | ||
|
||
@Test | ||
public void testDecodeEncodeContentsInSameOrder() throws Exception { | ||
for (Optional<String> value : TEST_VALUES) { | ||
CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value); | ||
} | ||
} | ||
|
||
@Test | ||
public void testCoderSerializable() throws Exception { | ||
CoderProperties.coderSerializable(TEST_CODER); | ||
} | ||
|
||
@Test | ||
public void testCoderIsSerializableWithWellKnownCoderType() throws Exception { | ||
CoderProperties.coderSerializable(OptionalCoder.of(GlobalWindow.Coder.INSTANCE)); | ||
} | ||
|
||
/** | ||
* Generated data to check that the wire format has not changed. To regenerate, see {@code | ||
* PrintBase64Encodings}. | ||
* | ||
* @see PrintBase64Encodings | ||
*/ | ||
private static final List<String> TEST_ENCODINGS = | ||
Arrays.asList( | ||
"AQA", | ||
"AQFh", | ||
"AQIxMw", | ||
"AQVoZWxsbw", | ||
"AA", | ||
"AShhIGxvbmdlciBzdHJpbmcgd2l0aCBzcGFjZXMgYW5kIGFsbCB0aGF0", | ||
"ARlhIHN0cmluZyB3aXRoIGEgCiBuZXdsaW5l", | ||
"AQ_jgrnjgr_jg6rjg7PjgrA"); | ||
|
||
@Test | ||
public void testWireFormatEncode() throws Exception { | ||
CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS); | ||
} | ||
|
||
@Test | ||
public void testEncodedSize() throws Exception { | ||
OptionalCoder<Double> coder = OptionalCoder.of(DoubleCoder.of()); | ||
assertEquals(1, coder.getEncodedElementByteSize(Optional.empty())); | ||
assertEquals(9, coder.getEncodedElementByteSize(Optional.of(5.0))); | ||
} | ||
|
||
@Test | ||
public void testEncodedSizeNested() throws Exception { | ||
OptionalCoder<String> varLenCoder = OptionalCoder.of(StringUtf8Coder.of()); | ||
assertEquals(1, varLenCoder.getEncodedElementByteSize(Optional.empty())); | ||
assertEquals(6, varLenCoder.getEncodedElementByteSize(Optional.of("spam"))); | ||
} | ||
|
||
@Test | ||
public void testObserverIsCheap() throws Exception { | ||
OptionalCoder<Double> coder = OptionalCoder.of(DoubleCoder.of()); | ||
assertTrue(coder.isRegisterByteSizeObserverCheap(Optional.of(5.0))); | ||
} | ||
|
||
@Test | ||
public void testObserverIsNotCheap() throws Exception { | ||
OptionalCoder<List<String>> coder = OptionalCoder.of(ListCoder.of(StringUtf8Coder.of())); | ||
assertFalse(coder.isRegisterByteSizeObserverCheap(Optional.of(ImmutableList.of("hi", "test")))); | ||
} | ||
|
||
@Test | ||
public void testObserverIsAlwaysCheapForEmptyValues() throws Exception { | ||
OptionalCoder<List<String>> coder = OptionalCoder.of(ListCoder.of(StringUtf8Coder.of())); | ||
assertTrue(coder.isRegisterByteSizeObserverCheap(Optional.empty())); | ||
} | ||
|
||
@Test | ||
public void testStructuralValueConsistentWithEquals() throws Exception { | ||
CoderProperties.structuralValueConsistentWithEquals( | ||
TEST_CODER, Optional.empty(), Optional.empty()); | ||
} | ||
|
||
@Rule public ExpectedException thrown = ExpectedException.none(); | ||
|
||
@Test | ||
public void testDecodingError() throws Exception { | ||
thrown.expect(CoderException.class); | ||
thrown.expectMessage( | ||
equalTo("OptionalCoder expects either a byte valued 0 (empty) " + "or 1 (present), got 5")); | ||
|
||
InputStream input = new ByteArrayInputStream(new byte[] {5}); | ||
TEST_CODER.decode(input); | ||
} | ||
|
||
@Test | ||
public void testEncodedTypeDescriptor() throws Exception { | ||
TypeDescriptor<Optional<String>> expectedTypeDescriptor = | ||
new TypeDescriptor<Optional<String>>() {}; | ||
assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(expectedTypeDescriptor)); | ||
} | ||
} |