Skip to content

Commit

Permalink
mypy fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
astro-friedel committed Dec 3, 2024
1 parent 9ed699d commit ce609cc
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
2 changes: 1 addition & 1 deletion parsl/curvezmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def _start_auth_thread(self) -> ThreadAuthenticator:
auth_thread.start()
# Only allow certs that are in the cert dir
assert self.cert_dir # For mypy
auth_thread.configure_curve(domain="*", location=self.cert_dir)
auth_thread.configure_curve(domain="*", location=str(self.cert_dir))
return auth_thread

def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket:
Expand Down
4 changes: 2 additions & 2 deletions parsl/data_provider/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def __init__(self, url: Union[os.PathLike, str], uu_id: Optional[uuid.UUID] = No
self.path = parsed_url.path
self.filename = os.path.basename(self.path)
# let the DFK set these values, if needed
self.size = None
self.md5sum = None
self.size: Optional[int] = None
self.md5sum: Optional[str] = None
self.timestamp = timestamp

self.local_path: Optional[str] = None
Expand Down
10 changes: 5 additions & 5 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def std_spec_to_name(name, spec):
def _send_file_log_info(self, file: Union[File, DataFuture],
task_record: TaskRecord, is_output: bool) -> None:
""" Generate a message for the monitoring db about a file. """
if self.file_provenance:
if self.monitoring and self.file_provenance:
file_log_info = self._create_file_log_info(file, task_record)
# make sure the task_id is None for inputs
if not is_output:
Expand Down Expand Up @@ -343,15 +343,15 @@ def _create_file_log_info(self, file: Union[File, DataFuture],
def register_as_input(self, f: Union[File, DataFuture],
task_record: TaskRecord):
""" Register a file as an input to a task. """
if self.file_provenance:
if self.monitoring and self.file_provenance:
self._send_file_log_info(f, task_record, False)
file_input_info = self._create_file_io_info(f, task_record)
self.monitoring.send((MessageType.INPUT_FILE, file_input_info))

def register_as_output(self, f: Union[File, DataFuture],
task_record: TaskRecord):
""" Register a file as an output of a task. """
if self.file_provenance:
if self.monitoring and self.file_provenance:
self._send_file_log_info(f, task_record, True)
file_output_info = self._create_file_io_info(f, task_record)
self.monitoring.send((MessageType.OUTPUT_FILE, file_output_info))
Expand All @@ -370,7 +370,7 @@ def _create_file_io_info(self, file: Union[File, DataFuture],

def _register_env(self, environ: ParslExecutor) -> None:
""" Capture the environment information for the monitoring db. """
if self.file_provenance:
if self.monitoring and self.file_provenance:
environ_info = self._create_env_log_info(environ)
self.monitoring.send((MessageType.ENVIRONMENT_INFO, environ_info))

Expand All @@ -387,7 +387,7 @@ def _create_env_log_info(self, environ: ParslExecutor) -> Dict[str, Any]:
provider = getattr(environ, 'provider', None)
if provider is not None:
env_log_info['provider'] = provider.label
env_log_info['launcher'] = type(getattr(provider, 'launcher', None))
env_log_info['launcher'] = str(type(getattr(provider, 'launcher', None)))
env_log_info['worker_init'] = getattr(provider, 'worker_init', None)
return env_log_info

Expand Down
3 changes: 3 additions & 0 deletions parsl/dataflow/taskrecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,6 @@ class TaskRecord(TypedDict, total=False):
"""Restricts access to end-of-join behavior to ensure that joins
only complete once, even if several joining Futures complete close
together in time."""

environment: str
"""The environment in which the task is being executed."""
10 changes: 5 additions & 5 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import queue
import threading
import time
from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast
from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union, cast

import typeguard

Expand Down Expand Up @@ -395,10 +395,10 @@ def start(self,
"""
like inserted_tasks but for Files
"""
inserted_files = dict() # type: Dict[str, Dict[str, Union[None, datetime.datetime, str, int]]]
input_inserted_files = dict() # type: Dict[str, List[str]]
output_inserted_files = dict() # type: Dict[str, List[str]]
inserted_envs = set() # type: Set[object]
inserted_files: Dict[str, Dict[str, Union[None, datetime.datetime, str, int]]] = dict()
input_inserted_files: Dict[str, List[str]] = dict()
output_inserted_files: Dict[str, List[str]] = dict()
inserted_envs: Set[object] = set()

# for any task ID, we can defer exactly one message, which is the
# assumed-to-be-unique first message (with first message flag set).
Expand Down

0 comments on commit ce609cc

Please sign in to comment.