diff --git a/common/src/main/java/org/apache/comet/parquet/FileReader.java b/common/src/main/java/org/apache/comet/parquet/FileReader.java
index 61cf89bf1..72b337849 100644
--- a/common/src/main/java/org/apache/comet/parquet/FileReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/FileReader.java
@@ -98,7 +98,7 @@ public class FileReader implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);

   private final ParquetMetadataConverter converter;
-  protected final SeekableInputStream f;
+  private final SeekableInputStream f;
   private final InputFile file;
   private final Map<String, SQLMetric> metrics;
   private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
@@ -364,10 +364,10 @@ ColumnIndexReader getColumnIndexReader(int blockIndex) {
   private PageReadStore readChunks(
       BlockMetaData block, List<ConsecutivePartList> allParts, ChunkListBuilder builder)
       throws IOException {
-    for (ConsecutivePartList consecutiveChunks : allParts) {
-      if (shouldReadParallel()) {
-        consecutiveChunks.readAllParallel(builder);
-      } else {
+    if (shouldReadParallel()) {
+      readAllPartsParallel(allParts, builder);
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
         consecutiveChunks.readAll(f, builder);
       }
     }
@@ -407,6 +407,145 @@ private static boolean shouldReadParallelForScheme(String scheme) {
     }
   }

+  static class ReadRange {
+
+    long offset = 0;
+    long length = 0;
+    List<ByteBuffer> buffers = new ArrayList<>();
+
+    @Override
+    public String toString() {
+      return "ReadRange{"
+          + "offset="
+          + offset
+          + ", length="
+          + length
+          + ", numBuffers="
+          + buffers.size()
+          + '}';
+    }
+  }
+
+  List<ReadRange> getReadRanges(List<ConsecutivePartList> allParts, int nBuffers) {
+    int nThreads = cometOptions.parallelIOThreadPoolSize();
+    long buffersPerThread = nBuffers / nThreads + 1;
+    boolean adjustSkew = cometOptions.adjustReadRangesSkew();
+    List<ReadRange> allRanges = new ArrayList<>();
+    for (ConsecutivePartList consecutiveChunk : allParts) {
+      ReadRange readRange = null;
+      long offset = consecutiveChunk.offset;
+      for (int i = 0; i < consecutiveChunk.buffers.size(); i++) {
+        if ((adjustSkew && (i % buffersPerThread == 0)) || i == 0) {
+          readRange = new ReadRange();
+          allRanges.add(readRange);
+          readRange.offset = offset;
+        }
+        ByteBuffer b = consecutiveChunk.buffers.get(i);
+        readRange.length += b.capacity();
+        readRange.buffers.add(b);
+        offset += b.capacity();
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < allRanges.size(); i++) {
+        sb.append(allRanges.get(i).toString());
+        if (i < allRanges.size() - 1) {
+          sb.append(",");
+        }
+      }
+      LOG.debug("Read Ranges: {}", sb);
+    }
+    return allRanges;
+  }
+
+  private void readAllRangesParallel(List<ReadRange> allRanges) {
+    int nThreads = cometOptions.parallelIOThreadPoolSize();
+    ExecutorService threadPool = CometFileReaderThreadPool.getOrCreateThreadPool(nThreads);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    for (ReadRange readRange : allRanges) {
+      futures.add(
+          threadPool.submit(
+              () -> {
+                SeekableInputStream inputStream = null;
+                try {
+                  if (file instanceof CometInputFile) {
+                    // limit the max read ahead to the length of the range
+                    inputStream =
+                        (((CometInputFile) file).newStream(readRange.offset, readRange.length));
+                    LOG.debug(
+                        "Opened new input file: {}, at offset: {}",
+                        ((CometInputFile) file).getPath().getName(),
+                        readRange.offset);
+                  } else {
+                    inputStream = file.newStream();
+                  }
+                  long curPos = readRange.offset;
+                  for (ByteBuffer buffer : readRange.buffers) {
+                    inputStream.seek(curPos);
+                    LOG.debug(
+                        "Thread: {} Offset: {} Size: {}",
+                        Thread.currentThread().getId(),
+                        curPos,
+                        buffer.capacity());
+                    inputStream.readFully(buffer);
+                    buffer.flip();
+                    curPos += buffer.capacity();
+                  } // for
+                } finally {
+                  if (inputStream != null) {
+                    inputStream.close();
+                  }
+                }
+
+                return null;
+              }));
+    }
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Read all the consecutive part list objects in parallel.
+   *
+   * @param allParts all consecutive parts
+   * @param builder chunk list builder
+   */
+  public void readAllPartsParallel(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
+      throws IOException {
+    int nBuffers = 0;
+    for (ConsecutivePartList consecutiveChunks : allParts) {
+      consecutiveChunks.allocateReadBuffers();
+      nBuffers += consecutiveChunks.buffers.size();
+    }
+    List<ReadRange> allRanges = getReadRanges(allParts, nBuffers);
+
+    long startNs = System.nanoTime();
+    readAllRangesParallel(allRanges);
+
+    for (ConsecutivePartList consecutiveChunks : allParts) {
+      consecutiveChunks.setReadMetrics(startNs);
+      ByteBufferInputStream stream;
+      stream = ByteBufferInputStream.wrap(consecutiveChunks.buffers);
+      // report in a counter the data we just scanned
+      BenchmarkCounter.incrementBytesRead(consecutiveChunks.length);
+      for (int i = 0; i < consecutiveChunks.chunks.size(); i++) {
+        ChunkDescriptor descriptor = consecutiveChunks.chunks.get(i);
+        if (descriptor.col != null) {
+          builder.add(descriptor, stream.sliceBuffers(descriptor.size));
+        } else {
+          stream.skipFully(descriptor.size);
+        }
+      }
+    }
+  }
+
   private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException {
     if (fileDecryptor == null || fileDecryptor.plaintextFile()) {
       currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
@@ -880,6 +1019,7 @@ private class ConsecutivePartList {
     private final SQLMetric fileReadTimeMetric;
     private final SQLMetric fileReadSizeMetric;
     private final SQLMetric readThroughput;
+    List<ByteBuffer> buffers;

     /**
      * Constructor
@@ -909,21 +1049,20 @@ public void addChunk(ChunkDescriptor descriptor) {
       length += descriptor.size;
     }

-    private List<ByteBuffer> allocateReadBuffers() {
+    private void allocateReadBuffers() {
       int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
       int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());

       int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+      this.buffers = new ArrayList<>(numAllocations);

       for (int i = 0; i < fullAllocations; i += 1) {
-        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+        this.buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
       }

       if (lastAllocationSize > 0) {
-        buffers.add(options.getAllocator().allocate(lastAllocationSize));
+        this.buffers.add(options.getAllocator().allocate(lastAllocationSize));
       }
-      return buffers;
     }

     /**
@@ -934,7 +1073,7 @@ private List<ByteBuffer> allocateReadBuffers() {
     public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
       f.seek(offset);

-      List<ByteBuffer> buffers = allocateReadBuffers();
+      allocateReadBuffers();
       long startNs = System.nanoTime();

       for (ByteBuffer buffer : buffers) {
@@ -956,134 +1095,6 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
       }
     }

-    /**
-     * Api to read a consecutive range from the Parquet file in parallel. This is identical to
-     * {@link #readAll(SeekableInputStream, ChunkListBuilder) readAll}, except that the consecutive
-     * range is split into multiple smaller ranges and read in parallel. The parallelism can be set
-     * by specifying the threadpool size via {@link
-     * ReadOptions.Builder#withParallelIOThreadPoolSize(int)}.
-     *
-     * @param builder used to build chunk list to read the pages for the different columns
-     * @throws IOException if there is an error while reading from the stream
-     */
-    public void readAllParallel(ChunkListBuilder builder) throws IOException {
-
-      List<ByteBuffer> buffers = allocateReadBuffers();
-      long startNs = System.nanoTime();
-
-      int nThreads = cometOptions.parallelIOThreadPoolSize();
-      ExecutorService threadPool = CometFileReaderThreadPool.getOrCreateThreadPool(nThreads);
-      List<Future<Void>> futures = new ArrayList<>();
-
-      long currentOffset = this.offset;
-      int buffersPerThread = buffers.size() / nThreads;
-      int remaining = buffers.size() % nThreads;
-      // offset in input file each thread seeks to before beginning read
-      long[] offsets = new long[nThreads];
-      // index of buffer where each thread will start writing data
-      int[] bufferIndexes = new int[nThreads];
-      // number of buffers for each thread to fill
-      int[] numBuffers = new int[nThreads];
-
-      int bufferNum = 0;
-      for (int i = 0; i < nThreads; i++) {
-        int nBuffers = 0;
-        offsets[i] = currentOffset;
-        bufferIndexes[i] = bufferNum;
-        nBuffers = buffersPerThread;
-        for (int j = 0; j < buffersPerThread; j++) {
-          currentOffset += buffers.get(bufferNum).capacity();
-          bufferNum++;
-        }
-        if (remaining > 0) {
-          remaining--;
-          currentOffset += buffers.get(bufferNum).capacity();
-          bufferNum++;
-          nBuffers++;
-        }
-        numBuffers[i] = nBuffers;
-      }
-      for (int n = 0; n < nThreads; n++) {
-        int threadIndex = n;
-        long pos = offsets[threadIndex];
-        int bufferIndex = bufferIndexes[threadIndex];
-        int nBuffers = numBuffers[threadIndex];
-        if (nBuffers == 0) {
-          continue;
-        }
-
-        // Find the total number of bytes to read for the current thread
-        long tmp = 0;
-        for (int i = 0; i < nBuffers; i++) {
-          int bufNo = bufferIndex + i;
-          if (bufNo >= buffers.size()) break;
-          tmp += buffers.get(bufNo).capacity();
-        }
-        final long length = tmp;
-
-        futures.add(
-            threadPool.submit(
-                () -> {
-                  SeekableInputStream inputStream = null;
-                  try {
-                    if (file instanceof CometInputFile) {
-                      inputStream = (((CometInputFile) file).newStream(pos, length));
-                    } else {
-                      inputStream = file.newStream();
-                    }
-
-                    inputStream.seek(pos);
-                    long curPos = pos;
-                    for (int i = 0; i < nBuffers; i++) {
-                      int bufNo = bufferIndex + i;
-                      if (bufNo >= buffers.size()) {
-                        break;
-                      }
-                      ByteBuffer buffer = buffers.get(bufNo);
-                      LOG.debug(
-                          "Thread: {} Offset: {} Buffer: {} Size: {}",
-                          threadIndex,
-                          curPos,
-                          bufNo,
-                          buffer.capacity());
-                      curPos += buffer.capacity();
-                      inputStream.readFully(buffer);
-                      buffer.flip();
-                    } // for
-                  } finally {
-                    if (inputStream != null) {
-                      inputStream.close();
-                    }
-                  }
-
-                  return null;
-                }));
-      }
-
-      for (Future<Void> future : futures) {
-        try {
-          future.get();
-        } catch (InterruptedException | ExecutionException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      setReadMetrics(startNs);
-
-      ByteBufferInputStream stream;
-      stream = ByteBufferInputStream.wrap(buffers);
-      // report in a counter the data we just scanned
-      BenchmarkCounter.incrementBytesRead(length);
-      for (int i = 0; i < chunks.size(); i++) {
-        ChunkDescriptor descriptor = chunks.get(i);
-        if (descriptor.col != null) {
-          builder.add(descriptor, stream.sliceBuffers(descriptor.size));
-        } else {
-          stream.skipFully(descriptor.size);
-        }
-      }
-    }
-
     private void setReadMetrics(long startNs) {
       long totalFileReadTimeNs = System.nanoTime() - startNs;
       double sizeInMb = ((double) length) / (1024 * 1024);
@@ -1117,6 +1128,7 @@ public long endPos() {

   /** Information needed to read a column chunk or a part of it. */
   private static class ChunkDescriptor {
+
     private final ColumnDescriptor col;
     private final ColumnChunkMetaData metadata;
     private final long fileOffset;
diff --git a/common/src/main/java/org/apache/comet/parquet/ReadOptions.java b/common/src/main/java/org/apache/comet/parquet/ReadOptions.java
index 6754443e6..a25fc61a7 100644
--- a/common/src/main/java/org/apache/comet/parquet/ReadOptions.java
+++ b/common/src/main/java/org/apache/comet/parquet/ReadOptions.java
@@ -48,6 +48,16 @@ public class ReadOptions {
       "comet.parquet.read.io.mergeRanges.delta";
   private static final int COMET_IO_MERGE_RANGES_DELTA_DEFAULT = 1 << 23; // 8 MB

+  // In the parallel reader, if the submitted read ranges are skewed in size, this
+  // option causes the reader to break up larger read ranges into smaller ranges
+  // to reduce the skew. This results in a slightly larger number of connections
+  // opened to the file system but may improve performance.
+  // The option is off by default.
+  public static final String COMET_IO_ADJUST_READRANGE_SKEW =
+      "comet.parquet.read.io.adjust.readRange.skew";
+
+  private static final boolean COMET_IO_ADJUST_READRANGE_SKEW_DEFAULT = false;
+
   // Max number of concurrent tasks we expect. Used to autoconfigure S3 client connections
   public static final int S3A_MAX_EXPECTED_PARALLELISM = 32;
   // defined in hadoop-aws - org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS
@@ -63,16 +73,19 @@ public class ReadOptions {
   private final int parallelIOThreadPoolSize;
   private final boolean ioMergeRanges;
   private final int ioMergeRangesDelta;
+  private final boolean adjustReadRangeSkew;

   ReadOptions(
       boolean parallelIOEnabled,
       int parallelIOThreadPoolSize,
       boolean ioMergeRanges,
-      int ioMergeRangesDelta) {
+      int ioMergeRangesDelta,
+      boolean adjustReadRangeSkew) {
     this.parallelIOEnabled = parallelIOEnabled;
     this.parallelIOThreadPoolSize = parallelIOThreadPoolSize;
     this.ioMergeRanges = ioMergeRanges;
     this.ioMergeRangesDelta = ioMergeRangesDelta;
+    this.adjustReadRangeSkew = adjustReadRangeSkew;
   }

   public boolean isParallelIOEnabled() {
@@ -91,6 +104,10 @@ public int getIOMergeRangesDelta() {
     return ioMergeRangesDelta;
   }

+  public boolean adjustReadRangesSkew() {
+    return adjustReadRangeSkew;
+  }
+
   public static Builder builder(Configuration conf) {
     return new Builder(conf);
   }
@@ -106,6 +123,7 @@ public static class Builder {
     private int parallelIOThreadPoolSize;
     private boolean ioMergeRanges;
     private int ioMergeRangesDelta;
+    private boolean adjustReadRangeSkew;

     /**
      * Whether to enable Parquet parallel IO when reading row groups. If true, Parquet reader will
@@ -137,9 +155,18 @@ public Builder withIOMergeRangesDelta(int ioMergeRangesDelta) {
       return this;
     }

+    public Builder adjustReadRangeSkew(boolean adjustReadRangeSkew) {
+      this.adjustReadRangeSkew = adjustReadRangeSkew;
+      return this;
+    }
+
     public ReadOptions build() {
       return new ReadOptions(
-          parallelIOEnabled, parallelIOThreadPoolSize, ioMergeRanges, ioMergeRangesDelta);
+          parallelIOEnabled,
+          parallelIOThreadPoolSize,
+          ioMergeRanges,
+          ioMergeRangesDelta,
+          adjustReadRangeSkew);
     }

     public Builder(Configuration conf) {
@@ -152,6 +179,8 @@ public Builder(Configuration conf) {
       this.ioMergeRanges = conf.getBoolean(COMET_IO_MERGE_RANGES, COMET_IO_MERGE_RANGES_DEFAULT);
       this.ioMergeRangesDelta =
           conf.getInt(COMET_IO_MERGE_RANGES_DELTA, COMET_IO_MERGE_RANGES_DELTA_DEFAULT);
+      this.adjustReadRangeSkew =
+          conf.getBoolean(COMET_IO_ADJUST_READRANGE_SKEW, COMET_IO_ADJUST_READRANGE_SKEW_DEFAULT);
       // override some S3 defaults
       setS3Config();
     }
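
Usage note (illustrative, not part of the patch): a minimal sketch of how the new skew-adjustment
option could be enabled, assuming the comet.parquet.read.io.adjust.readRange.skew key and the
ReadOptions.Builder#adjustReadRangeSkew method introduced above. The class name
ReadRangeSkewExample is hypothetical; the parallel read path itself stays internal to FileReader.

import org.apache.comet.parquet.ReadOptions;
import org.apache.hadoop.conf.Configuration;

public class ReadRangeSkewExample {
  public static void main(String[] args) {
    // Enable read-range skew adjustment for the parallel reader via the Hadoop configuration.
    Configuration conf = new Configuration();
    conf.setBoolean("comet.parquet.read.io.adjust.readRange.skew", true);

    // The builder picks the flag up from the configuration ...
    ReadOptions fromConf = ReadOptions.builder(conf).build();
    System.out.println(fromConf.adjustReadRangesSkew()); // true

    // ... or it can be set explicitly on the builder, overriding the configured value.
    ReadOptions explicit = ReadOptions.builder(conf).adjustReadRangeSkew(false).build();
    System.out.println(explicit.adjustReadRangesSkew()); // false
  }
}

When the flag is on, getReadRanges() starts a new ReadRange roughly every nBuffers / nThreads + 1
buffers instead of one range per ConsecutivePartList, so large consecutive parts are split across
the I/O thread pool at the cost of a few extra input streams.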