Stop deleting chained-to jobs which fail as orphaned jobs (#4557)
* Use a different name for unreachable jobs

* Get rid of unused jobsToDelete in the file store

* Clarify semantics of filesToDelete as a write-ahead deletion journal

* Treat chaining as merging jobs

* Add a test for restarting a chained-to job

* Turn off the fix to prove that we can reproduce the problem

* Revert "Turn off the fix to prove that we can reproduce the problem"

This reverts commit dce4610.

* Resolve TODO

* Move responsibility for snapshotting job descriptions during commits over to the file store

* Fix whitespace and change non caching layout back

* Add a bunch of async commit debugging

* Catch the deepcopy misbehavior

* Allow deepcopy on job descriptions to chain correctly

* Remove extra debugging
adamnovak authored Aug 10, 2023
1 parent 26cc98b commit 55c0410
Showing 9 changed files with 163 additions and 75 deletions.
12 changes: 7 additions & 5 deletions src/toil/fileStores/abstractFileStore.py
@@ -120,10 +120,6 @@ def __init__(
# job is re-run it will need to be able to re-delete these files.
# This is a set of str objects, not FileIDs.
self.filesToDelete: Set[str] = set()
# Records IDs of jobs that need to be deleted when the currently
# running job is cleaned up.
# May be modified by the worker to actually delete jobs!
self.jobsToDelete: Set[str] = set()
# Holds records of file ID, or file ID and local path, for reporting
# the accessed files of failed jobs.
self._accessLog: List[Tuple[str, ...]] = []
@@ -606,7 +602,13 @@ def startCommit(self, jobState: bool = False) -> None:
"""
Update the status of the job on the disk.
May start an asynchronous process. Call waitForCommit() to wait on that process.
May bump the version number of the job.
May start an asynchronous process. Call waitForCommit() to wait on that
process. You must waitForCommit() before committing any further updates
to the job. During the asynchronous process, it is safe to modify the
job; modifications after this call will not be committed until the next
call.
:param jobState: If True, commit the state of the FileStore's job,
and file deletes. Otherwise, commit only file creates/updates.
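For illustration, here is a minimal sketch of how a caller would honor that contract; the function name and the worker-loop shape are invented for this sketch and are not Toil's actual worker code:

def finish_and_update(file_store):
    # Kick off an asynchronous commit of the job's state and file deletes.
    file_store.startCommit(jobState=True)

    # Modifying the job description here is safe, but those changes are only
    # picked up by a *later* commit, never by the one already in flight.

    # Any further commit must first wait for the in-flight one to finish.
    file_store.waitForCommit()
    file_store.startCommit(jobState=True)
    file_store.waitForCommit()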
58 changes: 38 additions & 20 deletions src/toil/fileStores/cachingFileStore.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import errno
import hashlib
import logging
@@ -1796,11 +1797,32 @@ def startCommit(self, jobState=False):
# value?) wait on it, so we can't forget to join it later.
self.waitForCommit()

if len(self.jobDesc.filesToDelete) > 0:
raise RuntimeError("Job is already in the process of being committed!")

state_to_commit: Optional[JobDescription] = None

if jobState:
# Clone the current job description, so that further updates to it
# (such as new successors being added when it runs) occur after the
# commit process, and aren't committed early or partially.
state_to_commit = copy.deepcopy(self.jobDesc)
# Also snapshot the files that should be seen as deleted once the
# update of the job description is visible.
state_to_commit.filesToDelete = list(self.filesToDelete)
# TODO: We never clear this out on the file store itself. This
# might be necessary for later jobs to see earlier jobs' deleted files
# before they are committed?

# Bump the original's version since saving will do that too and we
# don't want duplicate versions.
self.jobDesc.reserve_versions(1 if len(state_to_commit.filesToDelete) == 0 else 2)

# Start the commit thread
self.commitThread = threading.Thread(target=self.startCommitThread, args=(jobState,))
self.commitThread = threading.Thread(target=self.startCommitThread, args=(state_to_commit,))
self.commitThread.start()

def startCommitThread(self, jobState):
def startCommitThread(self, state_to_commit: Optional[JobDescription]):
"""
Run in a thread to actually commit the current job.
"""
@@ -1823,25 +1845,21 @@ def startCommitThread(self, jobState):
# Finish all deletions out of the cache (not from the job store)
self._executePendingDeletions(self.coordination_dir, con, cur)

if jobState:
if state_to_commit is not None:
# Do all the things that make this job not redoable

logger.debug('Committing file deletes and job state changes asynchronously')

# Indicate any files that should be deleted once the update of
# the job wrapper is completed.
self.jobDesc.filesToDelete = list(self.filesToDelete)
# Complete the job
self.jobStore.update_job(self.jobDesc)
# Delete any remnant jobs
list(map(self.jobStore.delete_job, self.jobsToDelete))
# Delete any remnant files
list(map(self.jobStore.delete_file, self.filesToDelete))
self.jobStore.update_job(state_to_commit)
# Delete the files
list(map(self.jobStore.delete_file, state_to_commit.filesToDelete))
# Remove the files to delete list, having successfully removed the files
if len(self.filesToDelete) > 0:
self.jobDesc.filesToDelete = []
if len(state_to_commit.filesToDelete) > 0:
state_to_commit.filesToDelete = []
# Update again, recording the now-empty files-to-delete list
self.jobStore.update_job(self.jobDesc)
self.jobStore.update_job(state_to_commit)

except:
self._terminateEvent.set()
raise
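In outline, the snapshot-then-commit pattern introduced above works as in the following hedged sketch; the SQLite cache bookkeeping, locking, and error handling are omitted, start_commit and _commit_thread are illustrative names, and file_store stands for any object with jobDesc, filesToDelete, and jobStore attributes:

import copy
import threading

def start_commit(file_store, job_state: bool) -> threading.Thread:
    """Minimal sketch: snapshot the job description, then commit it off-thread."""
    snapshot = None
    if job_state:
        # Deep-copy so later in-memory edits don't leak into this commit.
        snapshot = copy.deepcopy(file_store.jobDesc)
        # Journal the pending file deletions with this version of the job.
        snapshot.filesToDelete = list(file_store.filesToDelete)
        # Keep the live description's version counter ahead of the versions the
        # commit thread will consume: one update, plus one more if the journal
        # has to be cleared afterwards.
        file_store.jobDesc.reserve_versions(1 if not snapshot.filesToDelete else 2)

    thread = threading.Thread(target=_commit_thread, args=(file_store, snapshot))
    thread.start()
    return thread

def _commit_thread(file_store, snapshot) -> None:
    if snapshot is None:
        return  # only file data needed committing; nothing to do here
    file_store.jobStore.update_job(snapshot)      # make the new job state visible
    for file_id in snapshot.filesToDelete:        # then perform the journaled deletes
        file_store.jobStore.delete_file(file_id)
    if snapshot.filesToDelete:
        snapshot.filesToDelete = []
        file_store.jobStore.update_job(snapshot)  # clear the journal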
@@ -1852,14 +1870,14 @@ def shutdown(cls, shutdown_info: Tuple[str, str]) -> None:
def shutdown(cls, shutdown_info: Tuple[str, str]) -> None:
"""
:param shutdown_info: Tuple of the coordination directory (where the
cache database is) and the cache directory (where the cached data is).
cache database is) and the cache directory (where the cached data is).
Job local temp directories will be removed due to their appearance in
the database.
"""

coordination_dir, cache_dir = shutdown_info

if os.path.isdir(cache_dir):
# There is a directory to clean up

@@ -1877,7 +1895,7 @@ def shutdown(cls, shutdown_info: Tuple[str, str]) -> None:
# and use that.
dbFilename = None
dbAttempt = float('-inf')

# We also need to remember all the plausible database files and
# journals
all_db_files = []
@@ -1929,7 +1947,7 @@ def shutdown(cls, shutdown_info: Tuple[str, str]) -> None:
for filename in all_db_files:
# And delete everything related to the caching database
robust_rmtree(filename)

def __del__(self):
"""
Cleanup function that is run when destroying the class instance that ensures that all the
12 changes: 8 additions & 4 deletions src/toil/fileStores/nonCachingFileStore.py
@@ -194,18 +194,21 @@ def startCommit(self, jobState: bool = False) -> None:
if self.waitForPreviousCommit is not None:
self.waitForPreviousCommit()

# We are going to commit synchronously, so no need to clone a snapshot
# of the job description or mess with its version numbering.

if not jobState:
# All our operations that need committing are job state related
return

try:
# Indicate any files that should be deleted once the update of
# the job wrapper is completed.
# Indicate any files that should be seen as deleted once the
# update of the job description is visible.
if len(self.jobDesc.filesToDelete) > 0:
raise RuntimeError("Job is already in the process of being committed!")
self.jobDesc.filesToDelete = list(self.filesToDelete)
# Complete the job
self.jobStore.update_job(self.jobDesc)
# Delete any remnant jobs
list(map(self.jobStore.delete_job, self.jobsToDelete))
# Delete any remnant files
list(map(self.jobStore.delete_file, self.filesToDelete))
# Remove the files to delete list, having successfully removed the files
@@ -217,6 +220,7 @@ def startCommit(self, jobState: bool = False) -> None:
self._terminateEvent.set()
raise


def __del__(self) -> None:
"""
Cleanup function that is run when destroying the class instance. Nothing to do since there
63 changes: 46 additions & 17 deletions src/toil/job.py
@@ -22,6 +22,7 @@
import pickle
import sys
import time
import types
import uuid
from abc import ABCMeta, abstractmethod
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
@@ -449,8 +450,15 @@ def __copy__(self) -> "Requirer":

def __deepcopy__(self, memo: Any) -> "Requirer":
"""Return a semantically-deep copy of the object, for :meth:`copy.deepcopy`."""
# See https://stackoverflow.com/a/40484215 for how to do an override
# that uses the base implementation
# We used to use <https://stackoverflow.com/a/40484215> but that was
# discovered to not actually work right, because you would get the
# copy, if later copied again, stamping out copies of the *original*
# object, due to putting back a method as a member that was already
# bound to a self parameter.

# So we have to also tinker with the method binding as noted in
# <https://stackoverflow.com/a/71125311>.
# TODO: What's the default implementation actually?

# Hide this override
implementation = self.__deepcopy__
@@ -459,9 +467,11 @@ def __deepcopy__(self, memo: Any) -> "Requirer":
# Do the deepcopy which omits the config via __getstate__ override
clone = copy.deepcopy(self, memo)

# Put back the override on us and the copy
# Put back the override on us
self.__deepcopy__ = implementation # type: ignore[assignment]
clone.__deepcopy__ = implementation # type: ignore[assignment]

# Bind the override to the copy and put it on the copy
clone.__deepcopy__ = types.MethodType(implementation.__func__, clone) # type: ignore[assignment]

if self._config is not None:
# Share a config reference
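The pitfall and the fix can be reproduced on a toy class; the sketch below uses a hypothetical Widget class, not Toil code, and assumes the override-hiding trick from the Stack Overflow answers cited above:

import copy
import types

class Widget:
    """Hypothetical stand-in for Requirer, just to demonstrate the rebinding fix."""

    def __init__(self, value):
        self.value = value

    def __deepcopy__(self, memo):
        implementation = self.__deepcopy__   # bound to *this* object
        self.__deepcopy__ = None             # hide the override from copy.deepcopy
        try:
            clone = copy.deepcopy(self, memo)    # default deepcopy machinery runs
        finally:
            self.__deepcopy__ = implementation   # restore the override on the original
        # Re-bind the underlying function to the clone. Assigning `implementation`
        # directly would leave the clone's override bound to the original object,
        # so deep-copying the clone would silently copy the original instead.
        clone.__deepcopy__ = types.MethodType(implementation.__func__, clone)
        return clone

a = Widget(1)
b = copy.deepcopy(a)
b.value = 2
c = copy.deepcopy(b)
assert c.value == 2   # with the naive rebinding, this would still be 1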
@@ -795,15 +805,27 @@ def makeString(x: Union[str, bytes, None]) -> str:
# default value for this workflow execution.
self._remainingTryCount = None

# Holds FileStore FileIDs of the files that this job has deleted. Used
# to journal deletions of files and recover from a worker crash between
# committing a JobDescription update and actually executing the
# requested deletions.
# Holds FileStore FileIDs of the files that should be seen as deleted,
# as part of a transaction with the writing of this version of the job
# to the job store. Used to journal deletions of files and recover from
# a worker crash between committing a JobDescription update (for
# example, severing the body of a completed job from the
# JobDescription) and actually executing the requested deletions (i.e.
# the deletions made by executing the body).
#
# Since the files being deleted might be required to execute the job
# body, we can't delete them first, but we also don't want to leave
# them behind if we die right after saving the JobDescription.
#
# This will be empty at all times except when a new version of a job is
# in the process of being committed.
self.filesToDelete = []

# Holds JobStore Job IDs of the jobs that have been chained into this
# job, and which should be deleted when this job finally is deleted.
self.jobsToDelete = []
# job, and which should be deleted when this job finally is deleted
# (but not before). The successor relationships with them will have
# been cut, so we need to hold onto them somehow.
self.merged_jobs = []

# The number of direct predecessors of the job. Needs to be stored at
# the JobDescription to support dynamically-created jobs with multiple
@@ -1027,15 +1049,15 @@ def replace(self, other: "JobDescription") -> None:
logger.debug('%s is adopting successor phases from %s of: %s', self, other, old_phases)
self.successor_phases = old_phases + self.successor_phases

# TODO: also be able to take on the successors of the other job, under
# ours on the stack, somehow.

# When deleting, we need to delete the files for our old ID, and also
# anything that needed to be deleted for the job we are replacing.
self.merged_jobs += [self.jobStoreID] + other.merged_jobs
self.jobStoreID = other.jobStoreID

# Save files and jobs to delete from the job we replaced, so we can
# roll up a whole chain of jobs and delete them when they're all done.
self.filesToDelete += other.filesToDelete
self.jobsToDelete += other.jobsToDelete
if len(other.filesToDelete) > 0:
raise RuntimeError("Trying to take on the ID of a job that is in the process of being committed!")
if len(self.filesToDelete) > 0:
raise RuntimeError("Trying to take on the ID of anothe job while in the process of being committed!")

self._job_version = other._job_version
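A simplified sketch of this chaining bookkeeping follows; MiniJobDescription is a hypothetical stand-in, not the real JobDescription.replace(), which also adopts successor phases and other state:

class MiniJobDescription:
    def __init__(self, job_store_id):
        self.jobStoreID = job_store_id
        self.merged_jobs = []    # IDs this job has chained over
        self.filesToDelete = []  # non-empty only while a commit is in flight

    def replace(self, other):
        if other.filesToDelete or self.filesToDelete:
            raise RuntimeError("Cannot chain while a commit is in flight")
        # Keep our old identity (and anything we merged before) for later cleanup.
        self.merged_jobs += [self.jobStoreID] + other.merged_jobs
        self.jobStoreID = other.jobStoreID

first = MiniJobDescription("job-A")
follow_on = MiniJobDescription("job-B")
follow_on.replace(first)   # chain: B now runs under A's identity
assert follow_on.jobStoreID == "job-A"
assert follow_on.merged_jobs == ["job-B"]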

@@ -1217,6 +1239,13 @@ def __str__(self) -> str:
def __repr__(self):
return f'{self.__class__.__name__}( **{self.__dict__!r} )'

def reserve_versions(self, count: int) -> None:
"""
Reserve a job version number for later, for journaling asynchronously.
"""
self._job_version += count
logger.debug("Skip ahead to job version: %s", self)

def pre_update_hook(self) -> None:
"""
Run before pickling and saving a created or updated version of this job.
12 changes: 10 additions & 2 deletions src/toil/jobStores/abstractJobStore.py
@@ -794,6 +794,10 @@ def get_jobs_reachable_from_root() -> Set[str]:
for service_jobstore_id in root_job_description.services:
if haveJob(service_jobstore_id):
reachable_from_root.add(service_jobstore_id)
for merged_jobstore_id in root_job_description.merged_jobs:
# Keep merged-in jobs around themselves, but don't bother
# exploring them, since we took their successors.
reachable_from_root.add(merged_jobstore_id)

# Unprocessed means it might have successor jobs we need to add.
unprocessed_job_descriptions = [root_job_description]
@@ -815,6 +819,10 @@ def get_jobs_reachable_from_root() -> Set[str]:
reachable_from_root.add(service_jobstore_id)

new_job_descriptions_to_process.append(successor_job_description)
for merged_jobstore_id in job_description.merged_jobs:
# Keep merged-in jobs around themselves, but don't bother
# exploring them, since we took their successors.
reachable_from_root.add(merged_jobstore_id)
unprocessed_job_descriptions = new_job_descriptions_to_process

logger.debug(f"{len(reachable_from_root)} jobs reachable from root.")
@@ -824,8 +832,8 @@ def get_jobs_reachable_from_root() -> Set[str]:

# Cleanup jobs that are not reachable from the root, and therefore orphaned
# TODO: Avoid reiterating reachable_from_root (which may be very large)
jobsToDelete = [x for x in getJobDescriptions() if x.jobStoreID not in reachable_from_root]
for jobDescription in jobsToDelete:
unreachable = [x for x in getJobDescriptions() if x.jobStoreID not in reachable_from_root]
for jobDescription in unreachable:
# clean up any associated files before deletion
for fileID in jobDescription.filesToDelete:
# Delete any files that should already be deleted
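The effect on garbage collection can be sketched as follows; this is a hedged simplification of get_jobs_reachable_from_root(), with services and the real job store API left out, and the function and parameter names invented for illustration:

from typing import Dict, Iterable, Set

def reachable_job_ids(root_id: str,
                      successors: Dict[str, Iterable[str]],
                      merged_jobs: Dict[str, Iterable[str]]) -> Set[str]:
    """Jobs reachable from the root, plus jobs merged into them by chaining."""
    reachable: Set[str] = set()
    frontier = [root_id]
    while frontier:
        job_id = frontier.pop()
        if job_id in reachable:
            continue
        reachable.add(job_id)
        # Keep chained-over jobs, but don't explore them: the job that merged
        # them already took over their successors.
        reachable.update(merged_jobs.get(job_id, ()))
        frontier.extend(successors.get(job_id, ()))
    return reachable

# Everything not in the reachable set is orphaned and can be deleted.
all_jobs = {"root", "child", "chained-over", "stale"}
keep = reachable_job_ids("root",
                         successors={"root": ["child"]},
                         merged_jobs={"child": ["chained-over"]})
assert all_jobs - keep == {"stale"}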
13 changes: 8 additions & 5 deletions src/toil/test/jobStores/jobStoreTest.py
@@ -224,11 +224,14 @@ def testChildLoadingEquality(self):

def testPersistantFilesToDelete(self):
"""
Make sure that updating a job carries over filesToDelete.
The following demonstrates the job update pattern, where files to be deleted are referenced in
"filesToDelete" array, which is persisted to disk first. If things go wrong during the update, this list of
files to delete is used to remove the unneeded files.
Make sure that updating a job persists filesToDelete.
The following demonstrates the job update pattern, where files to
be deleted atomically with a job update are referenced in
"filesToDelete" array, which is persisted to disk first. If things
go wrong during the update, this list of files to delete is used to
ensure that the updated job and the files are never both visible at
the same time.
"""

# Create a job.
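The recovery side that this journaling pattern enables might look roughly like the helper below; it is a hypothetical sketch, not Toil's actual restart code, and it assumes delete_file() is idempotent:

def replay_file_deletions(job_store, job_description):
    """Finish deletions journaled in filesToDelete after a crash (sketch only)."""
    if not job_description.filesToDelete:
        return  # no update was in flight
    for file_id in job_description.filesToDelete:
        # delete_file is assumed idempotent here, so replaying is safe.
        job_store.delete_file(file_id)
    job_description.filesToDelete = []
    job_store.update_job(job_description)  # persist the now-empty journal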
2 changes: 2 additions & 0 deletions src/toil/test/src/fileStoreTest.py
@@ -633,6 +633,8 @@ def testAsyncWriteWithCaching(self):
Attempting to get the file from the jobstore should not fail.
"""
print("Testing")
logger.debug("Testing testing 123")
self.options.retryCount = 0
self.options.logLevel = 'DEBUG'
A = Job.wrapJobFn(self._adjustCacheLimit, newTotalMB=1024, disk='1G')