How to iterate over the output of a solid? #3211
Hi! I'm new to Dagster and stuck on what is probably an anti-pattern. I'm trying to write a simple DAG: copy all the tables from one database to another. My pipeline looks like this:

```python
from dagster import pipeline, solid, Output

from my_dags import resources, modes

SOURCE_SCHEMA = 'pantheon'
DEST_SCHEMA = 'raw_mysql_drupal'


@solid(required_resource_keys={'mysql_drupal'})
def fetch_mysql_tables(context):
    result = context.resources.mysql_drupal.query(
        f"SELECT table_name FROM information_schema.tables WHERE schema_name='{SOURCE_SCHEMA}'"
    )
    # Emit one Output per table name.
    for table in result.table_name:
        yield Output(table)


@solid(required_resource_keys={'mysql_drupal'})
def fetch_table_data(context, table):
    df = context.resources.mysql_drupal.query(
        f'SELECT * FROM {SOURCE_SCHEMA}.{table}'
    )
    return Output({'name': table, 'df': df})


@solid(required_resource_keys={'warehouse'})
def copy_table_to_warehouse(context, table_data):
    context.log.info(f'copying table "{table_data["name"]}" to warehouse')
    conn = context.resources.warehouse.connect()
    table_data['df'].to_sql(table_data['name'], conn, schema=DEST_SCHEMA)


@pipeline(mode_defs=[modes.DEFAULT])
def mysql_to_postgres_pipeline():
    for table in fetch_mysql_tables():
        copy_table_to_warehouse(fetch_table_data(table))
```

However, when I run this, I get the following error:
I'm confused here because it looks like I am yielding multiple Outputs, so I'm not sure what I'm missing. What I'd like to do is have the first solid generate a list of tables and have that list create a new branch of the DAG for each table. Should I be looking into solid factories?
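By "solid factory" I mean something like the rough sketch below (`make_copy_solid` and `STATIC_TABLE_LIST` are names I made up to illustrate the pattern; it reuses the imports and constants from the snippet above and only works if the table list is known at pipeline-definition time):

```python
# Hypothetical sketch of the solid-factory pattern: a plain function that
# stamps out one named solid per table when the pipeline is defined.
STATIC_TABLE_LIST = ['users', 'nodes']  # made-up, hard-coded table names


def make_copy_solid(table):
    @solid(name=f'copy_{table}', required_resource_keys={'mysql_drupal', 'warehouse'})
    def copy_solid(context):
        df = context.resources.mysql_drupal.query(f'SELECT * FROM {SOURCE_SCHEMA}.{table}')
        conn = context.resources.warehouse.connect()
        df.to_sql(table, conn, schema=DEST_SCHEMA)

    return copy_solid


@pipeline(mode_defs=[modes.DEFAULT])
def static_copy_pipeline():
    for table in STATIC_TABLE_LIST:
        make_copy_solid(table)()
```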
Replies: 1 comment 2 replies
We don't have support for mapping over results like this yet, but it is planned for an upcoming release. Here is the relevant issue: #462. @catherinewu can give more info here.

You could refactor this to do all the mapping in each solid. Here's one way to re-organize this logic:

```python
from dagster import pipeline, solid, Output

from my_dags import resources, modes

SOURCE_SCHEMA = 'pantheon'
DEST_SCHEMA = 'raw_mysql_drupal'


@solid(required_resource_keys={'mysql_drupal'})
def fetch_mysql_tables(context):
    result = context.resources.mysql_drupal.query(
        f"SELECT table_name FROM information_schema.tables WHERE schema_name='{SOURCE_SCHEMA}'"
    )
    # Emit the full list of table names as a single output.
    yield Output([table for table in result.table_name])


@solid(required_resource_keys={'mysql_drupal'})
def fetch_table_datas(context, table_names):
    table_datas = []
    for table in table_names:
        df = context.resources.mysql_drupal.query(
            f'SELECT * FROM {SOURCE_SCHEMA}.{table}'
        )
        table_datas.append({'name': table, 'df': df})
    return Output(table_datas)


@solid(required_resource_keys={'warehouse'})
def copy_tables_to_warehouse(context, table_datas):
    for table_data in table_datas:
        context.log.info(f'copying table "{table_data["name"]}" to warehouse')
        conn = context.resources.warehouse.connect()
        table_data['df'].to_sql(table_data['name'], conn, schema=DEST_SCHEMA)


@pipeline(mode_defs=[modes.DEFAULT])
def mysql_to_postgres_pipeline():
    table_names = fetch_mysql_tables()
    table_datas = fetch_table_datas(table_names)
    copy_tables_to_warehouse(table_datas)
```
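To try this out, you could run the pipeline in-process with something like the following (a minimal sketch, assuming `modes.DEFAULT` supplies working `mysql_drupal` and `warehouse` resources):

```python
from dagster import execute_pipeline

# Executes the three solids in dependency order in a single process;
# resource instances come from the pipeline's mode definition.
result = execute_pipeline(mysql_to_postgres_pipeline)
assert result.success
```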