complexe uc #2

Open. Wants to merge 2 commits into base: main.

Changes from all commits
18 changes: 1 addition & 17 deletions __main__.py
@@ -7,20 +7,4 @@


 if __name__ == "__main__":
-    result = defs.get_job_def(job_name).execute_in_process(
-        run_config=RunConfig(
-            ops={
-                "catfacts": ApiConfig(max_length=70, limit=20),
-            },
-            resources={
-                "io_manager": S3Config(
-                    s3_bucket="konpyutaika-product-catfacts-staging",
-                    s3_prefix="catfacts",
-                ),
-                "dataframe_io_manager": S3Config(
-                    s3_bucket="konpyutaika-product-catfacts-staging",
-                    s3_prefix="catfacts/parquet",
-                )
-            }
-        )
-    )
+    result = defs.get_job_def("redshift").execute_in_process()
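
With the staged run config removed, __main__.py now only exercises the redshift job. For ad-hoc runs of the partitioned gsheets job added in this PR, execute_in_process also accepts a partition key; a minimal sketch, assuming the job name from jobs.py and a partition name from raw_data_for_people_kpis.py below:

# Ad-hoc local run of a single gsheet partition. "gsheets" is the job name
# registered in jobs.py; the partition key is one of the configs in
# raw_data_for_people_kpis.py.
result = defs.get_job_def("gsheets").execute_in_process(
    partition_key="hiring_plan_metrics",
)
assert result.success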
13 changes: 8 additions & 5 deletions catfacts/__init__.py
@@ -1,12 +1,13 @@
 import os

-from dagster import Definitions
+from dagster import Definitions, ScheduleDefinition

-from .assets import cats_assets
-from .jobs import catfacts_job
+from .assets import cats_assets, gsheet_assets, redshift_assets
+from .jobs import catfacts_job, gsheet_job, redshift_job
 from .resources import RESOURCES_LOCAL, RESOURCES_STAGING, RESOURCES_PROD
+from .sensors.redshift import redshift_sensor

-all_assets = [*cats_assets]
+all_assets = [*cats_assets, *gsheet_assets, *redshift_assets]

 resources_by_deployment_name = {
     "prod": RESOURCES_PROD,
@@ -17,5 +18,7 @@
 defs = Definitions(
     assets=all_assets,
     resources=resources_by_deployment_name[os.getenv("ENV", "dev")],
-    jobs=[catfacts_job],
+    jobs=[catfacts_job, gsheet_job, redshift_job],
+    sensors=[redshift_sensor],
+    # schedules=[ScheduleDefinition(job=gsheet_job, cron_schedule="28 * * * *")]
 )
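
The sensor wired in above lives in catfacts/sensors/redshift.py, which this diff does not show. A plausible minimal shape, where only the module path and the redshift_sensor name are confirmed by the diff and the trigger asset key is an assumption:

# Hypothetical sketch of catfacts/sensors/redshift.py; the asset key is
# assumed from the "gsheet_table" key referenced in the redshift assets below.
from dagster import AssetKey, RunRequest, asset_sensor

from ..jobs import redshift_job


@asset_sensor(asset_key=AssetKey("gsheet_table"), job=redshift_job)
def redshift_sensor(context, asset_event):
    # Launch the redshift job each time the upstream gsheet table materializes.
    yield RunRequest(run_key=context.cursor)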
6 changes: 6 additions & 0 deletions catfacts/assets/__init__.py
@@ -1,7 +1,13 @@
 from dagster import load_assets_from_package_module

 from . import cats
+from . import gsheets
+from . import redshift

 CATS = "cats"
+GSHEETS = "gsheets"
+REDSHIFT = "redshift"

 cats_assets = load_assets_from_package_module(package_module=cats, group_name=CATS)
+gsheet_assets = load_assets_from_package_module(package_module=gsheets)
+redshift_assets = load_assets_from_package_module(package_module=redshift)
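
One thing worth flagging: gsheet_assets is loaded without a group_name, while jobs.py below selects AssetSelection.groups(GSHEETS). Unless the build_assets factory assigns a group itself, the loader call would need to match, roughly:

# Only needed if build_assets does not already set a group on its assets.
gsheet_assets = load_assets_from_package_module(package_module=gsheets, group_name=GSHEETS)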
Empty file.
29 changes: 29 additions & 0 deletions catfacts/assets/gsheets/raw_data_for_people_kpis.py
@@ -0,0 +1,29 @@
import time
from typing import Mapping

from ...libraries.dagster_gsheet.gsheet_resources import (
    SheetConfig,
)

from ...libraries.dagster_gsheet.assets_factory import build_assets

configs: Mapping[str, SheetConfig] = {
    "hiring_plan_metrics": SheetConfig(
        spreadsheet_id="1KN70AzGPZYtlFVb_EbLDn1qH3dij2rK0Rt0Jcei2CiI",
        sheet_name="hiring_plan_metrics",
        columns_range="A:D"
    ),
    "officevibe_data": SheetConfig(
        spreadsheet_id="1KN70AzGPZYtlFVb_EbLDn1qH3dij2rK0Rt0Jcei2CiI",
        sheet_name="officevibe_data",
        columns_range="A:D"
    )
}
assets = build_assets(
    parent_name="gsheet",
    configs=configs,
    object_io_manager_key="s3_pd_csv_io_manager",
    table_io_manager_key="s3_pd_csv_io_manager",
    table_input_manager_key="s3_path_input_loader",
    partitioned=True,
)
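
The dagster_gsheet library these assets come from is not part of this diff. A minimal sketch of the contract the call site implies, where the gsheet resource name and its read() signature are assumptions; the sketch also matches the "gsheet_table" asset key that the redshift assets below consume:

# Hypothetical sketch of build_assets from catfacts/libraries/dagster_gsheet;
# the real factory also wires the object/table IO manager keys passed above.
from typing import Mapping

from dagster import StaticPartitionsDefinition, asset


def build_assets(parent_name, configs: Mapping, partitioned=True, **io_manager_keys):
    # One static partition per configured sheet, keyed by config name.
    partitions = StaticPartitionsDefinition(list(configs)) if partitioned else None

    @asset(
        name=f"{parent_name}_table",
        partitions_def=partitions,
        required_resource_keys={"gsheet"},
    )
    def sheet_table(context):
        cfg = configs[context.partition_key]
        # Pull the configured range from the configured sheet via a gsheet
        # resource (resource name and read() signature are assumptions).
        return context.resources.gsheet.read(
            spreadsheet_id=cfg.spreadsheet_id,
            sheet_name=cfg.sheet_name,
            columns_range=cfg.columns_range,
        )

    return [sheet_table]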
Empty file.
113 changes: 113 additions & 0 deletions catfacts/assets/redshift/data_for_people_kpis.py
@@ -0,0 +1,113 @@
# from typing import Dict
#
# import pandas as pd
# from dagster import asset, Output, FreshnessPolicy, AssetIn, SpecificPartitionsPartitionMapping, OpExecutionContext
#
# from ...libraries.dagster_gsheet.configurable_query_resources import ConfigurableQueryResource
#
#
# @asset(
#     ins={
#         "upstream": AssetIn(
#             asset_key="gsheet_table",
#             partition_mapping=SpecificPartitionsPartitionMapping(
#                 partition_keys=["hiring_plan_metrics", "officevibe_data"]
#             )
#         ),
#     },
#     compute_kind="redshift",
#     required_resource_keys={"redshift_query", "redshift"},
#     freshness_policy=FreshnessPolicy(cron_schedule="2-59/5 * * * *", maximum_lag_minutes=5)
# )
# def data_for_people_kpis_5_min(
#     context: OpExecutionContext,
#     upstream: Dict[str, pd.DataFrame],
# ) -> Output[Dict[str, str]]:
#     for partition_key in context.asset_partition_keys_for_input("upstream"):
#         print(upstream[partition_key].to_dict())
#
#     target_table = context.assets_def.asset_key.path[0]
#     hiring_plan_metrics = upstream["hiring_plan_metrics"].to_dict(orient='index')[0]
#     officevibe_data = upstream["officevibe_data"].to_dict(orient='index')[0]
#
#     query = f"""
#     SET query_group TO fast_track;
#     SET statement_timeout to 86400000;
#
#     CREATE TABLE IF NOT EXISTS "{{schema_name}}"."{target_table}" (
#         total_filled_positions INT,
#         average_enps FLOAT,
#         updated_at TIMESTAMP
#     );
#     DELETE FROM "{{schema_name}}"."{target_table}" WHERE updated_at = '2022-01-01';
#     INSERT INTO "{{schema_name}}"."{target_table}" (updated_at, total_filled_positions, average_enps)
#     SELECT '2022-01-01' as updated_at, sum(filled_positions) as total_filled_positions, avg(enps) as average_enps
#     FROM "{hiring_plan_metrics['schema_name']}"."{hiring_plan_metrics['table_name']}", "{officevibe_data['schema_name']}"."{officevibe_data['table_name']}"
#     """
#
#     redshift_resource = context.resources.redshift
#     query_builder_resource: ConfigurableQueryResource = context.resources.redshift_query
#     redshift_resource.execute_query(query=query_builder_resource.build_query(query))
#
#     return Output(
#         value=dict(table=target_table, schema=query_builder_resource.schema_name),
#         metadata=dict(
#             schema=query_builder_resource.schema_name,
#             table=target_table
#         )
#     )
#
#
# @asset(
#     ins={
#         "upstream": AssetIn(
#             asset_key="gsheet_table",
#             partition_mapping=SpecificPartitionsPartitionMapping(
#                 partition_keys=["hiring_plan_metrics", "officevibe_data"]
#             )
#         ),
#     },
#     compute_kind="redshift",
#     required_resource_keys={"redshift_query", "redshift"},
#     freshness_policy=FreshnessPolicy(cron_schedule="30 * * * *", maximum_lag_minutes=60)
# )
# def data_for_people_kpis_hourly(
#     context: OpExecutionContext,
#     upstream: Dict[str, pd.DataFrame],
# ) -> Output[Dict[str, str]]:
#     for partition_key in context.asset_partition_keys_for_input("upstream"):
#         print(upstream[partition_key].to_dict())
#
#     target_table = context.assets_def.asset_key.path[0]
#     hiring_plan_metrics = upstream["hiring_plan_metrics"].to_dict(orient='index')[0]
#     officevibe_data = upstream["officevibe_data"].to_dict(orient='index')[0]
#
#     query = f"""
#     SET query_group TO fast_track;
#     SET statement_timeout to 86400000;
#
#     CREATE TABLE IF NOT EXISTS "{{schema_name}}"."{target_table}" (
#         total_filled_positions INT,
#         average_enps FLOAT,
#         updated_at TIMESTAMP
#     );
#     DELETE FROM "{{schema_name}}"."{target_table}" WHERE updated_at = '2022-01-01';
#     INSERT INTO "{{schema_name}}"."{target_table}" (updated_at, total_filled_positions, average_enps)
#     SELECT '2022-01-01' as updated_at, sum(filled_positions) as total_filled_positions, avg(enps) as average_enps
#     FROM "{hiring_plan_metrics['schema_name']}"."{hiring_plan_metrics['table_name']}", "{officevibe_data['schema_name']}"."{officevibe_data['table_name']}"
#     """
#
#     redshift_resource = context.resources.redshift
#     query_builder_resource: ConfigurableQueryResource = context.resources.redshift_query
#     redshift_resource.execute_query(query=query_builder_resource.build_query(query))
#
#     return Output(
#         value=dict(table=target_table, schema=query_builder_resource.schema_name),
#         metadata=dict(
#             schema=query_builder_resource.schema_name,
#             table=target_table
#         )
#     )
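
Both commented-out variants in this file hinge on SpecificPartitionsPartitionMapping, which fans a fixed set of upstream partitions into one unpartitioned downstream input, loaded as a dict keyed by partition key. A self-contained sketch of just that pattern, with toy payloads standing in for the real DataFrames:

# Stand-alone sketch of the partition-mapping pattern used above.
from typing import Dict

from dagster import (
    AssetIn,
    SpecificPartitionsPartitionMapping,
    StaticPartitionsDefinition,
    asset,
)

sheets = StaticPartitionsDefinition(["hiring_plan_metrics", "officevibe_data"])


@asset(partitions_def=sheets)
def gsheet_table(context) -> dict:
    # One small payload per partition.
    return {"sheet": context.partition_key}


@asset(
    ins={
        "upstream": AssetIn(
            "gsheet_table",
            partition_mapping=SpecificPartitionsPartitionMapping(
                ["hiring_plan_metrics", "officevibe_data"]
            ),
        )
    }
)
def combined(upstream: Dict[str, dict]) -> list:
    # The IO manager loads one dict entry per mapped partition key.
    return sorted(upstream)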
92 changes: 92 additions & 0 deletions catfacts/assets/redshift/data_for_people_kpis_no_partition.py
@@ -0,0 +1,92 @@
# from typing import Dict
#
# import pandas as pd
# from dagster import asset, Output, FreshnessPolicy, AssetIn, SpecificPartitionsPartitionMapping, OpExecutionContext
#
# from ...libraries.dagster_gsheet.configurable_query_resources import ConfigurableQueryResource
#
#
# @asset(
#     compute_kind="redshift",
#     required_resource_keys={"redshift_query", "redshift"},
#     freshness_policy=FreshnessPolicy(cron_schedule="2-59/5 * * * *", maximum_lag_minutes=5)
# )
# def data_for_people_kpis_5_min(
#     context: OpExecutionContext,
#     hiring_plan_metrics: Dict[str, pd.DataFrame],
#     officevibe_data: Dict[str, pd.DataFrame],
# ) -> Output[Dict[str, str]]:
#     target_table = context.assets_def.asset_key.path[0]
#     hiring_plan_metrics = hiring_plan_metrics[next(iter(hiring_plan_metrics))].to_dict(orient='index')[0]
#     officevibe_data = officevibe_data[next(iter(officevibe_data))].to_dict(orient='index')[0]
#
#     query = f"""
#     SET query_group TO fast_track;
#     SET statement_timeout to 86400000;
#
#     CREATE TABLE IF NOT EXISTS "{{schema_name}}"."{target_table}" (
#         total_filled_positions INT,
#         average_enps FLOAT,
#         updated_at TIMESTAMP
#     );
#     DELETE FROM "{{schema_name}}"."{target_table}" WHERE updated_at = '2022-01-01';
#     INSERT INTO "{{schema_name}}"."{target_table}" (updated_at, total_filled_positions, average_enps)
#     SELECT '2022-01-01' as updated_at, sum(filled_positions) as total_filled_positions, avg(enps) as average_enps
#     FROM "{hiring_plan_metrics['schema_name']}"."{hiring_plan_metrics['table_name']}", "{officevibe_data['schema_name']}"."{officevibe_data['table_name']}"
#     """
#
#     redshift_resource = context.resources.redshift
#     query_builder_resource: ConfigurableQueryResource = context.resources.redshift_query
#     redshift_resource.execute_query(query=query_builder_resource.build_query(query))
#
#     return Output(
#         value=dict(table=target_table, schema=query_builder_resource.schema_name),
#         metadata=dict(
#             schema=query_builder_resource.schema_name,
#             table=target_table
#         )
#     )
#
#
# @asset(
#     compute_kind="redshift",
#     required_resource_keys={"redshift_query", "redshift"},
#     freshness_policy=FreshnessPolicy(cron_schedule="30 * * * *", maximum_lag_minutes=60)
# )
# def data_for_people_kpis_hourly(
#     context: OpExecutionContext,
#     hiring_plan_metrics: Dict[str, pd.DataFrame],
#     officevibe_data: Dict[str, pd.DataFrame],
# ) -> Output[Dict[str, str]]:
#     target_table = context.assets_def.asset_key.path[0]
#     hiring_plan_metrics = hiring_plan_metrics[next(iter(hiring_plan_metrics))].to_dict(orient='index')[0]
#     officevibe_data = officevibe_data[next(iter(officevibe_data))].to_dict(orient='index')[0]
#
#     query = f"""
#     SET query_group TO fast_track;
#     SET statement_timeout to 86400000;
#
#     CREATE TABLE IF NOT EXISTS "{{schema_name}}"."{target_table}" (
#         total_filled_positions INT,
#         average_enps FLOAT,
#         updated_at TIMESTAMP
#     );
#     DELETE FROM "{{schema_name}}"."{target_table}" WHERE updated_at = '2022-01-01';
#     INSERT INTO "{{schema_name}}"."{target_table}" (updated_at, total_filled_positions, average_enps)
#     SELECT '2022-01-01' as updated_at, sum(filled_positions) as total_filled_positions, avg(enps) as average_enps
#     FROM "{hiring_plan_metrics['schema_name']}"."{hiring_plan_metrics['table_name']}", "{officevibe_data['schema_name']}"."{officevibe_data['table_name']}"
#     """
#
#     redshift_resource = context.resources.redshift
#     query_builder_resource: ConfigurableQueryResource = context.resources.redshift_query
#     redshift_resource.execute_query(query=query_builder_resource.build_query(query))
#
#     return Output(
#         value=dict(table=target_table, schema=query_builder_resource.schema_name),
#         metadata=dict(
#             schema=query_builder_resource.schema_name,
#             table=target_table
#         )
#     )
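
ConfigurableQueryResource is not in this diff either; the doubled {{schema_name}} braces in the f-strings above imply that build_query fills a schema placeholder after the fact, roughly:

# Hypothetical sketch of ConfigurableQueryResource, inferred from the escaped
# {{schema_name}} placeholders and the .schema_name attribute used above.
from dagster import ConfigurableResource


class ConfigurableQueryResource(ConfigurableResource):
    schema_name: str

    def build_query(self, query: str) -> str:
        # Resolve the {schema_name} placeholder the asset's f-string left behind.
        return query.format(schema_name=self.schema_name)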
16 changes: 13 additions & 3 deletions catfacts/jobs.py
@@ -1,6 +1,6 @@
-from dagster import AssetSelection, define_asset_job
+from dagster import AssetSelection, define_asset_job, RunConfig

-from .assets import CATS
+from .assets import CATS, GSHEETS, REDSHIFT


 job_name = "catfacts_job"
@@ -10,4 +10,14 @@
     tags={
         "job": job_name
     },
-)
+)
+
+gsheet_job = define_asset_job(
+    name="gsheets",
+    selection=AssetSelection.groups(GSHEETS),
+)
+
+redshift_job = define_asset_job(
+    name="redshift",
+    selection=AssetSelection.groups(REDSHIFT),
+)
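
The schedule commented out in catfacts/__init__.py would attach to the gsheet_job defined here; enabling it is a one-liner using the cron string from that comment:

from dagster import ScheduleDefinition

# Materialize the gsheets group at minute 28 of every hour; pass
# schedules=[gsheet_schedule] to Definitions in place of the commented line.
gsheet_schedule = ScheduleDefinition(job=gsheet_job, cron_schedule="28 * * * *")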
Empty file added catfacts/libraries/__init__.py
Empty file.
Empty file.