Add Parquet related stats recording
Yaliang committed Jul 18, 2018
1 parent 1de9961 commit 947c3f3
Showing 9 changed files with 197 additions and 20 deletions.
@@ -16,6 +16,7 @@
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.orc.DwrfPageSourceFactory;
import com.facebook.presto.hive.orc.OrcPageSourceFactory;
+import com.facebook.presto.hive.parquet.ParquetMetadataStats;
import com.facebook.presto.hive.parquet.ParquetPageSourceFactory;
import com.facebook.presto.hive.parquet.ParquetRecordCursorProvider;
import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory;
@@ -102,6 +103,9 @@ public void configure(Binder binder)
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(FileFormatDataSourceStats.class).as(generatedNameOf(FileFormatDataSourceStats.class, connectorId));

+binder.bind(ParquetMetadataStats.class).in(Scopes.SINGLETON);
+newExporter(binder).export(ParquetMetadataStats.class).as(generatedNameOf(ParquetMetadataStats.class, connectorId));

Multibinder<HivePageSourceFactory> pageSourceFactoryBinder = newSetBinder(binder, HivePageSourceFactory.class);
pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(DwrfPageSourceFactory.class).in(Scopes.SINGLETON);
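For context, the binding added above mirrors the FileFormatDataSourceStats wire-up two lines earlier: bind the stats bean as a Guice singleton, then export its @Managed getters over JMX under a connector-scoped name. A minimal standalone sketch of that idiom follows; the module name and connector id are illustrative, not part of this commit.

import com.facebook.presto.hive.parquet.ParquetMetadataStats;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

import static org.weakref.jmx.ObjectNames.generatedNameOf;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class ParquetStatsModule
        implements Module
{
    // Assumed connector id; the real module receives it per catalog.
    private final String connectorId = "hive";

    @Override
    public void configure(Binder binder)
    {
        // A single shared instance, so every reader accumulates into the same distributions.
        binder.bind(ParquetMetadataStats.class).in(Scopes.SINGLETON);
        // Exposes the bean under a JMX name derived from the class and the connector id.
        newExporter(binder).export(ParquetMetadataStats.class).as(generatedNameOf(ParquetMetadataStats.class, connectorId));
    }
}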
@@ -119,6 +119,7 @@ public class ParquetHiveRecordCursor
private boolean closed;

private final FileFormatDataSourceStats stats;
+private final ParquetMetadataStats metadataStats;

public ParquetHiveRecordCursor(
HdfsEnvironment hdfsEnvironment,
@@ -134,13 +135,15 @@ public ParquetHiveRecordCursor(
TypeManager typeManager,
boolean predicatePushdownEnabled,
TupleDomain<HiveColumnHandle> effectivePredicate,
-FileFormatDataSourceStats stats)
+FileFormatDataSourceStats stats,
+ParquetMetadataStats metadataStats)
{
requireNonNull(path, "path is null");
checkArgument(length >= 0, "length is negative");
requireNonNull(splitSchema, "splitSchema is null");
requireNonNull(columns, "columns is null");
this.stats = requireNonNull(stats, "stats is null");
+this.metadataStats = requireNonNull(metadataStats);

this.totalBytes = length;

@@ -353,7 +356,7 @@ private ParquetRecordReader<FakeParquetRecord> createParquetRecordReader(
Map<List<String>, RichColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
-if (predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) {
+if (predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, metadataStats)) {
offsets.add(block.getStartingPos());
}
}
@@ -0,0 +1,129 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.parquet;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.stats.DistributionStat;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.inject.Inject;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ParquetMetadataStats
{
private final DistributionStat metadataLength;
private final DistributionStat readSize;
private final Map<String, DistributionStat> dataReadSize;
private final Map<String, DistributionStat> dictionaryReadSize;
private final Map<String, DistributionStat> pageSize;

private static ObjectMapper objectMapper = new ObjectMapper();

@Inject
public ParquetMetadataStats()
{
this.metadataLength = new DistributionStat();
this.readSize = new DistributionStat();
this.dataReadSize = new HashMap<>();
this.dictionaryReadSize = new HashMap<>();
this.pageSize = new HashMap<>();
}

@Managed
@Nested
public DistributionStat getMetadataLength()
{
return metadataLength;
}

@Managed
@Nested
public DistributionStat getReadSize()
{
return readSize;
}

@Managed
public String getDataReadSize()
{
return getSnapshot(dataReadSize);
}

@Managed
public String getDictionaryReadSize()
{
return getSnapshot(dictionaryReadSize);
}

@Managed
public String getPageSize()
{
return getSnapshot(pageSize);
}

private static String getSnapshot(Map<String, DistributionStat> stats)
{
Map<String, DistributionStat.DistributionStatSnapshot> result = new LinkedHashMap<>(stats.size());
for (Map.Entry<String, DistributionStat> entry : stats.entrySet()) {
result.put(entry.getKey(), entry.getValue().snapshot());
}
return toJson(result);
}

private static String toJson(Map<String, DistributionStat.DistributionStatSnapshot> snapshot)
{
try {
return objectMapper.writeValueAsString(snapshot);
}
catch (Exception ignore) {
return snapshot.toString();
}
}

public void addDataReadSize(String name, long size)
{
synchronized (this) {
addSize(dataReadSize, name, size);
readSize.add(size);
}
}

public void addDictionaryReadSize(String name, long size)
{
synchronized (this) {
addSize(dictionaryReadSize, name, size);
readSize.add(size);
}
}

public void addPageSize(String name, long size)
{
synchronized (this) {
addSize(pageSize, name, size);
}
}

private static void addSize(Map<String, DistributionStat> stats, String name, long size)
{
if (!stats.containsKey(name)) {
stats.put(name, new DistributionStat());
}

stats.get(name).add(size);
}
}
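A brief usage sketch of the class above, with invented column names; only methods defined in this file are called. Per-column distributions are keyed by dotted column paths, data and dictionary reads also roll up into the aggregate readSize distribution, and the per-column getters serialize their snapshots to JSON for JMX.

import com.facebook.presto.hive.parquet.ParquetMetadataStats;

public class ParquetMetadataStatsExample
{
    public static void main(String[] args)
    {
        ParquetMetadataStats stats = new ParquetMetadataStats();

        // Keys are dotted column paths, as produced by ColumnPath.toDotString().
        stats.addDataReadSize("orders.totalprice", 4096);
        stats.addDataReadSize("orders.totalprice", 8192);
        stats.addDictionaryReadSize("orders.status", 512);
        stats.addPageSize("orders.status", 1024);

        // Each getter returns a JSON map of per-column DistributionStat snapshots.
        System.out.println(stats.getDataReadSize());
        System.out.println(stats.getDictionaryReadSize());
        System.out.println(stats.getPageSize());
    }
}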
@@ -83,19 +83,31 @@ public class ParquetPageSourceFactory
private final boolean useParquetColumnNames;
private final HdfsEnvironment hdfsEnvironment;
private final FileFormatDataSourceStats stats;
+private final ParquetMetadataStats metadataStats;

@Inject
+public ParquetPageSourceFactory(TypeManager typeManager, HiveClientConfig config, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, ParquetMetadataStats metadataStats)
+{
+this(typeManager, requireNonNull(config, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats, metadataStats);
+}
+
public ParquetPageSourceFactory(TypeManager typeManager, HiveClientConfig config, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
{
-this(typeManager, requireNonNull(config, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats);
+this(typeManager, config, hdfsEnvironment, stats, new ParquetMetadataStats());
}

public ParquetPageSourceFactory(TypeManager typeManager, boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
{
+this(typeManager, useParquetColumnNames, hdfsEnvironment, stats, new ParquetMetadataStats());
+}
+
+public ParquetPageSourceFactory(TypeManager typeManager, boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, ParquetMetadataStats metadataStats)
+{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.useParquetColumnNames = useParquetColumnNames;
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.stats = requireNonNull(stats, "stats is null");
+this.metadataStats = requireNonNull(metadataStats);
}

@Override
@@ -133,7 +145,9 @@ public Optional<? extends ConnectorPageSource> createPageSource(
typeManager,
isParquetPredicatePushdownEnabled(session),
effectivePredicate,
-stats));
+stats,
+metadataStats,
+session));
}

public static ParquetPageSource createParquetPageSource(
@@ -150,15 +164,17 @@ public static ParquetPageSource createParquetPageSource(
TypeManager typeManager,
boolean predicatePushdownEnabled,
TupleDomain<HiveColumnHandle> effectivePredicate,
-FileFormatDataSourceStats stats)
+FileFormatDataSourceStats stats,
+ParquetMetadataStats metadataStats,
+ConnectorSession session)
{
AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext();

ParquetDataSource dataSource = null;
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(user, path, configuration);
FSDataInputStream inputStream = fileSystem.open(path);
-ParquetMetadata parquetMetadata = ParquetMetadataReader.readFooter(inputStream, path, fileSize);
+ParquetMetadata parquetMetadata = ParquetMetadataReader.readFooter(inputStream, path, fileSize, metadataStats);
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
MessageType fileSchema = fileMetaData.getSchema();
dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, stats);
@@ -185,15 +201,16 @@ public static ParquetPageSource createParquetPageSource(
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
final ParquetDataSource finalDataSource = dataSource;
blocks = blocks.stream()
-.filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain))
+.filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain, metadataStats))
.collect(toList());
}
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
ParquetReader parquetReader = new ParquetReader(
messageColumnIO,
blocks,
dataSource,
-systemMemoryContext);
+systemMemoryContext,
+metadataStats);

return new ParquetPageSource(
parquetReader,
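Distilled from the constructor changes above into a hedged sketch with a hypothetical class name: the pre-existing constructors stay source-compatible by chaining to the new overloads with a freshly constructed ParquetMetadataStats, while the @Inject constructor receives the singleton bound in the connector module, so only injected instances report through the JMX-exported bean.

import com.facebook.presto.hive.parquet.ParquetMetadataStats;

import javax.inject.Inject;

import static java.util.Objects.requireNonNull;

public class ExampleFactory
{
    private final ParquetMetadataStats metadataStats;

    @Inject
    public ExampleFactory(ParquetMetadataStats metadataStats)
    {
        // Guice supplies the shared, JMX-exported singleton.
        this.metadataStats = requireNonNull(metadataStats, "metadataStats is null");
    }

    public ExampleFactory()
    {
        // Legacy entry point: stats land on a private, unexported instance.
        this(new ParquetMetadataStats());
    }
}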
@@ -49,18 +49,30 @@ public class ParquetRecordCursorProvider
private final boolean useParquetColumnNames;
private final HdfsEnvironment hdfsEnvironment;
private final FileFormatDataSourceStats stats;
+private final ParquetMetadataStats metadataStats;

-@Inject
public ParquetRecordCursorProvider(HiveClientConfig hiveClientConfig, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
{
-this(requireNonNull(hiveClientConfig, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats);
+this(hiveClientConfig, hdfsEnvironment, stats, new ParquetMetadataStats());
}

public ParquetRecordCursorProvider(boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
{
+this(useParquetColumnNames, hdfsEnvironment, stats, new ParquetMetadataStats());
+}
+
+@Inject
+public ParquetRecordCursorProvider(HiveClientConfig hiveClientConfig, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, ParquetMetadataStats metadataStats)
+{
+this(requireNonNull(hiveClientConfig, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats, metadataStats);
+}
+
+public ParquetRecordCursorProvider(boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, ParquetMetadataStats metadataStats)
+{
this.useParquetColumnNames = useParquetColumnNames;
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.stats = requireNonNull(stats, "stats is null");
+this.metadataStats = requireNonNull(metadataStats);
}

@Override
@@ -95,6 +107,7 @@ public Optional<RecordCursor> createRecordCursor(
typeManager,
isParquetPredicatePushdownEnabled(session),
effectivePredicate,
-stats));
+stats,
+metadataStats));
}
}
@@ -17,6 +17,7 @@
import com.facebook.presto.hive.parquet.ParquetDataSource;
import com.facebook.presto.hive.parquet.ParquetDictionaryPage;
import com.facebook.presto.hive.parquet.ParquetEncoding;
+import com.facebook.presto.hive.parquet.ParquetMetadataStats;
import com.facebook.presto.hive.parquet.RichColumnDescriptor;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.TupleDomain;
@@ -110,14 +111,14 @@ public static ParquetPredicate buildParquetPredicate(MessageType requestedSchema
return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build());
}

-public static boolean predicateMatches(ParquetPredicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
+public static boolean predicateMatches(ParquetPredicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain, ParquetMetadataStats metadataStats)
{
Map<ColumnDescriptor, Statistics<?>> columnStatistics = getStatistics(block, descriptorsByPath);
if (!parquetPredicate.matches(block.getRowCount(), columnStatistics)) {
return false;
}

-Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = getDictionaries(block, dataSource, descriptorsByPath, parquetTupleDomain);
+Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = getDictionaries(block, dataSource, descriptorsByPath, parquetTupleDomain, metadataStats);
return parquetPredicate.matches(dictionaries);
}

@@ -136,7 +137,7 @@ private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData
return statistics.build();
}

-private static Map<ColumnDescriptor, ParquetDictionaryDescriptor> getDictionaries(BlockMetaData blockMetadata, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
+private static Map<ColumnDescriptor, ParquetDictionaryDescriptor> getDictionaries(BlockMetaData blockMetadata, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain, ParquetMetadataStats metadataStats)
{
ImmutableMap.Builder<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = ImmutableMap.builder();
for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) {
@@ -146,6 +147,7 @@ private static Map<ColumnDescriptor, ParquetDictionaryDescriptor> getDictionaries(
int totalSize = toIntExact(columnMetaData.getTotalSize());
byte[] buffer = new byte[totalSize];
dataSource.readFully(columnMetaData.getStartingPos(), buffer);
+metadataStats.addDictionaryReadSize(columnMetaData.getPath().toDotString(), totalSize);
Optional<ParquetDictionaryPage> dictionaryPage = readDictionaryPage(buffer, columnMetaData.getCodec());
dictionaries.put(descriptor, new ParquetDictionaryDescriptor(descriptor, dictionaryPage));
break;
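For illustration, the dictionary instrumentation above keys each recorded read by the column chunk's path flattened to a dotted string; a small sketch with invented column names and sizes:

import com.facebook.presto.hive.parquet.ParquetMetadataStats;

import parquet.hadoop.metadata.ColumnPath;

public class DictionaryStatsKeyExample
{
    public static void main(String[] args)
    {
        // ColumnPath.toDotString() joins the path segments with dots.
        ColumnPath path = ColumnPath.get("orders", "status");
        String key = path.toDotString(); // "orders.status"

        ParquetMetadataStats stats = new ParquetMetadataStats();
        // Record the full compressed size of the column chunk that was read
        // while probing its dictionary page, as getDictionaries() now does.
        stats.addDictionaryReadSize(key, 8192);
        System.out.println(stats.getDictionaryReadSize());
    }
}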
@@ -18,6 +18,7 @@
import com.facebook.presto.hive.parquet.ParquetDataPageV1;
import com.facebook.presto.hive.parquet.ParquetDataPageV2;
import com.facebook.presto.hive.parquet.ParquetDictionaryPage;
+import com.facebook.presto.hive.parquet.ParquetMetadataStats;
import io.airlift.slice.Slice;
import parquet.column.Encoding;
import parquet.format.DataPageHeader;
@@ -60,7 +61,7 @@ protected PageHeader readPageHeader()
return Util.readPageHeader(this);
}

-public ParquetPageReader readAllPages()
+public ParquetPageReader readAllPages(ParquetMetadataStats metadataStats)
throws IOException
{
List<ParquetDataPage> pages = new ArrayList<>();
@@ -70,6 +71,7 @@ public ParquetPageReader readAllPages()
PageHeader pageHeader = readPageHeader();
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
+metadataStats.addPageSize(descriptor.getColumnChunkMetaData().getPath().toDotString(), compressedPageSize);
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dictionaryPage != null) {
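A similar sketch for the page-size recording above: every page header read from a column chunk reports its compressed size, which is recorded against the column's dotted path before the page body is decoded. Constructing a PageHeader directly, as done below, is purely for illustration.

import com.facebook.presto.hive.parquet.ParquetMetadataStats;

import parquet.format.PageHeader;
import parquet.format.PageType;

public class PageSizeStatsExample
{
    public static void main(String[] args)
    {
        ParquetMetadataStats stats = new ParquetMetadataStats();

        // A data page claiming 10 KB uncompressed and 4 KB compressed on disk.
        PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, 10_240, 4_096);

        // Mirrors readAllPages(): key by dotted column path, record the compressed size.
        stats.addPageSize("orders.status", pageHeader.getCompressed_page_size());
        System.out.println(stats.getPageSize());
    }
}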