feat: Minimize number of connections used by parallel reader #126

Merged
merged 1 commit on Feb 28, 2024
common/src/main/java/org/apache/comet/parquet/FileReader.java — 290 changes: 151 additions & 139 deletions
@@ -98,7 +98,7 @@ public class FileReader implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);

private final ParquetMetadataConverter converter;
protected final SeekableInputStream f;
private final SeekableInputStream f;
private final InputFile file;
private final Map<String, SQLMetric> metrics;
private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
@@ -364,10 +364,10 @@ ColumnIndexReader getColumnIndexReader(int blockIndex) {
private PageReadStore readChunks(
BlockMetaData block, List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
for (ConsecutivePartList consecutiveChunks : allParts) {
if (shouldReadParallel()) {
consecutiveChunks.readAllParallel(builder);
} else {
if (shouldReadParallel()) {
readAllPartsParallel(allParts, builder);
} else {
for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.readAll(f, builder);
}
}
@@ -407,6 +407,145 @@ private static boolean shouldReadParallelForScheme(String scheme) {
}
}

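/**
 * A contiguous region of the file to be read by one task: the starting offset, the total
 * number of bytes to read, and the pre-allocated buffers the bytes are read into.
 */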
static class ReadRange {

long offset = 0;
long length = 0;
List<ByteBuffer> buffers = new ArrayList<>();

@Override
public String toString() {
return "ReadRange{"
+ "offset="
+ offset
+ ", length="
+ length
+ ", numBuffers="
+ buffers.size()
+ '}';
}
}

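/**
 * Coalesces the read buffers of all consecutive parts into contiguous {@link ReadRange}s.
 * When adjustReadRangesSkew is enabled, a new range is started every
 * (nBuffers / nThreads + 1) buffers so that no single range ends up with a disproportionate
 * share of the bytes; otherwise each consecutive part becomes a single range. For example,
 * with 16 buffers and 4 threads a new range starts at buffer index 0, 5, 10 and 15.
 */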
List<ReadRange> getReadRanges(List<ConsecutivePartList> allParts, int nBuffers) {
int nThreads = cometOptions.parallelIOThreadPoolSize();
long buffersPerThread = nBuffers / nThreads + 1;
boolean adjustSkew = cometOptions.adjustReadRangesSkew();
List<ReadRange> allRanges = new ArrayList<>();
for (ConsecutivePartList consecutiveChunk : allParts) {
ReadRange readRange = null;
long offset = consecutiveChunk.offset;
for (int i = 0; i < consecutiveChunk.buffers.size(); i++) {
if ((adjustSkew && (i % buffersPerThread == 0)) || i == 0) {
readRange = new ReadRange();
allRanges.add(readRange);
readRange.offset = offset;
}
ByteBuffer b = consecutiveChunk.buffers.get(i);
readRange.length += b.capacity();
readRange.buffers.add(b);
offset += b.capacity();
}
}
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < allRanges.size(); i++) {
sb.append(allRanges.get(i).toString());
if (i < allRanges.size() - 1) {
sb.append(",");
}
}
LOG.debug("Read Ranges: {}", sb);
}
return allRanges;
}

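/**
 * Reads each {@link ReadRange} as a separate task on the shared reader thread pool. Every
 * task opens a single input stream for its range (positioned at the range offset for a
 * {@link CometInputFile}), so the number of connections opened is bounded by the number of
 * ranges rather than growing with the number of buffers or consecutive parts.
 */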
private void readAllRangesParallel(List<ReadRange> allRanges) {
int nThreads = cometOptions.parallelIOThreadPoolSize();
ExecutorService threadPool = CometFileReaderThreadPool.getOrCreateThreadPool(nThreads);
List<Future<Void>> futures = new ArrayList<>();

for (ReadRange readRange : allRanges) {
futures.add(
threadPool.submit(
() -> {
SeekableInputStream inputStream = null;
try {
if (file instanceof CometInputFile) {
// limit the max read-ahead to the length of the range
inputStream =
(((CometInputFile) file).newStream(readRange.offset, readRange.length));
LOG.debug(
"Opened new input file: {}, at offset: {}",
((CometInputFile) file).getPath().getName(),
readRange.offset);
} else {
inputStream = file.newStream();
}
long curPos = readRange.offset;
for (ByteBuffer buffer : readRange.buffers) {
inputStream.seek(curPos);
LOG.debug(
"Thread: {} Offset: {} Size: {}",
Thread.currentThread().getId(),
curPos,
buffer.capacity());
inputStream.readFully(buffer);
buffer.flip();
curPos += buffer.capacity();
} // for
} finally {
if (inputStream != null) {
inputStream.close();
}
}

return null;
}));
}
for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

/**
* Read all the consecutive part list objects in parallel.
*
* @param allParts all consecutive parts
* @param builder chunk list builder
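* @throws IOException if an error occurs while reading the column chunk data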
*/
public void readAllPartsParallel(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
int nBuffers = 0;
for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.allocateReadBuffers();
nBuffers += consecutiveChunks.buffers.size();
}
List<ReadRange> allRanges = getReadRanges(allParts, nBuffers);

long startNs = System.nanoTime();
readAllRangesParallel(allRanges);

for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.setReadMetrics(startNs);
ByteBufferInputStream stream;
stream = ByteBufferInputStream.wrap(consecutiveChunks.buffers);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(consecutiveChunks.length);
for (int i = 0; i < consecutiveChunks.chunks.size(); i++) {
ChunkDescriptor descriptor = consecutiveChunks.chunks.get(i);
if (descriptor.col != null) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size));
} else {
stream.skipFully(descriptor.size);
}
}
}
}

private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException {
if (fileDecryptor == null || fileDecryptor.plaintextFile()) {
currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
@@ -880,6 +1019,7 @@ private class ConsecutivePartList {
private final SQLMetric fileReadTimeMetric;
private final SQLMetric fileReadSizeMetric;
private final SQLMetric readThroughput;
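// Read buffers for this consecutive part; allocated by allocateReadBuffers() and filled by
// either readAll() or readAllPartsParallel().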
List<ByteBuffer> buffers;

/**
* Constructor
@@ -909,21 +1049,20 @@ public void addChunk(ChunkDescriptor descriptor) {
length += descriptor.size;
}

private List<ByteBuffer> allocateReadBuffers() {
private void allocateReadBuffers() {
int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());

int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
this.buffers = new ArrayList<>(numAllocations);

for (int i = 0; i < fullAllocations; i += 1) {
buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
this.buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
}

if (lastAllocationSize > 0) {
buffers.add(options.getAllocator().allocate(lastAllocationSize));
this.buffers.add(options.getAllocator().allocate(lastAllocationSize));
}
return buffers;
}

/**
@@ -934,7 +1073,7 @@ private List<ByteBuffer> allocateReadBuffers() {
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
f.seek(offset);

List<ByteBuffer> buffers = allocateReadBuffers();
allocateReadBuffers();
long startNs = System.nanoTime();

for (ByteBuffer buffer : buffers) {
@@ -956,134 +1095,6 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException
}
}

/**
* Api to read a consecutive range from the Parquet file in parallel. This is identical to
* {@link #readAll(SeekableInputStream, ChunkListBuilder) readAll}, except that the consecutive
* range is split into multiple smaller ranges and read in parallel. The parallelism can be set
* by specifying the threadpool size via {@link
* ReadOptions.Builder#withParallelIOThreadPoolSize(int)}.
*
* @param builder used to build chunk list to read the pages for the different columns
* @throws IOException if there is an error while reading from the stream
*/
public void readAllParallel(ChunkListBuilder builder) throws IOException {

List<ByteBuffer> buffers = allocateReadBuffers();
long startNs = System.nanoTime();

int nThreads = cometOptions.parallelIOThreadPoolSize();
ExecutorService threadPool = CometFileReaderThreadPool.getOrCreateThreadPool(nThreads);
List<Future<Void>> futures = new ArrayList<>();

long currentOffset = this.offset;
int buffersPerThread = buffers.size() / nThreads;
int remaining = buffers.size() % nThreads;
// offset in input file each thread seeks to before beginning read
long[] offsets = new long[nThreads];
// index of buffer where each thread will start writing data
int[] bufferIndexes = new int[nThreads];
// number of buffers for each thread to fill
int[] numBuffers = new int[nThreads];

int bufferNum = 0;
for (int i = 0; i < nThreads; i++) {
int nBuffers = 0;
offsets[i] = currentOffset;
bufferIndexes[i] = bufferNum;
nBuffers = buffersPerThread;
for (int j = 0; j < buffersPerThread; j++) {
currentOffset += buffers.get(bufferNum).capacity();
bufferNum++;
}
if (remaining > 0) {
remaining--;
currentOffset += buffers.get(bufferNum).capacity();
bufferNum++;
nBuffers++;
}
numBuffers[i] = nBuffers;
}
for (int n = 0; n < nThreads; n++) {
int threadIndex = n;
long pos = offsets[threadIndex];
int bufferIndex = bufferIndexes[threadIndex];
int nBuffers = numBuffers[threadIndex];
if (nBuffers == 0) {
continue;
}

// Find the total number of bytes to read for the current thread
long tmp = 0;
for (int i = 0; i < nBuffers; i++) {
int bufNo = bufferIndex + i;
if (bufNo >= buffers.size()) break;
tmp += buffers.get(bufNo).capacity();
}
final long length = tmp;

futures.add(
threadPool.submit(
() -> {
SeekableInputStream inputStream = null;
try {
if (file instanceof CometInputFile) {
inputStream = (((CometInputFile) file).newStream(pos, length));
} else {
inputStream = file.newStream();
}

inputStream.seek(pos);
long curPos = pos;
for (int i = 0; i < nBuffers; i++) {
int bufNo = bufferIndex + i;
if (bufNo >= buffers.size()) {
break;
}
ByteBuffer buffer = buffers.get(bufNo);
LOG.debug(
"Thread: {} Offset: {} Buffer: {} Size: {}",
threadIndex,
curPos,
bufNo,
buffer.capacity());
curPos += buffer.capacity();
inputStream.readFully(buffer);
buffer.flip();
} // for
} finally {
if (inputStream != null) {
inputStream.close();
}
}

return null;
}));
}

for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

setReadMetrics(startNs);

ByteBufferInputStream stream;
stream = ByteBufferInputStream.wrap(buffers);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
for (int i = 0; i < chunks.size(); i++) {
ChunkDescriptor descriptor = chunks.get(i);
if (descriptor.col != null) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size));
} else {
stream.skipFully(descriptor.size);
}
}
}

private void setReadMetrics(long startNs) {
long totalFileReadTimeNs = System.nanoTime() - startNs;
double sizeInMb = ((double) length) / (1024 * 1024);
@@ -1117,6 +1128,7 @@ public long endPos() {

/** Information needed to read a column chunk or a part of it. */
private static class ChunkDescriptor {

private final ColumnDescriptor col;
private final ColumnChunkMetaData metadata;
private final long fileOffset;
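To make the effect of the change easier to see, below is a small self-contained sketch (not part of the patch) that mirrors the range-splitting arithmetic of getReadRanges when skew adjustment is enabled. The class and names in it are hypothetical; only the splitting rule (start a new range every nBuffers / nThreads + 1 buffers) is taken from the diff above. Because readAllRangesParallel opens one input stream per ReadRange, the number of ranges produced here also bounds the number of connections the reader would open for the region.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; mirrors the splitting arithmetic of FileReader.getReadRanges
// with adjustReadRangesSkew enabled. Not the actual Comet code.
public class ReadRangeSplitSketch {

  // Simplified stand-in for FileReader.ReadRange: an offset and a total length.
  static final class Range {
    final long offset;
    long length;

    Range(long offset) {
      this.offset = offset;
    }

    @Override
    public String toString() {
      return "Range{offset=" + offset + ", length=" + length + '}';
    }
  }

  // Splits one consecutive region, described by its per-buffer sizes, into ranges of at most
  // (nBuffers / nThreads + 1) buffers each, the same rule used by the patched getReadRanges.
  static List<Range> split(long startOffset, int[] bufferSizes, int nThreads) {
    long buffersPerThread = bufferSizes.length / nThreads + 1;
    List<Range> ranges = new ArrayList<>();
    Range current = null;
    long offset = startOffset;
    for (int i = 0; i < bufferSizes.length; i++) {
      if (i % buffersPerThread == 0) {
        current = new Range(offset);
        ranges.add(current);
      }
      current.length += bufferSizes[i];
      offset += bufferSizes[i];
    }
    return ranges;
  }

  public static void main(String[] args) {
    int[] sizes = new int[16];
    Arrays.fill(sizes, 8 * 1024 * 1024); // sixteen 8 MiB buffers
    // With 4 threads, buffersPerThread = 16 / 4 + 1 = 5, so the region splits into four
    // ranges covering 5, 5, 5 and 1 buffers, i.e. at most four streams are opened for it.
    for (Range r : split(0L, sizes, 4)) {
      System.out.println(r);
    }
  }
}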