Skip to content

Commit

Permalink
Fix slow handling of cached files with large cache_file_limit (#1352)
Browse files Browse the repository at this point in the history
  • Loading branch information
clearml committed Dec 7, 2024
1 parent ba492dd commit fd01be6
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 28 deletions.
52 changes: 31 additions & 21 deletions clearml/storage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def get_local_copy(self, remote_url, force_download, skip_zero_size_check=False)
if cached_size is not None and not force_download:
CacheManager._add_remote_url(remote_url, cached_file)
return cached_file

self.clean_cache()
# we need to download the file:
downloaded_file = helper.download_to_file(
remote_url,
Expand Down Expand Up @@ -163,7 +165,34 @@ def get_cache_file(self, remote_url=None, local_filename=None):
:param local_filename: if local_file is given, search for the local file/directory in the cache folder
:return: full path to file name, current file size or None
"""
folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context)
folder.mkdir(parents=True, exist_ok=True)
local_filename = local_filename or self.get_hashed_url_file(remote_url)
local_filename = self._conform_filename(local_filename)
new_file = folder / local_filename
new_file_exists = new_file.exists()
if new_file_exists:
# noinspection PyBroadException
try:
new_file.touch(exist_ok=True)
except Exception:
pass
# if file doesn't exist, return file size None
# noinspection PyBroadException
try:
new_file_size = new_file.stat().st_size if new_file_exists else None
except Exception:
new_file_size = None

return new_file.as_posix(), new_file_size

def clean_cache(self):
# type: () -> bool
"""
If cache is full, clean it by deleting old/lock files
:return: True if the cache has been cleaned and False otherwise
"""
def safe_time(x):
# noinspection PyBroadException
try:
Expand All @@ -183,27 +212,9 @@ def sort_max_access_time(x):
return atime

folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context)
folder.mkdir(parents=True, exist_ok=True)
local_filename = local_filename or self.get_hashed_url_file(remote_url)
local_filename = self._conform_filename(local_filename)
new_file = folder / local_filename
new_file_exists = new_file.exists()
if new_file_exists:
# noinspection PyBroadException
try:
new_file.touch(exist_ok=True)
except Exception:
pass
# if file doesn't exist, return file size None
# noinspection PyBroadException
try:
new_file_size = new_file.stat().st_size if new_file_exists else None
except Exception:
new_file_size = None

folder_files = list(folder.iterdir())
if len(folder_files) <= self._file_limit:
return new_file.as_posix(), new_file_size
return False

# first exclude lock files
lock_files = dict()
Expand Down Expand Up @@ -269,8 +280,7 @@ def sort_max_access_time(x):
os.unlink(f)
except BaseException:
pass

return new_file.as_posix(), new_file_size
return True

def lock_cache_folder(self, local_path):
# type: (Union[str, Path]) -> ()
Expand Down
16 changes: 10 additions & 6 deletions clearml/utilities/process/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,16 @@ def __init__(self, *args, **kwargs):
# Fix the python Queue and Use SimpleQueue write so it uses a single OS write,
# making it atomic message passing
self._q = SimpleQueue(*args, **kwargs)
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences,PyProtectedMember
self._q._writer._send_bytes = partial(SafeQueue._pipe_override_send_bytes, self._q._writer)
except Exception:
pass

# on Windows, queue communication is done via pipes, no need to override the _send_bytes method
if sys.platform != 'win32':
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences,PyProtectedMember
self._q._writer._send_bytes = partial(SafeQueue._pipe_override_send_bytes, self._q._writer)
except Exception:
pass

self._internal_q = None
# Note we should Never! assign a new object to `self._q_size`, just work with the initial object
self._q_size = [] # list of PIDs we pushed, so this is atomic.
Expand Down
2 changes: 1 addition & 1 deletion examples/router/http_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
A local route is then created, which will proxy all traffic from
`http://<PRIVATE_IP>:9000/example_source` to `http://localhost:8000/serve`.
Trafic can be intercepted both on request and response via callbacks. See
Traffic can be intercepted both on request and response via callbacks. See
`request_callback` and `response_callback`.
By default, the route traffic is monitored and telemetry is sent to the ClearML
Expand Down

0 comments on commit fd01be6

Please sign in to comment.