Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RowCoderGenerator to use the encodingPositions when encoding and decoding the bit set representing null fields. #32389

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A sub-class of SchemaCoder that can only encode {@link Row} instances. */
Expand All @@ -35,7 +36,12 @@ public static RowCoder of(Schema schema) {

/** Override encoding positions for the given schema. */
public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
SchemaCoder.overrideEncodingPositions(uuid, encodingPositions);
RowCoderGenerator.overrideEncodingPositions(uuid, encodingPositions);
}

@VisibleForTesting
static void clearGeneratedRowCoders() {
RowCoderGenerator.clearRowCoderCache();
}

private RowCoder(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.modifier.FieldManifestation;
import net.bytebuddy.description.modifier.Ownership;
Expand All @@ -53,10 +54,14 @@
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.util.StringUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A utility for automatically generating a {@link Coder} for {@link Row} objects corresponding to a
Expand Down Expand Up @@ -109,30 +114,113 @@ public abstract class RowCoderGenerator {
private static final String CODERS_FIELD_NAME = "FIELD_CODERS";
private static final String POSITIONS_FIELD_NAME = "FIELD_ENCODING_POSITIONS";

static class WithStackTrace<T> {
private final T value;
private final String stackTrace;

public WithStackTrace(T value, String stackTrace) {
this.value = value;
this.stackTrace = stackTrace;
}

public T getValue() {
return value;
}

public String getStackTrace() {
return stackTrace;
}
}

// Cache for Coder class that are already generated.
private static final Map<UUID, Coder<Row>> GENERATED_CODERS = Maps.newConcurrentMap();
private static final Map<UUID, Map<String, Integer>> ENCODING_POSITION_OVERRIDES =
Maps.newConcurrentMap();
@GuardedBy("cacheLock")
private static final Map<UUID, WithStackTrace<Coder<Row>>> GENERATED_CODERS = Maps.newHashMap();

@GuardedBy("cacheLock")
private static final Map<UUID, WithStackTrace<Map<String, Integer>>> ENCODING_POSITION_OVERRIDES =
Maps.newHashMap();

private static final Object cacheLock = new Object();

private static final Logger LOG = LoggerFactory.getLogger(RowCoderGenerator.class);

private static String getStackTrace() {
return StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10);
}

public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
ENCODING_POSITION_OVERRIDES.put(uuid, encodingPositions);
final String stackTrace = getStackTrace();
synchronized (cacheLock) {
@Nullable
WithStackTrace<Map<String, Integer>> previousEncodingPositions =
ENCODING_POSITION_OVERRIDES.put(
uuid, new WithStackTrace<>(encodingPositions, stackTrace));
@Nullable WithStackTrace<Coder<Row>> existingCoder = GENERATED_CODERS.get(uuid);
if (previousEncodingPositions == null) {
if (existingCoder != null) {
LOG.error(
"Received encoding positions for uuid {} too late after creating RowCoder. Created: {}\n Override: {}",
Abacn marked this conversation as resolved.
Show resolved Hide resolved
uuid,
existingCoder.getStackTrace(),
stackTrace);
} else {
LOG.info("Received encoding positions {} for uuid {}.", encodingPositions, uuid);
}
} else if (!previousEncodingPositions.getValue().equals(encodingPositions)) {
if (existingCoder == null) {
LOG.error(
"Received differing encoding positions for uuid {} before coder creation. Was {} at {}\n Now {} at {}",
uuid,
previousEncodingPositions.getValue(),
encodingPositions,
previousEncodingPositions.getStackTrace(),
stackTrace);
} else {
LOG.error(
"Received differing encoding positions for uuid {} after coder creation at {}\n. "
+ "Was {} at {}\n Now {} at {}\n",
uuid,
existingCoder.getStackTrace(),
previousEncodingPositions.getValue(),
encodingPositions,
previousEncodingPositions.getStackTrace(),
stackTrace);
}
}
}
}

@VisibleForTesting
static void clearRowCoderCache() {
synchronized (cacheLock) {
GENERATED_CODERS.clear();
}
}

