Replies: 3 comments 4 replies
-
What is the goal of encapsulating I/O params into an object? Why not pass them as actual parameters to a task?
-
I think it would be very useful to somehow abstract away some of the filesystem-specific requirements in Hedwig. It is very difficult to write pytest tests, for example, because there are so many `Path` dependencies involved. There are lots of checks that a `Path` lives under the `working_dir`, so the whole path often has to be created on disk just to run a simple test - or at least that's what I remember. I'm not sure whether the TaskIO concept would relieve this, but it might help.
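To illustrate the testing pain, here is a minimal sketch; the function names and directory layout are hypothetical, not Hedwig's actual code. Because a task checks that its input lives under `working_dir`, a unit test has to materialize the whole directory tree before anything can run:

```python
from pathlib import Path

def check_in_working_dir(path: Path, working_dir: Path) -> bool:
    # The kind of Path check that forces tests to build real trees:
    # the input is only accepted if it sits under working_dir.
    return working_dir in path.parents

def test_task_input(tmp_path: Path) -> None:
    # pytest-style test; tmp_path would normally be the pytest fixture.
    working_dir = tmp_path / "project" / "working_dir"
    working_dir.mkdir(parents=True)
    input_tiff = working_dir / "sample.tiff"
    input_tiff.touch()  # the file must really exist on disk
    assert check_in_working_dir(input_tiff, working_dir)
```

A task that instead received its input explicitly could be tested with a plain in-memory value, without any of this setup.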
-
Currently, tasks in the workflows have an implicit dependency on `FilePath` objects. For example, the `gen_zarr` function could have directly received the `input_tiff` file as an argument. Instead, `input_tiff` is implicitly assumed to be stored at a specific location in the working dir, with exactly the same name. This becomes a problem when an upstream task decides to rename the tiff file to `{working_dir}/{base}__as_tiff.tiff`: the `gen_zarr` task then breaks with a `FileNotFound` exception.

A generic principle to fix these issues is:
- Each task should receive only the inputs it needs to perform its specific function. This promotes modularity and reusability by keeping tasks self-contained and focused on their individual responsibilities.
- Tasks should explicitly declare their dependencies by accepting input parameters or dependencies from upstream tasks. This makes the task dependencies clear and helps in understanding the data flow within the workflow.
- Avoid relying on global state or shared variables to pass data between tasks, as this can lead to hidden dependencies and make the workflow harder to understand and debug. Instead, pass inputs explicitly between tasks using parameters or context objects.
- Tasks should produce outputs that are consumed by downstream tasks, either by returning values from the execute method or by storing outputs in a location accessible to other tasks (e.g., a file in a shared filesystem or a database table).
- Implement error handling within each task to handle failures gracefully. This may involve catching exceptions, retrying failed operations, logging errors, and possibly triggering alerts or notifications.
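The fragile implicit dependency versus the explicit alternative can be sketched as follows; the function bodies here are illustrative assumptions (the real `gen_zarr` does more than swap a suffix):

```python
from pathlib import Path

def gen_zarr_implicit(working_dir: Path, base: str) -> Path:
    # Reconstructs the input path from a naming convention. If an
    # upstream task renamed the file (e.g. to f"{base}__as_tiff.tiff"),
    # this raises FileNotFoundError at runtime.
    input_tiff = working_dir / f"{base}.tiff"
    if not input_tiff.exists():
        raise FileNotFoundError(input_tiff)
    return input_tiff.with_suffix(".zarr")

def gen_zarr_explicit(input_tiff: Path) -> Path:
    # Receives exactly the input it needs; an upstream rename only
    # requires passing the new path, not changing this task.
    if not input_tiff.exists():
        raise FileNotFoundError(input_tiff)
    return input_tiff.with_suffix(".zarr")
```

In the explicit version, the dependency on the upstream task's output is visible in the signature instead of being buried in a naming convention.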
One approach we are considering is creating an object for task I/O. Each task takes in a `TaskIO` object, which holds its task-specific inputs. It executes and returns a new `TaskIO` object with outputs that would be consumed by downstream tasks.

In order to enforce task-specific inputs and error handling, we came up with a decorator called `stream`, or taskio handler. It makes sure that if an upstream task has observed or created an unhandled exception, the exception is passed straight through the current task.
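A minimal sketch of the idea, assuming a simple `TaskIO` shape; the field names and the decorator's exact behavior here are assumptions, not Hedwig's actual API:

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class TaskIO:
    # Hypothetical fields: a single output and an optional recorded error.
    output_path: Optional[str] = None
    error: Optional[Exception] = None

def taskio_handler(func: Callable[[TaskIO], TaskIO]) -> Callable[[TaskIO], TaskIO]:
    """Pass upstream failures straight through; capture new ones."""
    def wrapper(taskio: TaskIO) -> TaskIO:
        if taskio.error is not None:
            # Upstream recorded an unhandled exception: skip this task
            # and propagate the failed TaskIO downstream untouched.
            return taskio
        try:
            return func(taskio)
        except Exception as exc:
            # Record the failure instead of crashing the workflow.
            return TaskIO(error=exc)
    return wrapper

@taskio_handler
def gen_zarr(taskio: TaskIO) -> TaskIO:
    # Illustrative body: produce a new TaskIO for downstream tasks.
    return TaskIO(output_path=taskio.output_path + ".zarr")
```

With this pattern, a failure anywhere in the chain flows through the remaining tasks as data rather than as an uncaught exception, and each task's inputs are confined to what the `TaskIO` carries.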