Skip to content

Commit

Permalink
Fix #3867 and try to explain but not crash when bad things happen to …
Browse files Browse the repository at this point in the history
…our mutex file
  • Loading branch information
adamnovak committed Oct 27, 2023
1 parent ecebc07 commit f756b62
Showing 1 changed file with 63 additions and 20 deletions.
83 changes: 63 additions & 20 deletions src/toil/lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# Note: renamed from "threading.py" to "threading.py" to avoid conflicting imports
# from the built-in "threading" from psutil in python3.9
import atexit
import errno
import fcntl
import logging
import math
Expand Down Expand Up @@ -358,6 +359,9 @@ def global_mutex(base_dir: str, mutex: str) -> Iterator[None]:
:param str mutex: Mutex to lock. Must be a permissible path component.
"""

if not os.path.isdir(base_dir):
raise RuntimeError(f"Directory {base_dir} for mutex does not exist")

# Define a filename
lock_filename = os.path.join(base_dir, 'toil-mutex-' + mutex)

Expand All @@ -368,18 +372,32 @@ def global_mutex(base_dir: str, mutex: str) -> Iterator[None]:
# get a lock on the deleted file.

while True:
fd = -1

try:
# Try to create the file, ignoring if it exists or not.
fd = os.open(lock_filename, os.O_CREAT | os.O_WRONLY)
# Try to create the file, ignoring if it exists or not.
fd = os.open(lock_filename, os.O_CREAT | os.O_WRONLY)

# Wait until we can exclusively lock it.
fcntl.lockf(fd, fcntl.LOCK_EX)
# Wait until we can exclusively lock it.
fcntl.lockf(fd, fcntl.LOCK_EX)

# Holding the lock, make sure we are looking at the same file on disk still.
# Holding the lock, make sure we are looking at the same file on disk still.
try:
# So get the stats from the open file
fd_stats = os.fstat(fd)
except OSError as e:
if e.errno == errno.ESTALE:
# The file handle has gone stale, because somebody removed the file.
# Try again.
try:
fcntl.lockf(fd, fcntl.LOCK_UN)
except OSError:
pass
os.close(fd)
continue
else:
# Something else broke
raise

try:
# And get the stats for the name in the directory
path_stats: Optional[os.stat_result] = os.stat(lock_filename)
except FileNotFoundError:
path_stats = None
Expand All @@ -389,10 +407,9 @@ def global_mutex(base_dir: str, mutex: str) -> Iterator[None]:
# any). This usually happens, because before someone releases a
# lock, they delete the file. Go back and contend again. TODO: This
# allows a lot of queue jumping on our mutex.
if fd != -1:
fcntl.lockf(fd, fcntl.LOCK_UN)
os.close(fd)
continue
fcntl.lockf(fd, fcntl.LOCK_UN)
os.close(fd)
continue
else:
# We have a lock on the file that the name points to. Since we
# hold the lock, nobody will be deleting it or can be in the
Expand All @@ -407,14 +424,40 @@ def global_mutex(base_dir: str, mutex: str) -> Iterator[None]:
# Delete it while we still own it, so we can't delete it from out from
# under someone else who thinks they are holding it.
logger.debug('PID %d releasing mutex %s', os.getpid(), lock_filename)
os.unlink(lock_filename)
if fd != -1:
fcntl.lockf(fd, fcntl.LOCK_UN)
# Note that we are unlinking it and then unlocking it; a lot of people
# might have opened it before we unlinked it and will wake up when they
# get the worthless lock on the now-unlinked file. We have to do some
# stat gymnastics above to work around this.
os.close(fd)

# We have had observations in the wild of the lock file not exisiting
# when we go to unlink it, causing a crash on mutex release. See
# <https://github.com/DataBiosphere/toil/issues/4654>.
#
# We want to tolerate this; maybe unlink() interacts with fcntl() locks
# on NFS in a way that is actually fine, somehow? But we also want to
# complain loudly if something is tampering with our locks or not
# really enforcing locks on the filesystem, so we will notice if it is
# the cause of further problems.
try:
path_stats = os.stat(lock_filename)
except FileNotFoundError:
path_stats = None

# Check to make sure it still looks locked before we unlink.
if path_stats is None:
logger.error('PID %d had mutex %s disappear while locked! Mutex system is not working!', os.getpid(), lock_filename)
elif fd_stats.st_dev != path_stats.st_dev or fd_stats.st_ino != path_stats.st_ino:
logger.error('PID %d had mutex %s get replaced while locked! Mutex system is not working!', os.getpid(), lock_filename)

if path_stats is not None:
try:
# Unlink the file
os.unlink(lock_filename)
except FileNotFoundError:
logger.error('PID %d had mutex %s disappear between stat and unlink while unlocking! Mutex system is not working!', os.getpid(), lock_filename)

# Note that we are unlinking it and then unlocking it; a lot of people
# might have opened it before we unlinked it and will wake up when they
# get the worthless lock on the now-unlinked file. We have to do some
# stat gymnastics above to work around this.
fcntl.lockf(fd, fcntl.LOCK_UN)
os.close(fd)


class LastProcessStandingArena:
Expand Down

0 comments on commit f756b62

Please sign in to comment.