From d8f2e726c3ba5b9de3ddd6b0d4a7cd683f7805e0 Mon Sep 17 00:00:00 2001 From: Mindaugas Date: Thu, 13 Jun 2024 10:17:38 +0300 Subject: [PATCH] Fix reader resource leakage (#51) * Fix resource leakage withing streams --- .../deltafetch/search/parquet/ParquetLookupReader.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/exacaster/deltafetch/search/parquet/ParquetLookupReader.java b/src/main/java/com/exacaster/deltafetch/search/parquet/ParquetLookupReader.java index 48c354f..0f4ca6f 100644 --- a/src/main/java/com/exacaster/deltafetch/search/parquet/ParquetLookupReader.java +++ b/src/main/java/com/exacaster/deltafetch/search/parquet/ParquetLookupReader.java @@ -35,7 +35,13 @@ public ParquetLookupReader(Configuration conf, String path) { public Stream> find(List filters, int limit) { LOG.debug("Reading: {} with filters {}", path, filters); try (var reader = prepareReader(filters)) { - return Streams.stream(Iterators.limit(new ParquetIterator(reader), limit)); + return Streams.stream(Iterators.limit(new ParquetIterator<>(reader), limit)).onClose(() -> { + try { + reader.close(); + } catch (IOException e) { + LOG.error("Failed to close ParquetReader", e); + } + }); } catch (IOException e) { throw new IllegalStateException("Failed building reader", e); }