Executing solids only once per partition #4287
Asking here first to see whether I'm missing something. It's quite common for ETL frameworks to execute any unit of computation only once per partition. I've assumed ... Given this code, I run

    from typing import List

    from dagster import Partition, PartitionSetDefinition, pipeline, repository, solid


    @solid
    def fx_combined(context):
        # let's assume this runs for 2 hours
        context.log.info('IN COMBINED...')
        return [{'a': 2}]


    @solid
    def fx(context, combined_res):
        # let's assume this runs for 1 minute
        from sources.fx import FxRate
        context.log.info(f'Combined res: {combined_res}')
        return [FxRate('2021-06-11', 'EUR', 'EUR', 1.0)]


    @pipeline
    def fx_pipeline():
        fx(fx_combined())


    def partition_fn() -> List[Partition]:
        return [Partition("2021-06-11")]


    @repository
    def main():
        trading_days = PartitionSetDefinition("trading_days", fx_pipeline.__name__, partition_fn)
        schedule = trading_days.create_schedule_definition(
            "fx_update",
            "0 17 * * Mon-Fri",
            identity_partition_selector,
            execution_timezone="CET",
        )
        return [fx_pipeline, trading_days, schedule]

... where I go to ...

Every time I'm launching in ..., I get the whole pipeline recomputed, which is not exactly what I would like to happen. The typical case would be to trigger only ...

Is this already possible or thought about, or does some custom code have to be written for it? I'm also writing a custom IOManager to persist data in systems readable outside of Dagster, so perhaps it could be done there?
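To make the "only once per partition" behaviour concrete, here is a minimal sketch in plain Python. It deliberately does not use the Dagster API: `PartitionedStore` and `run_once` are hypothetical names invented for illustration, and the JSON-file layout is an assumption. It only shows the general idea of checking a persistent store, keyed by step name and partition, before recomputing.

```python
import json
import tempfile
from pathlib import Path


class PartitionedStore:
    """Hypothetical file-backed store keyed by (step name, partition)."""

    def __init__(self, root):
        self.root = Path(root)

    def _path(self, step, partition):
        return self.root / partition / f"{step}.json"

    def has_output(self, step, partition):
        return self._path(step, partition).exists()

    def load(self, step, partition):
        return json.loads(self._path(step, partition).read_text())

    def save(self, step, partition, value):
        path = self._path(step, partition)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(value))


def run_once(store, step, partition, compute):
    """Call `compute` only if no result is stored for this step and partition."""
    if store.has_output(step, partition):
        return store.load(step, partition)
    value = compute()
    store.save(step, partition, value)
    return value


calls = []


def fx_combined():
    calls.append("fx_combined")  # stands in for the 2-hour computation
    return [{"a": 2}]


store = PartitionedStore(tempfile.mkdtemp())
first = run_once(store, "fx_combined", "2021-06-11", fx_combined)
second = run_once(store, "fx_combined", "2021-06-11", fx_combined)  # served from the store
```

In this sketch the second call for partition `2021-06-11` never invokes `fx_combined`; a different partition key would trigger a fresh computation. Whether a custom IOManager is the right place for such a check in Dagster is exactly the open question here.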
Replies: 1 comment 1 reply
How do you expect `fx` only to be triggered? Are you looking for a manual selection, or do you expect the system to decide not to run `fx_combined` automatically? If automatically, how?

https://docs.dagster.io/guides/dagster/memoization#computing-versions may be relevant.
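For the "automatically, how?" part, one common answer is version-based memoization, which is the topic of the linked guide. The sketch below is plain Python and is not Dagster's implementation: `compute_version`, `plan_steps`, and the `"v1"`/`"v2"` code-version strings are hypothetical. It assumes each step has a code version, that a step's version is a hash of its code version, the partition, and its upstream versions, and that a step is re-run only when its version differs from the one stored for that partition.

```python
import hashlib


def compute_version(code_version, partition, upstream_versions=()):
    """Hash the step's code version, the partition, and all upstream versions."""
    payload = "|".join([code_version, partition, *upstream_versions])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def plan_steps(steps, partition, stored_versions):
    """Decide which steps must run for one partition.

    steps: list of (name, code_version, upstream_names) in topological order.
    stored_versions: {step name: version} already materialized for this partition.
    Returns (names of steps to run, {step name: current version}).
    """
    versions = {}
    to_run = []
    for name, code_version, upstream in steps:
        versions[name] = compute_version(
            code_version, partition, [versions[u] for u in upstream]
        )
        if stored_versions.get(name) != versions[name]:
            to_run.append(name)
    return to_run, versions


steps = [("fx_combined", "v1", []), ("fx", "v1", ["fx_combined"])]

# Nothing stored yet: both steps have to run.
to_run, versions = plan_steps(steps, "2021-06-11", {})

# Same code, same partition, outputs stored: nothing to run.
stored = dict(versions)
rerun_none, _ = plan_steps(steps, "2021-06-11", stored)

# Only the code of `fx` changed: `fx_combined` is skipped.
changed = [("fx_combined", "v1", []), ("fx", "v2", ["fx_combined"])]
rerun_fx, _ = plan_steps(changed, "2021-06-11", stored)
```

Because upstream versions feed into downstream hashes, changing `fx_combined` would also invalidate `fx`, while changing only `fx` leaves the expensive upstream step untouched.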