From 076cd7b19d0cc0e7c57bc3512e76c27a7173499e Mon Sep 17 00:00:00 2001 From: Alex Langenfeld Date: Fri, 8 Mar 2024 16:46:55 -0600 Subject: [PATCH] [FuturesAwareThreadPoolExecutor] avoid compact on every submit (#20368) compacting on every submit is expensive for large throughput threadpools, so use a scheme to avoid doing it as often ## How I Tested These Changes existing tests --- python_modules/dagster/dagster/_core/utils.py | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/python_modules/dagster/dagster/_core/utils.py b/python_modules/dagster/dagster/_core/utils.py index cc1779956a0a7..30ea8bb428e91 100644 --- a/python_modules/dagster/dagster/_core/utils.py +++ b/python_modules/dagster/dagster/_core/utils.py @@ -5,12 +5,13 @@ import uuid import warnings from collections import OrderedDict -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor from contextvars import copy_context from typing import ( AbstractSet, Any, Iterable, + List, Mapping, Optional, Sequence, @@ -143,6 +144,8 @@ class RequestUtilizationMetrics(TypedDict): class FuturesAwareThreadPoolExecutor(ThreadPoolExecutor): + MAX_SUBMITS_WITHOUT_COMPACT = 10_000 + def __init__( self, max_workers: Optional[int] = None, @@ -153,26 +156,34 @@ def __init__( super().__init__(max_workers, thread_name_prefix, initializer, initargs) # The default threadpool class doesn't track the futures it creates, # so if we want to be able to count the number of running futures, we need to do it ourselves. - self._all_futures = [] + self._tracked_futures: List[Future] = [] + self._submit_since_compact = 0 - def submit(self, fn, *args, **kwargs): + def submit(self, fn, *args, **kwargs) -> Future: new_future = super().submit(fn, *args, **kwargs) - self._all_futures = [ - future for future in self._all_futures if not future.done() - ] # clean up done futures - self._all_futures.append(new_future) + self._tracked_futures.append(new_future) + self._submit_since_compact += 1 + + # avoid checking all the futures on every submit, only compact + # after a number of calls. num_running_futures will also compact + # removing need to do it on submit + if self._submit_since_compact > self.MAX_SUBMITS_WITHOUT_COMPACT: + self._compact_and_count_not_done_futures() + return new_future @property def max_workers(self) -> int: return self._max_workers + def _compact_and_count_not_done_futures(self) -> int: + self._tracked_futures = [f for f in self._tracked_futures if not f.done()] + self._submit_since_compact = 0 + return len(self._tracked_futures) + @property def num_running_futures(self) -> int: - return ( - len([future for future in self._all_futures if not future.done()]) - - self.num_queued_futures - ) + return self._compact_and_count_not_done_futures() - self.num_queued_futures @property def num_queued_futures(self) -> int: