Skip to content

Commit

Permalink
[FuturesAwareThreadPoolExecutor] avoid compact on every submit (#20368)
Browse files Browse the repository at this point in the history
Compacting the list of tracked futures on every submit is expensive for high-throughput
thread pools, so compact only after a set number of submits, or when the running count is read.

## How I Tested These Changes

existing tests
  • Loading branch information
alangenfeld authored Mar 8, 2024
1 parent e6537ff commit 076cd7b
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions python_modules/dagster/dagster/_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import uuid
import warnings
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future, ThreadPoolExecutor
from contextvars import copy_context
from typing import (
AbstractSet,
Any,
Iterable,
List,
Mapping,
Optional,
Sequence,
Expand Down Expand Up @@ -143,6 +144,8 @@ class RequestUtilizationMetrics(TypedDict):


class FuturesAwareThreadPoolExecutor(ThreadPoolExecutor):
MAX_SUBMITS_WITHOUT_COMPACT = 10_000

def __init__(
self,
max_workers: Optional[int] = None,
Expand All @@ -153,26 +156,34 @@ def __init__(
super().__init__(max_workers, thread_name_prefix, initializer, initargs)
# The default threadpool class doesn't track the futures it creates,
# so if we want to be able to count the number of running futures, we need to do it ourselves.
self._all_futures = []
self._tracked_futures: List[Future] = []
self._submit_since_compact = 0

# Submit `fn` to the underlying pool and record the returned future in the
# executor's own list, so that not-yet-done futures can be counted later.
# NOTE(review): this span is a rendered diff with old and new lines interleaved;
# the un-annotated `def submit` line and the `_all_futures` lines look like the
# removed pre-change version -- confirm against the repository.
def submit(self, fn, *args, **kwargs):
def submit(self, fn, *args, **kwargs) -> Future:
new_future = super().submit(fn, *args, **kwargs)
self._all_futures = [
future for future in self._all_futures if not future.done()
] # clean up done futures
self._all_futures.append(new_future)
self._tracked_futures.append(new_future)
self._submit_since_compact += 1

# Filtering out done futures scans the whole tracked list, so it is not done
# on every submit: compact only once more than MAX_SUBMITS_WITHOUT_COMPACT
# submits have happened since the last compaction. Reading
# num_running_futures compacts as well (and resets the counter), which makes
# compaction on submit unnecessary while that property is being read.
if self._submit_since_compact > self.MAX_SUBMITS_WITHOUT_COMPACT:
self._compact_and_count_not_done_futures()

return new_future

# Read-only accessor for the `_max_workers` attribute of this executor
# (presumably the worker limit set by ThreadPoolExecutor.__init__ -- confirm).
@property
def max_workers(self) -> int:
return self._max_workers

# Drop every tracked future whose done() is true, reset the
# submits-since-compact counter to zero, and return how many tracked futures
# remain (i.e. the ones that are not done).
def _compact_and_count_not_done_futures(self) -> int:
# Rebuilds the list rather than removing entries in place.
self._tracked_futures = [f for f in self._tracked_futures if not f.done()]
self._submit_since_compact = 0
return len(self._tracked_futures)

# Number of tracked futures that are not done, minus num_queued_futures
# (presumably the futures currently executing -- num_queued_futures is not
# fully visible here).
# NOTE(review): this span is a rendered diff; the parenthesised return that reads
# `_all_futures` looks like the removed pre-change version -- confirm against
# the repository.
@property
def num_running_futures(self) -> int:
return (
len([future for future in self._all_futures if not future.done()])
- self.num_queued_futures
)
# Side effect: this call also compacts the tracked list and resets the
# submits-since-compact counter.
return self._compact_and_count_not_done_futures() - self.num_queued_futures

@property
def num_queued_futures(self) -> int:
Expand Down

0 comments on commit 076cd7b

Please sign in to comment.