
Commit

Initial commit to support nested field pruning
Yaliang committed Aug 9, 2018
1 parent 947c3f3 commit 5a47548
Showing 28 changed files with 927 additions and 38 deletions.
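For orientation: the change threads an optional FieldSet of dotted sub-field paths through HiveColumnHandle, adds a hive.parquet-nested-fields-projection-pushdown.enabled config flag (session property parquet_nested_fields_projection_pushdown_enabled), and prunes the Parquet schema requested per file in ParquetTypeUtils. The snippet below is not part of the commit; it is a rough sketch of the intended effect on a Parquet file schema, written against the same pre-relocation parquet.schema API the diff imports, with invented column and field names.

import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType;

import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static parquet.schema.Type.Repetition.OPTIONAL;

public class NestedPruningSketch
{
    public static void main(String[] args)
    {
        // Full file schema: users(name, address(city, zip, street))
        MessageType fileSchema = new MessageType("users",
                new PrimitiveType(OPTIONAL, BINARY, "name"),
                new GroupType(OPTIONAL, "address",
                        new PrimitiveType(OPTIONAL, BINARY, "city"),
                        new PrimitiveType(OPTIONAL, BINARY, "zip"),
                        new PrimitiveType(OPTIONAL, BINARY, "street")));

        // A query touching only address.city carries a FieldSet of {"city"} on the
        // "address" column, so the reader can request this pruned group and skip
        // decoding the zip and street columns:
        GroupType prunedAddress = fileSchema.getType("address").asGroupType()
                .withNewFields(new PrimitiveType(OPTIONAL, BINARY, "city"));

        System.out.println(fileSchema);
        System.out.println(prunedAddress);
    }
}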
----- changed file (filename not shown) -----
@@ -99,6 +99,7 @@ public class HiveClientConfig
private boolean useParquetColumnNames;
private boolean parquetOptimizedReaderEnabled;
private boolean parquetPredicatePushdownEnabled;
private boolean parquetNestedFieldsProjectionPushdownEnabled;

private boolean assumeCanonicalPartitionKeys;

@@ -675,6 +676,18 @@ public HiveClientConfig setParquetPredicatePushdownEnabled(boolean parquetPredic
return this;
}

public boolean isParquetNestedFieldsProjectionPushdownEnabled()
{
return parquetNestedFieldsProjectionPushdownEnabled;
}

@Config("hive.parquet-nested-fields-projection-pushdown.enabled")
public HiveClientConfig setParquetNestedFieldsProjectionPushdownEnabled(boolean parquetNestedFieldsProjectionPushdownEnabled)
{
this.parquetNestedFieldsProjectionPushdownEnabled = parquetNestedFieldsProjectionPushdownEnabled;
return this;
}

@Deprecated
public boolean isParquetOptimizedReaderEnabled()
{
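The new flag defaults to false. A minimal sketch of the round trip through the accessors above (in a deployment the switch would instead be set as hive.parquet-nested-fields-projection-pushdown.enabled in the Hive catalog properties, or per query through the session property):

HiveClientConfig config = new HiveClientConfig()
        .setParquetNestedFieldsProjectionPushdownEnabled(true);
boolean enabled = config.isParquetNestedFieldsProjectionPushdownEnabled();   // true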
----- changed file (filename not shown) -----
@@ -15,6 +15,7 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.predicate.FieldSet;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -56,22 +57,30 @@ public enum ColumnType
}

private final String name;
private final Optional<FieldSet> fieldSet;
private final HiveType hiveType;
private final TypeSignature typeName;
private final int hiveColumnIndex;
private final ColumnType columnType;
private final Optional<String> comment;

public HiveColumnHandle(String name, HiveType hiveType, TypeSignature typeSignature, int hiveColumnIndex, ColumnType columnType, Optional<String> comment)
{
this(name, Optional.empty(), hiveType, typeSignature, hiveColumnIndex, columnType, comment);
}

@JsonCreator
public HiveColumnHandle(
@JsonProperty("name") String name,
@JsonProperty("fieldSet") Optional<FieldSet> fieldSet,
@JsonProperty("hiveType") HiveType hiveType,
@JsonProperty("typeSignature") TypeSignature typeSignature,
@JsonProperty("hiveColumnIndex") int hiveColumnIndex,
@JsonProperty("columnType") ColumnType columnType,
@JsonProperty("comment") Optional<String> comment)
{
this.name = requireNonNull(name, "name is null");
this.fieldSet = requireNonNull(fieldSet, "fieldSet is null");
checkArgument(hiveColumnIndex >= 0 || columnType == PARTITION_KEY || columnType == SYNTHESIZED, "hiveColumnIndex is negative");
this.hiveColumnIndex = hiveColumnIndex;
this.hiveType = requireNonNull(hiveType, "hiveType is null");
@@ -86,6 +95,12 @@ public String getName()
return name;
}

@JsonProperty
public Optional<FieldSet> getFieldSet()
{
return fieldSet;
}

@JsonProperty
public HiveType getHiveType()
{
@@ -134,7 +149,7 @@ public ColumnType getColumnType()
@Override
public int hashCode()
{
return Objects.hash(name, fieldSet, hiveColumnIndex, hiveType, columnType, comment);
}

@Override
@@ -148,6 +163,7 @@ public boolean equals(Object obj)
}
HiveColumnHandle other = (HiveColumnHandle) obj;
return Objects.equals(this.name, other.name) &&
Objects.equals(this.fieldSet, other.fieldSet) &&
Objects.equals(this.hiveColumnIndex, other.hiveColumnIndex) &&
Objects.equals(this.hiveType, other.hiveType) &&
Objects.equals(this.columnType, other.columnType) &&
@@ -159,6 +175,7 @@ public String toString()
{
return toStringHelper(this)
.add("name", name)
.add("fieldSet", fieldSet.orElse(null))
.add("hiveType", hiveType)
.add("hiveColumnIndex", hiveColumnIndex)
.add("columnType", columnType)
@@ -202,4 +219,9 @@ public static boolean isBucketColumnHandle(HiveColumnHandle column)
{
return column.getHiveColumnIndex() == BUCKET_COLUMN_INDEX;
}

public static HiveColumnHandle withFieldSet(HiveColumnHandle column, Optional<FieldSet> fieldSet)
{
return new HiveColumnHandle(column.name, fieldSet, column.getHiveType(), column.getTypeSignature(), column.getHiveColumnIndex(), column.getColumnType(), column.getComment());
}
}
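A quick usage sketch of the handles above, assuming the usual imports (HiveType.HIVE_STRING and TypeSignature.parseTypeSignature are existing Presto helpers; the column name is invented): the legacy six-argument constructor delegates with Optional.empty(), so existing call sites keep compiling and produce handles without a FieldSet.

HiveColumnHandle plain = new HiveColumnHandle(
        "name", HiveType.HIVE_STRING, parseTypeSignature("varchar"), 0, ColumnType.REGULAR, Optional.empty());
// plain.getFieldSet() is Optional.empty(); withFieldSet(plain, fieldSet) would copy
// the handle with the requested sub-fields attached.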
----- changed file (filename not shown) -----
@@ -107,6 +107,7 @@
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_NAME;
import static com.facebook.presto.hive.HiveColumnHandle.updateRowIdHandle;
import static com.facebook.presto.hive.HiveColumnHandle.withFieldSet;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
@@ -1302,22 +1303,48 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
HiveTableHandle handle = (HiveTableHandle) tableHandle;
System.err.println("+++++++HiveMetadata::getTableLayouts+++++++");
System.err.println(constraint.getFieldSets());
HivePartitionResult hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, constraint);

HiveTableLayoutHandle layoutHandle = new HiveTableLayoutHandle(
handle.getSchemaTableName(),
ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()),
getPartitionsAsList(hivePartitionResult),
hivePartitionResult.getCompactEffectivePredicate(),
hivePartitionResult.getEnforcedConstraint(),
hivePartitionResult.getBucketHandle(),
hivePartitionResult.getBucketFilter());

if (constraint.getFieldSets().isPresent()) {
return ImmutableList.of(new ConnectorTableLayoutResult(
pruneColumnFields(layoutHandle, constraint),
constraint.getSummary()));
}

return ImmutableList.of(new ConnectorTableLayoutResult(
getTableLayout(session, layoutHandle),
hivePartitionResult.getUnenforcedConstraint()));
}

private ConnectorTableLayout pruneColumnFields(HiveTableLayoutHandle layoutHandle, Constraint<ColumnHandle> constraint)
{
Optional<List<ColumnHandle>> columns = constraint.getFieldSets()
.map(fieldSets -> fieldSets.stream()
.filter(entry -> !((HiveColumnHandle) entry.getKey()).getFieldSet().isPresent())
.map(entry -> withFieldSet((HiveColumnHandle) entry.getKey(), Optional.of(entry.getValue())))
.collect(toImmutableList()));

return new ConnectorTableLayout(
layoutHandle,
columns,
TupleDomain.all(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
emptyList());
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle layoutHandle)
{
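In short: when the engine hands over field sets with the constraint, getTableLayouts returns a layout whose column list is rewritten by pruneColumnFields, copying each desired column via withFieldSet so the requested sub-fields travel with the ColumnHandle down to the page sources; otherwise the usual partition-pruning layout is returned unchanged.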
----- changed file (filename not shown) -----
@@ -356,6 +356,7 @@ public static List<HiveColumnHandle> toColumnHandles(List<ColumnMapping> regular
}
return new HiveColumnHandle(
columnHandle.getName(),
columnHandle.getFieldSet(),
columnMapping.getCoercionFrom().get(),
columnMapping.getCoercionFrom().get().getTypeSignature(),
columnHandle.getHiveColumnIndex(),
----- changed file (filename not shown) -----
@@ -49,6 +49,7 @@ public final class HiveSessionProperties
private static final String RESPECT_TABLE_FORMAT = "respect_table_format";
private static final String PARQUET_PREDICATE_PUSHDOWN_ENABLED = "parquet_predicate_pushdown_enabled";
private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled";
private static final String PARQUET_NESTED_FIELDS_PROJECTION_PUSHDOWN_ENABLED = "parquet_nested_fields_projection_pushdown_enabled";
private static final String MAX_SPLIT_SIZE = "max_split_size";
private static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
public static final String RCFILE_OPTIMIZED_WRITER_ENABLED = "rcfile_optimized_writer_enabled";
@@ -153,6 +154,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCo
"Experimental: Parquet: Enable predicate pushdown for Parquet",
hiveClientConfig.isParquetPredicatePushdownEnabled(),
false),
booleanSessionProperty(
PARQUET_NESTED_FIELDS_PROJECTION_PUSHDOWN_ENABLED,
"Experimental: Parquet: Enable nested fields projection pushdown for Parquet",
hiveClientConfig.isParquetNestedFieldsProjectionPushdownEnabled(),
false),
dataSizeSessionProperty(
MAX_SPLIT_SIZE,
"Max split size",
@@ -299,6 +305,11 @@ public static boolean isParquetPredicatePushdownEnabled(ConnectorSession session
return session.getProperty(PARQUET_PREDICATE_PUSHDOWN_ENABLED, Boolean.class);
}

public static boolean isParquetNestedFieldsProjectionPushdownEnabled(ConnectorSession session)
{
return session.getProperty(PARQUET_NESTED_FIELDS_PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
}

public static DataSize getMaxSplitSize(ConnectorSession session)
{
return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
----- changed file (filename not shown) -----
@@ -261,7 +261,7 @@ private static List<HiveColumnHandle> getPhysicalHiveColumnHandles(List<HiveColu
physicalOrdinal = nextMissingColumnIndex;
nextMissingColumnIndex++;
}
physicalColumns.add(new HiveColumnHandle(column.getName(), column.getFieldSet(), column.getHiveType(), column.getTypeSignature(), physicalOrdinal, column.getColumnType(), column.getComment()));
}
return physicalColumns.build();
}
----- changed file (filename not shown) -----
@@ -55,13 +55,14 @@
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetNestedFieldsProjectionPushdownEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetOptimizedReaderEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetPredicatePushdownEnabled;
import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getColumnIO;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getDescriptors;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getPrunedParquetType;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.buildParquetPredicate;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.getParquetTupleDomain;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.predicateMatches;
@@ -181,7 +182,7 @@ public static ParquetPageSource createParquetPageSource(

List<parquet.schema.Type> fields = columns.stream()
.filter(column -> column.getColumnType() == REGULAR)
.map(column -> getPrunedParquetType(column, fileSchema, useParquetColumnNames, isParquetNestedFieldsProjectionPushdownEnabled(session)))
.filter(Objects::nonNull)
.collect(toList());

----- changed file (filename not shown) -----
@@ -24,6 +24,8 @@
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import parquet.column.Encoding;
import parquet.io.ColumnIO;
import parquet.io.ColumnIOFactory;
@@ -37,12 +39,15 @@

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Optional.empty;
import static parquet.schema.OriginalType.DECIMAL;
import static parquet.schema.Type.Repetition.REPEATED;
@@ -194,6 +199,53 @@ public static int getFieldIndex(MessageType fileSchema, String name)
}
}

public static parquet.schema.Type getPrunedParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames, boolean pruneNestedFields)
{
parquet.schema.Type originalType = getParquetType(column, messageType, useParquetColumnNames);
if (pruneNestedFields && originalType != null && column.getFieldSet().isPresent()) {
return pruneParquetType(originalType, column.getFieldSet().get().getFields());
}

return originalType;
}

private static parquet.schema.Type pruneParquetType(parquet.schema.Type type, Set<String> requiredFields)
{
if (requiredFields.isEmpty()) {
return type;
}

if (type.isPrimitive()) {
return type;
}

Map<String, Set<String>> fields = groupFields(requiredFields);

List<parquet.schema.Type> newFields = fields.entrySet().stream()
.map(entry -> pruneParquetType(type.asGroupType().getType(entry.getKey()), entry.getValue()))
.collect(toImmutableList());

return type.asGroupType().withNewFields(newFields);
}

private static Map<String, Set<String>> groupFields(Set<String> requiredFields)
{
Map<String, Set<String>> fields = new HashMap<>();
for (String field : requiredFields) {
String[] path = field.split("\\.", 2);
String fieldName = path[0];
Set<String> nestedField = path.length == 1 ? ImmutableSet.of() : ImmutableSet.of(path[1]);
if (fields.containsKey(fieldName)) {
fields.get(fieldName).addAll(nestedField);
}
else {
fields.put(fieldName, new HashSet<>(nestedField));
}
}

return ImmutableMap.copyOf(fields);
}

public static parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
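To make the grouping step concrete, here is a small, dependency-free restatement with a worked example; it mirrors (rather than reuses) groupFields above and is illustrative only.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GroupFieldsDemo
{
    // Group dotted sub-field paths by their first component; an empty value set
    // means the whole field is kept un-pruned.
    static Map<String, Set<String>> groupFields(Set<String> requiredFields)
    {
        Map<String, Set<String>> grouped = new HashMap<>();
        for (String field : requiredFields) {
            String[] path = field.split("\\.", 2);
            Set<String> nested = grouped.computeIfAbsent(path[0], key -> new HashSet<>());
            if (path.length > 1) {
                nested.add(path[1]);
            }
        }
        return grouped;
    }

    public static void main(String[] args)
    {
        // {address.city, address.zip, contact.phone.home, name}
        //   -> {address=[city, zip], contact=[phone.home], name=[]}
        // pruneParquetType then recurses into each group with the remaining suffixes.
        System.out.println(groupFields(new HashSet<>(
                Arrays.asList("address.city", "address.zip", "contact.phone.home", "name"))));
    }
}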
----- changed file (filename not shown) -----
@@ -140,6 +140,7 @@
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PlanOptimizers;
import com.facebook.presto.sql.planner.Symbol;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.FunctionCall;
import com.facebook.presto.transaction.ForTransactionManager;
@@ -434,6 +435,8 @@ protected void setup(Binder binder)
jsonBinder(binder).addSerializerBinding(Expression.class).to(ExpressionSerializer.class);
jsonBinder(binder).addDeserializerBinding(Expression.class).to(ExpressionDeserializer.class);
jsonBinder(binder).addDeserializerBinding(FunctionCall.class).to(FunctionCallDeserializer.class);
jsonBinder(binder).addKeySerializerBinding(Symbol.class).to(Symbol.SymbolKeySerializer.class);
jsonBinder(binder).addKeyDeserializerBinding(Symbol.class).to(Symbol.SymbolKeyDeserializer.class);

// query monitor
configBinder(binder).bindConfig(QueryMonitorConfig.class);
----- changed file (filename not shown) -----
@@ -20,11 +20,13 @@
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;

public class LookupSymbolResolver
implements SymbolResolver
{
private final Map<String, Symbol> assignmentSymbolLookup;
private final Map<Symbol, ColumnHandle> assignments;
private final Map<ColumnHandle, NullableValue> bindings;

@@ -33,18 +35,22 @@ public LookupSymbolResolver(Map<Symbol, ColumnHandle> assignments, Map<ColumnHan
requireNonNull(assignments, "assignments is null");
requireNonNull(bindings, "bindings is null");

this.assignmentSymbolLookup = assignments.keySet().stream()
.collect(toImmutableMap(Symbol::getName, symbol -> symbol));
this.assignments = ImmutableMap.copyOf(assignments);
this.bindings = ImmutableMap.copyOf(bindings);
}

@Override
public Object getValue(Symbol symbol)
{
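// Resolve by name first: fetch the canonical assignment symbol for this name,
// then use it to look up the bound column.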
Symbol assignmentSymbol = assignmentSymbolLookup.get(symbol.getName());
checkArgument(assignmentSymbol != null, "Missing column assignment for %s", symbol.getName());
ColumnHandle column = assignments.get(assignmentSymbol);
checkArgument(column != null, "Missing column assignment for %s", symbol);

if (!bindings.containsKey(column)) {
return assignmentSymbol.toSymbolReference();
}

return bindings.get(column).getValue();
(remaining changed files not shown)
