From e15220652c96f8311d575e2810b4564fad502e78 Mon Sep 17 00:00:00 2001
From: himanshudube97
Date: Fri, 13 Dec 2024 11:30:38 +0530
Subject: [PATCH] code to create a new dbt cloud task

---
 ddpui/api/orgtask_api.py | 19 ++++++++++++++-----
 ddpui/api/pipeline_api.py | 8 ++++++--
 ddpui/core/orgtaskfunctions.py | 19 ++++++++++++-------
 ddpui/core/pipelinefunctions.py | 23 +++++++++++++++++++++--
 ddpui/ddpdbt/dbt_service.py | 2 ++
 ddpui/ddpdbt/schema.py | 19 ++++++++++++++++---
 ddpui/ddpprefect/__init__.py | 1 +
 ddpui/ddpprefect/schema.py | 23 +++++++++++++++++++++++
 ddpui/utils/constants.py | 6 ++++--
 seed/tasks.json | 11 +++++++++++
 10 files changed, 110 insertions(+), 21 deletions(-)

diff --git a/ddpui/api/orgtask_api.py b/ddpui/api/orgtask_api.py
index 8e5c06a3..64c6718d 100644
--- a/ddpui/api/orgtask_api.py
+++ b/ddpui/api/orgtask_api.py
@@ -31,7 +31,7 @@ from ddpui.ddpprefect.schema import (
     PrefectSecretBlockCreate,
 )
 
-from ddpui.ddpdbt.schema import DbtProjectParams
+from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams
 from ddpui.schemas.org_task_schema import CreateOrgTaskPayload, TaskParameters
 from ddpui.core.orgdbt_manager import DbtProjectManager
 
@@ -90,14 +90,15 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
     orgtask = OrgTask.objects.create(
         org=orguser.org,
         task=task,
-        parameters=parameters,
+        parameters=parameters,  # here the accountId, jobId and apiKey will be received for dbt-cloud
         generated_by="client",
         uuid=uuid.uuid4(),
     )
 
     dataflow = None
-    if task.slug in LONG_RUNNING_TASKS:
-        dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
+    # For dbt-cli
+    if task.slug in LONG_RUNNING_TASKS and task.type == "dbt":
+        dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params(
             orguser.org, orgdbt
         )
 
@@ -113,6 +114,14 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
             orgtask, cli_profile_block, dbt_project_params
         )
 
