Skip to content

Commit

Permalink
switch to byte buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Oct 16, 2024
1 parent 1e27978 commit dbfdd66
Showing 1 changed file with 8 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.auto.value.AutoValue;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.AutoValueSchema;
Expand Down Expand Up @@ -74,9 +73,9 @@ public static Builder builder() {

abstract @Nullable Map<Integer, Long> getNanValueCounts();

abstract @Nullable Map<Integer, byte[]> getLowerBounds();
abstract @Nullable Map<Integer, ByteBuffer> getLowerBounds();

abstract @Nullable Map<Integer, byte[]> getUpperBounds();
abstract @Nullable Map<Integer, ByteBuffer> getUpperBounds();

@AutoValue.Builder
abstract static class Builder {
Expand Down Expand Up @@ -104,9 +103,9 @@ abstract static class Builder {

abstract Builder setNanValueCounts(Map<Integer, Long> nanValueCounts);

abstract Builder setLowerBounds(@Nullable Map<Integer, byte[]> lowerBounds);
abstract Builder setLowerBounds(@Nullable Map<Integer, ByteBuffer> lowerBounds);

abstract Builder setUpperBounds(@Nullable Map<Integer, byte[]> upperBounds);
abstract Builder setUpperBounds(@Nullable Map<Integer, ByteBuffer> upperBounds);

abstract SerializableDataFile build();
}
Expand All @@ -129,8 +128,8 @@ static SerializableDataFile from(DataFile f, PartitionKey key) {
.setValueCounts(f.valueCounts())
.setNullValueCounts(f.nullValueCounts())
.setNanValueCounts(f.nanValueCounts())
.setLowerBounds(toByteArrayMap(f.lowerBounds()))
.setUpperBounds(toByteArrayMap(f.upperBounds()))
.setLowerBounds(f.lowerBounds())
.setUpperBounds(f.upperBounds())
.build();
}

Expand All @@ -155,8 +154,8 @@ DataFile createDataFile(PartitionSpec partitionSpec) {
getValueCounts(),
getNullValueCounts(),
getNanValueCounts(),
toByteBufferMap(getLowerBounds()),
toByteBufferMap(getUpperBounds()));
getLowerBounds(),
getUpperBounds());

return DataFiles.builder(partitionSpec)
.withFormat(FileFormat.fromString(getFileFormat()))
Expand All @@ -168,31 +167,4 @@ DataFile createDataFile(PartitionSpec partitionSpec) {
.withSplitOffsets(getSplitOffsets())
.build();
}

// ByteBuddyUtils has trouble converting Map value type ByteBuffer
// to byte[] and back to ByteBuffer, so we perform these conversions manually
// TODO(https://github.com/apache/beam/issues/32701)
private static @Nullable Map<Integer, byte[]> toByteArrayMap(
@Nullable Map<Integer, ByteBuffer> input) {
if (input == null) {
return null;
}
Map<Integer, byte[]> output = new HashMap<>(input.size());
for (Map.Entry<Integer, ByteBuffer> e : input.entrySet()) {
output.put(e.getKey(), e.getValue().array());
}
return output;
}

private static @Nullable Map<Integer, ByteBuffer> toByteBufferMap(
@Nullable Map<Integer, byte[]> input) {
if (input == null) {
return null;
}
Map<Integer, ByteBuffer> output = new HashMap<>(input.size());
for (Map.Entry<Integer, byte[]> e : input.entrySet()) {
output.put(e.getKey(), ByteBuffer.wrap(e.getValue()));
}
return output;
}
}

0 comments on commit dbfdd66

Please sign in to comment.