Skip to content

Commit

Permalink
checkpoint exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Jul 30, 2024
1 parent fd311d0 commit 1846145
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 5 deletions.
24 changes: 19 additions & 5 deletions parsl/dataflow/memoization.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import typeguard

from parsl.dataflow.errors import BadCheckpoint
from parsl.dataflow.futures import AppFuture
from parsl.dataflow.taskrecord import TaskRecord

if TYPE_CHECKING:
Expand Down Expand Up @@ -336,8 +337,12 @@ def _load_checkpoints(self, checkpointDirs: Sequence[str]) -> Dict[str, Future[A
data = pickle.load(f)
# Copy and hash only the input attributes
memo_fu: Future = Future()
assert data['exception'] is None
memo_fu.set_result(data['result'])

if data['exception'] is None:
memo_fu.set_result(data['result'])
else:
assert data['result'] is None
memo_fu.set_exception(data['exception'])
memo_lookup_table[data['hash']] = memo_fu

except EOFError:
Expand Down Expand Up @@ -418,17 +423,22 @@ def checkpoint(self, tasks: Sequence[TaskRecord]) -> str:

app_fu = task_record['app_fu']

if app_fu.done() and app_fu.exception() is None:
if app_fu.done() and self.filter_for_checkpoint(app_fu):

hashsum = task_record['hashsum']
if not hashsum:
continue
t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}

if app_fu.exception() is None:
t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}
else:
t = {'hash': hashsum, 'exception': app_fu.exception(), 'result': None}

# We are using pickle here since pickle dumps to a file in 'ab'
# mode behave like a incremental log.
pickle.dump(t, f)
count += 1
logger.debug("Task {} checkpointed".format(task_id))
logger.debug("Task {} checkpointed as result".format(task_id))

self.checkpointed_tasks += count

Expand All @@ -441,3 +451,7 @@ def checkpoint(self, tasks: Sequence[TaskRecord]) -> str:
logger.info("Done checkpointing {} tasks".format(count))

return checkpoint_dir

def filter_for_checkpoint(self, app_fu: AppFuture) -> bool:
    """Decide whether a completed app future should be written to the
    checkpoint file.

    Subclasses may override this to change checkpointing policy. The
    default policy keeps only tasks that completed with a result.
    """
    # A future that completed with an exception reports a non-None
    # value from exception(); those are excluded by default.
    exc = app_fu.exception()
    return exc is None
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import contextlib
import os

import pytest

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.dataflow.memoization import BasicMemoizer
from parsl.executors.threads import ThreadPoolExecutor


class CheckpointExceptionsMemoizer(BasicMemoizer):
    """Memoizer that checkpoints every completed task.

    Overrides the default policy, which selects only futures holding
    results (not exceptions), so that failed tasks are checkpointed too.
    """

    def filter_for_checkpoint(self, app_fu):
        # The task record is reachable from the app future for policies
        # that want to inspect it; check that invariant holds here.
        assert app_fu.task_record is not None

        # Accept everything, exceptions included.
        return True


def fresh_config():
    """Build a parsl Config that uses the exception-checkpointing
    memoizer and a single thread-pool executor."""
    executor = ThreadPoolExecutor(label='local_threads_checkpoint')
    return Config(
        memoizer=CheckpointExceptionsMemoizer(),
        executors=[executor],
    )


@contextlib.contextmanager
def parsl_configured(run_dir, **kw):
    """Load parsl with a fresh_config()-derived configuration and yield
    the resulting DataFlowKernel, cleaning up on exit.

    Keyword arguments are set as attributes on the Config before loading
    (e.g. checkpoint_mode, checkpoint_files).
    """
    c = fresh_config()
    c.run_dir = run_dir
    for config_attr, config_val in kw.items():
        setattr(c, config_attr, config_val)
    dfk = parsl.load(c)
    # Point executors at the per-test run dir so no state leaks between
    # tests sharing a working directory.
    for ex in dfk.executors.values():
        ex.working_dir = run_dir
    try:
        yield dfk
    finally:
        # Run cleanup even when the managed body raises; otherwise a
        # failing test would leak a loaded DFK into subsequent tests.
        parsl.dfk().cleanup()


@python_app(cache=True)
def uuid_app():
    """Always fail, embedding a freshly generated UUID in the exception.

    A replayed (checkpointed) invocation reproduces the same UUID, while
    a genuinely re-executed one produces a different UUID — which lets
    tests distinguish cache hits from re-execution.
    """
    import uuid
    unique_marker = str(uuid.uuid4())
    raise RuntimeError(unique_marker)


@pytest.mark.local
def test_loading_checkpoint(tmpd_cwd):
    """Load memoization table from previous checkpoint
    """
    # First run: record the checkpoint location and capture the
    # exception (containing a unique uuid) raised by the app.
    with parsl_configured(tmpd_cwd, checkpoint_mode="task_exit"):
        checkpoint = os.path.join(parsl.dfk().run_dir, "checkpoint")
        result = uuid_app().exception()

    # Second run, loading that checkpoint: the app should not
    # re-execute, so the replayed exception carries the same uuid.
    with parsl_configured(tmpd_cwd, checkpoint_files=[checkpoint]):
        relaunched = uuid_app().exception()

    assert result.args == relaunched.args, "Expected following call to uuid_app to return cached uuid in exception"

0 comments on commit 1846145

Please sign in to comment.