+    # For dbt-cloud
+    if task.slug in LONG_RUNNING_TASKS and task.type == "dbtcloud":
+        dbt_cloud_params: DbtCloudParams = parameters["options"]
+        dataflow = create_prefect_deployment_for_dbtcore_task(
+            orgtask, dbt_cloud_params  # this will contain accountId, apikey, and jobId
+        )
+        # if the task is dbtcloud-run then create another dataflow: create prefect deployment for the dbtcloud task (112, 114 but for cloud)
+
     return {
         **model_to_dict(orgtask, fields=["parameters"]),
         "task_slug": orgtask.task.slug,
@@ -326,7 +335,7 @@ def post_run_prefect_org_task(
     if orgdbt is None:
         raise HttpError(400, "dbt is not configured for this client")
 
-    dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
+    dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params(
         orguser.org, orgdbt
     )
 
diff --git a/ddpui/api/pipeline_api.py b/ddpui/api/pipeline_api.py
index 45b8e346..53b7836e 100644
--- a/ddpui/api/pipeline_api.py
+++ b/ddpui/api/pipeline_api.py
@@ -102,10 +102,14 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
     dbt_project_params: DbtProjectParams = None
     dbt_git_orgtasks = []
     orgdbt = orguser.org.dbt
-    if payload.transformTasks and len(payload.transformTasks) > 0:
+    if (
+        payload.transformTasks and len(payload.transformTasks) > 0
+    ):  # dont modify this block as it's for local
         logger.info("Dbt tasks being pushed to the pipeline")
 
+        # for dbt cloud we dont check the dbtcliblock (add if condition to check it)
         # dbt params
+
         dbt_project_params = DbtProjectManager.gather_dbt_project_params(orguser.org, orgdbt)
 
         # dbt cli profile block
@@ -137,7 +141,7 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
         if error:
             raise HttpError(400, error)
         tasks += task_configs
-
+        # new if payload.cloud transform task, get taskconfig, dbt-run
     map_org_tasks += dbt_git_orgtasks
 
     # create deployment
diff --git a/ddpui/core/orgtaskfunctions.py b/ddpui/core/orgtaskfunctions.py
index 3da9288e..19e9124e 100644
--- a/ddpui/core/orgtaskfunctions.py
+++ b/ddpui/core/orgtaskfunctions.py
@@ -20,9 +20,9 @@
     PrefectDataFlowCreateSchema3,
 )
 from ddpui.ddpprefect import MANUL_DBT_WORK_QUEUE
-from ddpui.ddpdbt.schema import DbtProjectParams
+from ddpui.ddpdbt.schema import DbtCliParams, DbtProjectParams
 from ddpui.ddpprefect import prefect_service
-from ddpui.core.pipelinefunctions import setup_dbt_core_task_config
+from ddpui.core.pipelinefunctions import setup_dbt_core_task_config, setup_dbt_cloud_task_config
 from ddpui.utils.constants import TASK_DBTRUN, TASK_GENERATE_EDR
 from ddpui.utils.helpers import generate_hash_id
 
@@ -117,6 +117,15 @@ def create_prefect_deployment_for_dbtcore_task(
     """
     hash_code = generate_hash_id(8)
     deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}"
+
+    tasks = []
+    if org_task.task.type == "dbt":
+        tasks = [
+            setup_dbt_core_task_config(org_task, cli_profile_block, dbt_project_params).to_json()
+        ]
+    elif org_task.task.type == "dbtcloud":
+        tasks = [setup_dbt_cloud_task_config(org_task, dbt_project_params).to_json()]
+
     dataflow = prefect_service.create_dataflow_v1(
         PrefectDataFlowCreateSchema3(
             deployment_name=deployment_name,
@@ -124,11 +133,7 @@
             orgslug=org_task.org.slug,
             deployment_params={
                 "config": {
-                    "tasks": [
-                        setup_dbt_core_task_config(
-                            org_task, cli_profile_block, dbt_project_params
-                        ).to_json()
-                    ],
+                    "tasks": tasks,
                     "org_slug": org_task.org.slug,
                 }
             },
diff --git a/ddpui/core/pipelinefunctions.py b/ddpui/core/pipelinefunctions.py
index 0bd95495..99ac6bd6 100644
--- a/ddpui/core/pipelinefunctions.py
+++ b/ddpui/core/pipelinefunctions.py
@@ -15,6 +15,7 @@ from ddpui.utils.custom_logger import CustomLogger
 from ddpui.ddpprefect.schema import (
     PrefectDbtTaskSetup,
+    PrefectDbtCloudTaskSetup,
     PrefectShellTaskSetup,
     PrefectAirbyteSyncTaskSetup,
     PrefectAirbyteRefreshSchemaTaskSetup,
@@ -22,6 +23,7 @@
 )
 from ddpui.ddpprefect import (
     AIRBYTECONNECTION,
+    DBTCLOUD,
     DBTCORE,
     SECRET,
     SHELLOPERATION,
@@ -39,7 +41,7 @@
     UPDATE_SCHEMA,
     TRANSFORM_TASKS_SEQ,
 )
-from ddpui.ddpdbt.schema import DbtProjectParams
+from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams, DbtProjectParams
 
 logger = CustomLogger("ddpui")
 
@@ -83,7 +85,7 @@ def setup_airbyte_update_schema_task_config(
 def setup_dbt_core_task_config(
     org_task: OrgTask,
     cli_profile_block: OrgPrefectBlockv1,
-    dbt_project_params: DbtProjectParams,
+    dbt_project_params: DbtCliParams,
     seq: int = 1,
 ):
     """constructs the prefect payload for a dbt job"""
@@ -102,6 +104,23 @@ def setup_dbt_core_task_config(
     )
 
 
+def setup_dbt_cloud_task_config(
+    org_task: OrgTask,
+    dbt_project_params: DbtCloudParams,
+    seq: int = 1,
+):
+    """constructs the prefect payload for a dbt-cloud job"""
+    return PrefectDbtCloudTaskSetup(
+        seq=seq,
+        slug=org_task.task.slug,
+        type=DBTCLOUD,
+        api_key=dbt_project_params.api_key,
+        account_id=dbt_project_params.account_id,
+        job_id=dbt_project_params.job_id,
+        orgtask_uuid=str(org_task.uuid),
+    )
+
+
 def setup_git_pull_shell_task_config(
     org_task: OrgTask,
     project_dir: str,
diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py
index b14d5b32..ad372b11 100644
--- a/ddpui/ddpdbt/dbt_service.py
+++ b/ddpui/ddpdbt/dbt_service.py
@@ -30,6 +30,7 @@
     TASK_DBTRUN,
     TASK_DBTSEED,
     TASK_DBTDEPS,
+    TASK_DBTCLOUDRUN,
 )
 from ddpui.core.orgdbt_manager import DbtProjectManager
 from ddpui.utils.timezone import as_ist
@@ -107,6 +108,7 @@ def task_config_params(task: Task):
         TASK_DBTTEST: {"flags": [], "options": ["select", "exclude"]},
         TASK_DBTSEED: {"flags": [], "options": ["select"]},
         TASK_DOCSGENERATE: {"flags": [], "options": []},
+        TASK_DBTCLOUDRUN: {"flags": [], "options": ["account_id", "api_key", "job_id"]},
     }
 
     return TASK_CONIF_PARAM[task.slug] if task.slug in TASK_CONIF_PARAM else None
diff --git a/ddpui/ddpdbt/schema.py b/ddpui/ddpdbt/schema.py
index aea1086d..efa8e901 100644
--- a/ddpui/ddpdbt/schema.py
+++ b/ddpui/ddpdbt/schema.py
@@ -1,11 +1,11 @@
-from typing import Optional, Union
+from typing import Literal, Optional, Union
 from ninja import Schema
 from pathlib import Path
 
 
-class DbtProjectParams(Schema):
+class DbtCliParams(Schema):
     """
-    schema to define all parameters required to run a dbt project
+    Schema to define all parameters required to run a dbt project using CLI.
     """
 
     dbt_env_dir: Union[str, Path]
@@ -14,3 +14,16 @@
     target: str
     venv_binary: Union[str, Path]
     dbt_binary: Union[str, Path]
+
+
+class DbtCloudParams(Schema):
+    """
+    Schema to define all parameters required to run a dbt project using dbt Cloud.
+    """
+
+    api_key: str
+    account_id: int
+    job_id: int
+
+
+DbtProjectParams = Union[DbtCliParams, DbtCloudParams]
diff --git a/ddpui/ddpprefect/__init__.py b/ddpui/ddpprefect/__init__.py
index 86e0834e..ad14ba4b 100644
--- a/ddpui/ddpprefect/__init__.py
+++ b/ddpui/ddpprefect/__init__.py
@@ -3,6 +3,7 @@
 AIRBYTECONNECTION = "Airbyte Connection"
 SHELLOPERATION = "Shell Operation"
 DBTCORE = "dbt Core Operation"
+DBTCLOUD = "dbt Cloud Operation"
 DBTCLIPROFILE = "dbt CLI Profile"
 SECRET = "Secret"
 
diff --git a/ddpui/ddpprefect/schema.py b/ddpui/ddpprefect/schema.py
index ecacd5a5..d18ee8af 100644
--- a/ddpui/ddpprefect/schema.py
+++ b/ddpui/ddpprefect/schema.py
@@ -160,6 +160,29 @@ def to_json(self):
         }
 
 
+class PrefectDbtCloudTaskSetup(Schema):
+    "request payload to trigger a dbt cloud run task in prefect"
+    seq: int = 0
+    slug: str
+    type: str
+    api_key: str
+    account_id: str
+    job_id: str
+    orgtask_uuid: str
+
+    def to_json(self):
+        """JSON serialization"""
+        return {
+            "seq": self.seq,
+            "slug": self.slug,
+            "type": self.type,
+            "api_key": self.api_key,
+            "account_id": self.account_id,
+            "job_id": self.job_id,
+            "orgtask_uuid": self.orgtask_uuid,
+        }
+
+
 class DbtProfile(Schema):
     """Docstring"""
 
diff --git a/ddpui/utils/constants.py b/ddpui/utils/constants.py
index 9c2fbfef..3538beea 100644
--- a/ddpui/utils/constants.py
+++ b/ddpui/utils/constants.py
@@ -3,7 +3,8 @@
 TASK_DBTTEST = "dbt-test"
 TASK_DBTCLEAN = "dbt-clean"
 TASK_DBTDEPS = "dbt-deps"
-TASK_GITPULL = "git-pull"
+TASK_GITPULL = "git-pull"
+TASK_DBTCLOUDRUN = "dbtcloud-run"  # this is task slug so it should match the seed data.
 TASK_DOCSGENERATE = "dbt-docs-generate"
 TASK_AIRBYTESYNC = "airbyte-sync"
 TASK_AIRBYTERESET = "airbyte-reset"
@@ -30,12 +31,13 @@
     TASK_DBTDEPS,
     TASK_DBTRUN,
     TASK_DBTTEST,
+    TASK_DBTCLOUDRUN,
 ]
 
 # These are tasks to be run via deployment
 # Adding a new task here will work for any new orgtask created
 # But for the current ones a script would need to be run to set them with a deployment
-LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST]
+LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST, TASK_DBTCLOUDRUN]
 
 # airbyte sync timeout in deployment params
 AIRBYTE_SYNC_TIMEOUT = 15
diff --git a/seed/tasks.json b/seed/tasks.json
index 134c45c0..ba3826a9 100644
--- a/seed/tasks.json
+++ b/seed/tasks.json
@@ -119,5 +119,16 @@
             "command": null,
             "is_system": false
         }
+    },
+    {
+        "model": "ddpui.Task",
+        "pk": 11,
+        "fields": {
+            "type": "dbtcloud",
+            "slug": "dbtcloud-run",
+            "label": "DBT Cloud run",
+            "command": "cloud-run",
+            "is_system": true
+        }
     }
 ]
\ No newline at end of file