feat: Minimize number of connections used by parallel reader (#126)
parthchandra authored Feb 28, 2024
1 parent 4d103b8 commit 0c2eb53
Showing 2 changed files with 182 additions and 141 deletions.
290 changes: 151 additions & 139 deletions common/src/main/java/org/apache/comet/parquet/FileReader.java
@@ -98,7 +98,7 @@ public class FileReader implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);

private final ParquetMetadataConverter converter;
- protected final SeekableInputStream f;
+ private final SeekableInputStream f;
private final InputFile file;
private final Map<String, SQLMetric> metrics;
private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
@@ -364,10 +364,10 @@ ColumnIndexReader getColumnIndexReader(int blockIndex) {
private PageReadStore readChunks(
BlockMetaData block, List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
- for (ConsecutivePartList consecutiveChunks : allParts) {
-   if (shouldReadParallel()) {
-     consecutiveChunks.readAllParallel(builder);
-   } else {
+ if (shouldReadParallel()) {
+   readAllPartsParallel(allParts, builder);
+ } else {
+   for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.readAll(f, builder);
}
}
@@ -407,6 +407,145 @@ private static boolean shouldReadParallelForScheme(String scheme) {
}
}

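/** A contiguous region of the file to be read by one task, with the buffers it will fill. */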
static class ReadRange {

long offset = 0;
long length = 0;
List<ByteBuffer> buffers = new ArrayList<>();

@Override
public String toString() {
return "ReadRange{"
+ "offset="
+ offset
+ ", length="
+ length
+ ", numBuffers="
+ buffers.size()
+ '}';
}
}

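/**
* Splits the read buffers of all the consecutive parts into {@link ReadRange}s. A new range
* starts at the beginning of each consecutive part and, when skew adjustment is enabled, after
* every (nBuffers / nThreads + 1) buffers within a part, so that no single thread is handed a
* disproportionately large range.
*/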
List<ReadRange> getReadRanges(List<ConsecutivePartList> allParts, int nBuffers) {
int nThreads = cometOptions.parallelIOThreadPoolSize();
long buffersPerThread = nBuffers / nThreads + 1;
boolean adjustSkew = cometOptions.adjustReadRangesSkew();
List<ReadRange> allRanges = new ArrayList<>();
for (ConsecutivePartList consecutiveChunk : allParts) {
ReadRange readRange = null;
long offset = consecutiveChunk.offset;
for (int i = 0; i < consecutiveChunk.buffers.size(); i++) {
if ((adjustSkew && (i % buffersPerThread == 0)) || i == 0) {
readRange = new ReadRange();
allRanges.add(readRange);
readRange.offset = offset;
}
ByteBuffer b = consecutiveChunk.buffers.get(i);
readRange.length += b.capacity();
readRange.buffers.add(b);
offset += b.capacity();
}
}
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < allRanges.size(); i++) {
sb.append(allRanges.get(i).toString());
if (i < allRanges.size() - 1) {
sb.append(",");
}
}
LOG.debug("Read Ranges: {}", sb);
}
return allRanges;
}

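/**
* Reads all the ranges in parallel, one task per range, on the shared reader thread pool. Each
* task opens a single input stream scoped to its range and closes it when done, so at most one
* connection is open per in-flight range rather than one per thread per consecutive part.
*/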
private void readAllRangesParallel(List<ReadRange> allRanges) {
int nThreads = cometOptions.parallelIOThreadPoolSize();
ExecutorService threadPool = CometFileReaderThreadPool.getOrCreateThreadPool(nThreads);
List<Future<Void>> futures = new ArrayList<>();

for (ReadRange readRange : allRanges) {
futures.add(
threadPool.submit(
() -> {
SeekableInputStream inputStream = null;
try {
if (file instanceof CometInputFile) {
// limit the maximum read-ahead to the length of the range
inputStream =
(((CometInputFile) file).newStream(readRange.offset, readRange.length));
LOG.debug(
"Opened new input file: {}, at offset: {}",
((CometInputFile) file).getPath().getName(),
readRange.offset);
} else {
inputStream = file.newStream();
}
long curPos = readRange.offset;
for (ByteBuffer buffer : readRange.buffers) {
inputStream.seek(curPos);
LOG.debug(
"Thread: {} Offset: {} Size: {}",
Thread.currentThread().getId(),
curPos,
buffer.capacity());
inputStream.readFully(buffer);
buffer.flip();
curPos += buffer.capacity();
} // for
} finally {
if (inputStream != null) {
inputStream.close();
}
}

return null;
}));
}
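// Block until every range has been read; any task failure is surfaced here.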
for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

/**
* Read all the consecutive part list objects in parallel.
*
* @param allParts all consecutive parts
* @param builder chunk list builder
*/
public void readAllPartsParallel(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
int nBuffers = 0;
for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.allocateReadBuffers();
nBuffers += consecutiveChunks.buffers.size();
}
List<ReadRange> allRanges = getReadRanges(allParts, nBuffers);

long startNs = System.nanoTime();
readAllRangesParallel(allRanges);

for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.setReadMetrics(startNs);
ByteBufferInputStream stream;
stream = ByteBufferInputStream.wrap(consecutiveChunks.buffers);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(consecutiveChunks.length);
for (int i = 0; i < consecutiveChunks.chunks.size(); i++) {
ChunkDescriptor descriptor = consecutiveChunks.chunks.get(i);
if (descriptor.col != null) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size));
} else {
stream.skipFully(descriptor.size);
}
}
}
}

private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException {
if (fileDecryptor == null || fileDecryptor.plaintextFile()) {
currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
@@ -880,6 +1019,7 @@ private class ConsecutivePartList {
private final SQLMetric fileReadTimeMetric;
private final SQLMetric fileReadSizeMetric;
private final SQLMetric readThroughput;
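// Buffers backing this part's data; allocated by allocateReadBuffers() and filled either by
// readAll or by FileReader.readAllPartsParallel.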
List<ByteBuffer> buffers;

/**
* Constructor
@@ -909,21 +1049,20 @@ public void addChunk(ChunkDescriptor descriptor) {
length += descriptor.size;
}

- private List<ByteBuffer> allocateReadBuffers() {
+ private void allocateReadBuffers() {
int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());

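// e.g. length = 300 MiB with a 128 MiB max allocation size gives fullAllocations = 2,
// lastAllocationSize = 44 MiB, and numAllocations = 3.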
int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
- List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+ this.buffers = new ArrayList<>(numAllocations);

for (int i = 0; i < fullAllocations; i += 1) {
- buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+ this.buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
}

if (lastAllocationSize > 0) {
- buffers.add(options.getAllocator().allocate(lastAllocationSize));
+ this.buffers.add(options.getAllocator().allocate(lastAllocationSize));
}
- return buffers;
}

/**
@@ -934,7 +1073,7 @@ private List<ByteBuffer> allocateReadBuffers() {
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
f.seek(offset);

- List<ByteBuffer> buffers = allocateReadBuffers();
+ allocateReadBuffers();
long startNs = System.nanoTime();

for (ByteBuffer buffer : buffers) {
Expand All @@ -956,134 +1095,6 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
}
}

/**
* API to read a consecutive range from the Parquet file in parallel. This is identical to
* {@link #readAll(SeekableInputStream, ChunkListBuilder) readAll}, except that the consecutive
* range is split into multiple smaller ranges and read in parallel. The parallelism can be set
* by specifying the threadpool size via {@link
* ReadOptions.Builder#withParallelIOThreadPoolSize(int)}.
*
* @param builder used to build chunk list to read the pages for the different columns
* @throws IOException if there is an error while reading from the stream
*/
public void readAllParallel(ChunkListBuilder builder) throws IOException {

List<ByteBuffer> buffers = allocateReadBuffers();
long startNs = System.nanoTime();

int nThreads = cometOptions.parallelIOThreadPoolSize();
ExecutorService threadPool = CometFileReaderThreadPool.getOrCreateThreadPool(nThreads);
List<Future<Void>> futures = new ArrayList<>();

long currentOffset = this.offset;
int buffersPerThread = buffers.size() / nThreads;
int remaining = buffers.size() % nThreads;
// offset in input file each thread seeks to before beginning read
long[] offsets = new long[nThreads];
// index of buffer where each thread will start writing data
int[] bufferIndexes = new int[nThreads];
// number of buffers for each thread to fill
int[] numBuffers = new int[nThreads];

int bufferNum = 0;
for (int i = 0; i < nThreads; i++) {
int nBuffers = 0;
offsets[i] = currentOffset;
bufferIndexes[i] = bufferNum;
nBuffers = buffersPerThread;
for (int j = 0; j < buffersPerThread; j++) {
currentOffset += buffers.get(bufferNum).capacity();
bufferNum++;
}
if (remaining > 0) {
remaining--;
currentOffset += buffers.get(bufferNum).capacity();
bufferNum++;
nBuffers++;
}
numBuffers[i] = nBuffers;
}
for (int n = 0; n < nThreads; n++) {
int threadIndex = n;
long pos = offsets[threadIndex];
int bufferIndex = bufferIndexes[threadIndex];
int nBuffers = numBuffers[threadIndex];
if (nBuffers == 0) {
continue;
}

// Find the total number of bytes to read for the current thread
long tmp = 0;
for (int i = 0; i < nBuffers; i++) {
int bufNo = bufferIndex + i;
if (bufNo >= buffers.size()) break;
tmp += buffers.get(bufNo).capacity();
}
final long length = tmp;

futures.add(
threadPool.submit(
() -> {
SeekableInputStream inputStream = null;
try {
if (file instanceof CometInputFile) {
inputStream = (((CometInputFile) file).newStream(pos, length));
} else {
inputStream = file.newStream();
}

inputStream.seek(pos);
long curPos = pos;
for (int i = 0; i < nBuffers; i++) {
int bufNo = bufferIndex + i;
if (bufNo >= buffers.size()) {
break;
}
ByteBuffer buffer = buffers.get(bufNo);
LOG.debug(
"Thread: {} Offset: {} Buffer: {} Size: {}",
threadIndex,
curPos,
bufNo,
buffer.capacity());
curPos += buffer.capacity();
inputStream.readFully(buffer);
buffer.flip();
} // for
} finally {
if (inputStream != null) {
inputStream.close();
}
}

return null;
}));
}

for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

setReadMetrics(startNs);

ByteBufferInputStream stream;
stream = ByteBufferInputStream.wrap(buffers);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
for (int i = 0; i < chunks.size(); i++) {
ChunkDescriptor descriptor = chunks.get(i);
if (descriptor.col != null) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size));
} else {
stream.skipFully(descriptor.size);
}
}
}

private void setReadMetrics(long startNs) {
long totalFileReadTimeNs = System.nanoTime() - startNs;
double sizeInMb = ((double) length) / (1024 * 1024);
Expand Down Expand Up @@ -1117,6 +1128,7 @@ public long endPos() {

/** Information needed to read a column chunk or a part of it. */
private static class ChunkDescriptor {

private final ColumnDescriptor col;
private final ColumnChunkMetaData metadata;
private final long fileOffset;
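
For illustration, here is a minimal, self-contained sketch of the range-grouping idea behind getReadRanges. It is not part of the commit: ReadRangeSketch, Range, and group are hypothetical names chosen for the example, not part of the Comet API. It packs a run of contiguous buffer sizes into ranges of at most nBuffers / nThreads + 1 buffers, which is the skew-adjusted split the reader applies within each consecutive part.

import java.util.ArrayList;
import java.util.List;

public class ReadRangeSketch {
  // Hypothetical stand-in for FileReader.ReadRange: one contiguous file region per read task.
  static class Range {
    long offset;
    long length;
    int numBuffers;

    @Override
    public String toString() {
      return "Range{offset=" + offset + ", length=" + length + ", numBuffers=" + numBuffers + '}';
    }
  }

  // Packs consecutive buffer sizes into ranges of at most (nBuffers / nThreads + 1) buffers,
  // mirroring the skew-adjusted split in getReadRanges for a single consecutive part.
  static List<Range> group(long startOffset, int[] bufferSizes, int nThreads) {
    long buffersPerThread = bufferSizes.length / nThreads + 1;
    List<Range> ranges = new ArrayList<>();
    Range current = null;
    long offset = startOffset;
    for (int i = 0; i < bufferSizes.length; i++) {
      if (i % buffersPerThread == 0) {
        current = new Range();
        current.offset = offset;
        ranges.add(current);
      }
      current.length += bufferSizes[i];
      current.numBuffers++;
      offset += bufferSizes[i];
    }
    return ranges;
  }

  public static void main(String[] args) {
    // Five 8 MiB buffers on a 2-thread pool: buffersPerThread = 5 / 2 + 1 = 3, so the
    // buffers split into two ranges of 3 and 2 buffers, i.e. at most two connections.
    int mib = 1 << 20;
    int[] sizes = {8 * mib, 8 * mib, 8 * mib, 8 * mib, 8 * mib};
    for (Range r : group(0L, sizes, 2)) {
      System.out.println(r);
    }
  }
}

Each resulting range is handled by one task holding at most one open stream in readAllRangesParallel, which is what keeps the connection count low.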