Skip to content

Commit

Permalink
code to create a new dbt cloud task
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshudube97 committed Dec 13, 2024
1 parent 3ed6cd1 commit e152206
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 21 deletions.
19 changes: 14 additions & 5 deletions ddpui/api/orgtask_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ddpui.ddpprefect.schema import (
PrefectSecretBlockCreate,
)
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams
from ddpui.schemas.org_task_schema import CreateOrgTaskPayload, TaskParameters

from ddpui.core.orgdbt_manager import DbtProjectManager
Expand Down Expand Up @@ -90,14 +90,15 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
orgtask = OrgTask.objects.create(
org=orguser.org,
task=task,
parameters=parameters,
parameters=parameters, # here the accountId, jobId and apiKey will be recieved for dbt-cloud
generated_by="client",
uuid=uuid.uuid4(),
)

dataflow = None
if task.slug in LONG_RUNNING_TASKS:
dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
# For dbt-cli
if task.slug in LONG_RUNNING_TASKS and task.type is "dbt":
dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params(
orguser.org, orgdbt
)

Expand All @@ -113,6 +114,14 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
orgtask, cli_profile_block, dbt_project_params
)

# For dbt-cloud
if task.slug in LONG_RUNNING_TASKS and task.type is "dbtcloud":
dbt_cloud_params: DbtCloudParams = parameters["options"]
dataflow = create_prefect_deployment_for_dbtcore_task(
orgtask, dbt_cloud_params # this will contain accountId, apikey, and jobId
)
# if the task id dbtcloud run then create another dataflow createprfectdeployfor dbtcloud task (112, 114 but for cloud)