@SuppressWarnings("unchecked")
public static Coder<Row> generate(Schema schema) {
// Using ConcurrentHashMap::computeIfAbsent here would deadlock in case of nested
// coders. Using HashMap::computeIfAbsent generates ConcurrentModificationExceptions in Java 11.
Coder<Row> rowCoder = GENERATED_CODERS.get(schema.getUUID());
if (rowCoder == null) {
String stackTrace = getStackTrace();
UUID uuid = Preconditions.checkNotNull(schema.getUUID());
// Avoid using computeIfAbsent which may cause issues with nested schemas.
synchronized (cacheLock) {
@Nullable WithStackTrace<Coder<Row>> existingRowCoder = GENERATED_CODERS.get(uuid);
if (existingRowCoder != null) {
return existingRowCoder.getValue();
}
TypeDescription.Generic coderType =
TypeDescription.Generic.Builder.parameterizedType(Coder.class, Row.class).build();
DynamicType.Builder<Coder> builder =
(DynamicType.Builder<Coder>) BYTE_BUDDY.subclass(coderType);
builder = implementMethods(schema, builder);

int[] encodingPosToRowIndex = new int[schema.getFieldCount()];
@Nullable
WithStackTrace<Map<String, Integer>> existingEncodingPositions =
ENCODING_POSITION_OVERRIDES.get(uuid);
Map<String, Integer> encodingPositions =
ENCODING_POSITION_OVERRIDES.getOrDefault(schema.getUUID(), schema.getEncodingPositions());
existingEncodingPositions == null
? schema.getEncodingPositions()
: existingEncodingPositions.getValue();
for (int recordIndex = 0; recordIndex < schema.getFieldCount(); ++recordIndex) {
String name = schema.getField(recordIndex).getName();
int encodingPosition = encodingPositions.get(name);
Expand Down Expand Up @@ -163,6 +251,7 @@ public static Coder<Row> generate(Schema schema) {
.withParameters(Coder[].class, int[].class)
.intercept(new GeneratedCoderConstructor());

Coder<Row> rowCoder;
try {
rowCoder =
builder
Expand All @@ -179,9 +268,14 @@ public static Coder<Row> generate(Schema schema) {
| InvocationTargetException e) {
throw new RuntimeException("Unable to generate coder for schema " + schema, e);
}
GENERATED_CODERS.put(schema.getUUID(), rowCoder);
GENERATED_CODERS.put(uuid, new WithStackTrace<>(rowCoder, stackTrace));
LOG.debug(
"Created row coder for uuid {} with encoding positions {} at {}",
uuid,
encodingPositions,
stackTrace);
return rowCoder;
}
return rowCoder;
}

private static class GeneratedCoderConstructor implements Implementation {
Expand Down Expand Up @@ -326,7 +420,7 @@ static void encodeDelegate(
}

// Encode a bitmap for the null fields to save having to encode a bunch of nulls.
NULL_LIST_CODER.encode(scanNullFields(fieldValues), outputStream);
NULL_LIST_CODER.encode(scanNullFields(fieldValues, encodingPosToIndex), outputStream);
for (int encodingPos = 0; encodingPos < fieldValues.length; ++encodingPos) {
@Nullable Object fieldValue = fieldValues[encodingPosToIndex[encodingPos]];
if (fieldValue != null) {
Expand All @@ -348,14 +442,15 @@ static void encodeDelegate(

// Figure out which fields of the Row are null, and returns a BitSet. This allows us to save
// on encoding each null field separately.
private static BitSet scanNullFields(Object[] fieldValues) {
private static BitSet scanNullFields(Object[] fieldValues, int[] encodingPosToIndex) {
Preconditions.checkState(fieldValues.length == encodingPosToIndex.length);
BitSet nullFields = new BitSet(fieldValues.length);
for (int idx = 0; idx < fieldValues.length; ++idx) {
if (fieldValues[idx] == null) {
nullFields.set(idx);
for (int encodingPos = 0; encodingPos < encodingPosToIndex.length; ++encodingPos) {
int fieldIndex = encodingPosToIndex[encodingPos];
if (fieldValues[fieldIndex] == null) {
nullFields.set(encodingPos);
}
}

return nullFields;
}
}
Expand Down Expand Up @@ -425,7 +520,7 @@ static Row decodeDelegate(
// in which case we drop the extra fields.
if (encodingPos < coders.length) {
int rowIndex = encodingPosToIndex[encodingPos];
if (nullFields.get(rowIndex)) {
if (nullFields.get(encodingPos)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this a bug? rowIndex and encodingPos looks different

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this along with other nullfields fix above is the purpose of this PR to fix #32388 .
The stack trace and synchronization changes were added as the initial belief was that encoded corruption was due to late overrides arriving. Since that could still be an issue, I think we should keep those changes but I can separate them to a separate PR if you'd prefer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that unless there are encoding overrides rowIndex and encodingPos are equal. But the improved unit tests catch the issue, previous tests with encoding overrides didn't have null fields and thus missed it.

fieldValues[rowIndex] = null;
} else {
Object fieldValue = coders[encodingPos].decode(inputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ public String toString() {
}

// Sets the schema id, and then recursively ensures that all schemas have ids set.
private static void setSchemaIds(Schema schema) {
private static void setSchemaIds(@Nullable Schema schema) {
if (schema == null) {
return;
}
if (schema.getUUID() == null) {
schema.setUUID(UUID.randomUUID());
}
Expand All @@ -187,7 +190,7 @@ private static void setSchemaIds(FieldType fieldType) {
return;

case ARRAY:
case ITERABLE:;
case ITERABLE:
setSchemaIds(fieldType.getCollectionElementType());
return;

Expand Down
Loading
Loading