return {
**model_to_dict(orgtask, fields=["parameters"]),
"task_slug": orgtask.task.slug,
Expand Down Expand Up @@ -326,7 +335,7 @@ def post_run_prefect_org_task(
if orgdbt is None:
raise HttpError(400, "dbt is not configured for this client")

dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params(
orguser.org, orgdbt
)

Expand Down
8 changes: 6 additions & 2 deletions ddpui/api/pipeline_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,14 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
dbt_project_params: DbtProjectParams = None
dbt_git_orgtasks = []
orgdbt = orguser.org.dbt
if payload.transformTasks and len(payload.transformTasks) > 0:
if (
payload.transformTasks and len(payload.transformTasks) > 0
): # dont modify this block as its of rlocal
logger.info("Dbt tasks being pushed to the pipeline")

# for dbt cloud we dont check the dbcliblcok (add if condition to check it)
# dbt params

dbt_project_params = DbtProjectManager.gather_dbt_project_params(orguser.org, orgdbt)

# dbt cli profile block
Expand Down Expand Up @@ -137,7 +141,7 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
if error:
raise HttpError(400, error)
tasks += task_configs

# new if payload.cloud transorm task, get taskconfig, dbt-run
map_org_tasks += dbt_git_orgtasks

# create deployment
Expand Down
19 changes: 12 additions & 7 deletions ddpui/core/orgtaskfunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
PrefectDataFlowCreateSchema3,
)
from ddpui.ddpprefect import MANUL_DBT_WORK_QUEUE
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.ddpdbt.schema import DbtCliParams, DbtProjectParams
from ddpui.ddpprefect import prefect_service
from ddpui.core.pipelinefunctions import setup_dbt_core_task_config
from ddpui.core.pipelinefunctions import setup_dbt_core_task_config, setup_dbt_cloud_task_config
from ddpui.utils.constants import TASK_DBTRUN, TASK_GENERATE_EDR
from ddpui.utils.helpers import generate_hash_id

Expand Down Expand Up @@ -117,18 +117,23 @@ def create_prefect_deployment_for_dbtcore_task(
"""
hash_code = generate_hash_id(8)
deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}"

tasks = []
if org_task.task.type == "dbt":
tasks = [
setup_dbt_core_task_config(org_task, cli_profile_block, dbt_project_params).to_json()
]
elif org_task.task.type == "dbtcloud":
tasks = [setup_dbt_cloud_task_config(org_task, dbt_project_params).to_json()]

dataflow = prefect_service.create_dataflow_v1(
PrefectDataFlowCreateSchema3(
deployment_name=deployment_name,
flow_name=deployment_name,
orgslug=org_task.org.slug,
deployment_params={
"config": {
"tasks": [
setup_dbt_core_task_config(
org_task, cli_profile_block, dbt_project_params
).to_json()
],
"tasks": tasks,
"org_slug": org_task.org.slug,
}
},
Expand Down
23 changes: 21 additions & 2 deletions ddpui/core/pipelinefunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
from ddpui.utils.custom_logger import CustomLogger
from ddpui.ddpprefect.schema import (
PrefectDbtTaskSetup,
PrefectDbtCloudTaskSetup,
PrefectShellTaskSetup,
PrefectAirbyteSyncTaskSetup,
PrefectAirbyteRefreshSchemaTaskSetup,
PrefectDataFlowUpdateSchema3,
)
from ddpui.ddpprefect import (
AIRBYTECONNECTION,
DBTCLOUD,
DBTCORE,
SECRET,
SHELLOPERATION,
Expand All @@ -39,7 +41,7 @@
UPDATE_SCHEMA,
TRANSFORM_TASKS_SEQ,
)
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams, DbtProjectParams

logger = CustomLogger("ddpui")

Expand Down Expand Up @@ -83,7 +85,7 @@ def setup_airbyte_update_schema_task_config(
def setup_dbt_core_task_config(
org_task: OrgTask,
cli_profile_block: OrgPrefectBlockv1,
dbt_project_params: DbtProjectParams,
dbt_project_params: DbtCliParams,
seq: int = 1,
):
"""constructs the prefect payload for a dbt job"""
Expand All @@ -102,6 +104,23 @@ def setup_dbt_core_task_config(
)


def setup_dbt_cloud_task_config(
org_task: OrgTask,
dbt_project_params: DbtCloudParams,
seq: int = 1,
):
"""constructs the prefect payload for a dbt-cloud job"""
return PrefectDbtCloudTaskSetup(
seq=seq,
slug=org_task.task.slug,
type=DBTCLOUD,
api_key=dbt_project_params.api_key,
account_id=dbt_project_params.account_id,
job_id=dbt_project_params.job_id,
orgtask_uuid=str(org_task.uuid),
)


def setup_git_pull_shell_task_config(
org_task: OrgTask,
project_dir: str,
Expand Down
2 changes: 2 additions & 0 deletions ddpui/ddpdbt/dbt_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
TASK_DBTRUN,
TASK_DBTSEED,
TASK_DBTDEPS,
TASK_DBTCLOUDRUN,
)
from ddpui.core.orgdbt_manager import DbtProjectManager
from ddpui.utils.timezone import as_ist
Expand Down Expand Up @@ -107,6 +108,7 @@ def task_config_params(task: Task):
TASK_DBTTEST: {"flags": [], "options": ["select", "exclude"]},
TASK_DBTSEED: {"flags": [], "options": ["select"]},
TASK_DOCSGENERATE: {"flags": [], "options": []},
TASK_DBTCLOUDRUN: {"flags": [], "options": ["account_id", "api-key", "job-id"]},
}

return TASK_CONIF_PARAM[task.slug] if task.slug in TASK_CONIF_PARAM else None
Expand Down
19 changes: 16 additions & 3 deletions ddpui/ddpdbt/schema.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Optional, Union
from typing import Literal, Optional, Union
from ninja import Schema
from pathlib import Path


class DbtProjectParams(Schema):
class DbtCliParams(Schema):
"""
schema to define all parameters required to run a dbt project
Schema to define all parameters required to run a dbt project using CLI.
"""

dbt_env_dir: Union[str, Path]
Expand All @@ -14,3 +14,16 @@ class DbtProjectParams(Schema):
target: str
venv_binary: Union[str, Path]
dbt_binary: Union[str, Path]


class DbtCloudParams(Schema):
"""
Schema to define all parameters required to run a dbt project using dbt Cloud.
"""

api_key: str
account_id: int
job_id: int


DbtProjectParams = Union[DbtCliParams, DbtCloudParams]
1 change: 1 addition & 0 deletions ddpui/ddpprefect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
AIRBYTECONNECTION = "Airbyte Connection"
SHELLOPERATION = "Shell Operation"
DBTCORE = "dbt Core Operation"
DBTCLOUD = "dbt Cloud Operation"
DBTCLIPROFILE = "dbt CLI Profile"
SECRET = "Secret"

Expand Down
23 changes: 23 additions & 0 deletions ddpui/ddpprefect/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,29 @@ def to_json(self):
}


class PrefectDbtCloudTaskSetup(Schema):
"request payload to trigger a dbt cloud run task in prefect"
seq = int = (0,)
slug = (str,)
type = (str,)
api_key = (str,)
account_id = (str,)
job_id = (str,)
orgtask_uuid = (str,)

def to_json(self):
"""JSON serialization"""
return {
"seq": self.seq,
"slug": self.slug,
"type": self.type,
"api_key": self.api_key,
"account_id": self.account_id,
"job_id": self.job_id,
"orgtask_uuid": self.orgtask_uuid,
}


class DbtProfile(Schema):
"""Docstring"""

Expand Down
6 changes: 4 additions & 2 deletions ddpui/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
TASK_DBTTEST = "dbt-test"
TASK_DBTCLEAN = "dbt-clean"
TASK_DBTDEPS = "dbt-deps"
TASK_GITPULL = "git-pull"
TASK_GITPULL = ("git-pull",)
TASK_DBTCLOUDRUN = "dbtcloud-run" # this is task slug so it should match the seed data.
TASK_DOCSGENERATE = "dbt-docs-generate"
TASK_AIRBYTESYNC = "airbyte-sync"
TASK_AIRBYTERESET = "airbyte-reset"
Expand All @@ -30,12 +31,13 @@
TASK_DBTDEPS,
TASK_DBTRUN,
TASK_DBTTEST,
TASK_DBTCLOUDRUN,
]

# These are tasks to be run via deployment
# Adding a new task here will work for any new orgtask created
# But for the current ones a script would need to be run to set them with a deployment
LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST]
LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST, TASK_DBTCLOUDRUN]

# airbyte sync timeout in deployment params
AIRBYTE_SYNC_TIMEOUT = 15
Expand Down
11 changes: 11 additions & 0 deletions seed/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,16 @@
"command": null,
"is_system": false
}
},
{
"model": "ddpui.Task",
"pk": 11,
"fields": {
"type": "dbtcloud",
"slug": "dbtcloud-run",
"label": "DBT Cloud run",
"command": "cloud-run",
"is_system": true
}
}
]

0 comments on commit e152206

Please sign in to comment.