From f13cc17b9432c249a22d94c7960139d49090b4fa Mon Sep 17 00:00:00 2001 From: "Wei-Chun, Chang" Date: Thu, 12 Oct 2023 10:23:46 +0800 Subject: [PATCH] Do dbt parse when given skip datasource option Signed-off-by: Wei-Chun, Chang --- piperider_cli/recipes/__init__.py | 68 +++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/piperider_cli/recipes/__init__.py b/piperider_cli/recipes/__init__.py index 3587edf73..7886fce6e 100644 --- a/piperider_cli/recipes/__init__.py +++ b/piperider_cli/recipes/__init__.py @@ -14,6 +14,7 @@ import piperider_cli.dbtutil as dbtutil from piperider_cli import get_run_json_path, load_jinja_template, load_json from piperider_cli.configuration import Configuration, FileSystem +from piperider_cli.dbt import dbt_version from piperider_cli.error import RecipeConfigException from piperider_cli.event import CompareEventPayload from piperider_cli.recipes.utils import InteractiveStopException @@ -274,6 +275,13 @@ def replace_commands_dbt_state_path(commands: List[str], dbt_state_path: str): return [command.replace('', dbt_state_path) for command in commands] +def prepare_dbt_state(cfg: RecipeConfiguration, options: Dict, recipe_type: str = 'base'): + if options.get('skip_datasource_connection') and dbt_version >= '1.5': + execute_dbt_parse_archive(cfg, recipe_type=recipe_type) + else: + execute_dbt_compile_archive(cfg, recipe_type=recipe_type) + + def prepare_dbt_resources_candidate(cfg: RecipeConfiguration, options: Dict): config = Configuration.instance() state = None @@ -282,15 +290,18 @@ def prepare_dbt_resources_candidate(cfg: RecipeConfiguration, options: Dict): select = update_select_with_cli_option(options) if require_base_state(cfg): - execute_dbt_compile_archive(cfg, recipe_type='base') + prepare_dbt_state(cfg, options, recipe_type='base') state = cfg.base.state_path if cfg.target.ref is not None: - execute_dbt_compile_archive(cfg, recipe_type='target') + prepare_dbt_state(cfg, options, recipe_type='target') target_path = 'state' else: execute_dbt_deps(cfg.target) - execute_dbt_compile(cfg.target) + if options.get('skip_datasource_connection') and dbt_version >= '1.5': + execute_dbt_parse(cfg.target) + else: + execute_dbt_compile(cfg.target) dbt_project = dbtutil.load_dbt_project(config.dbt.get('projectDir')) target_path = dbt_project.get('target-path') if dbt_project.get('target-path') else 'target' @@ -351,12 +362,7 @@ def execute_recipe(cfg: RecipeConfiguration, recipe_type='base', debug=False, ev console.print() -def execute_dbt_compile_archive(cfg: RecipeConfiguration, recipe_type='base'): - if recipe_type == 'target': - model = cfg.target - else: - model = cfg.base - +def execute_dbt_archive(cfg: RecipeConfiguration, recipe_type='base'): if recipe_type == 'target': branch_or_commit = tool().git_rev_parse(cfg.target.ref) else: @@ -366,6 +372,33 @@ def execute_dbt_compile_archive(cfg: RecipeConfiguration, recipe_type='base'): if not branch_or_commit: raise RecipeConfigException("Branch is not specified") + return branch_or_commit + + +def execute_dbt_parse_archive(cfg: RecipeConfiguration, recipe_type='base'): + branch_or_commit = execute_dbt_archive(cfg, recipe_type) + + if recipe_type == 'target': + model = cfg.target + else: + model = cfg.base + + if model.tmp_dir_path is None: + model.tmp_dir_path = tool().git_archive(branch_or_commit) + model.state_path = Path(os.path.join(model.tmp_dir_path, 'state')).as_posix() + + execute_dbt_deps(model, model.tmp_dir_path, Path(FileSystem.DBT_PROFILES_DIR).as_posix()) + execute_dbt_parse(model, model.tmp_dir_path, Path(FileSystem.DBT_PROFILES_DIR).as_posix(), model.state_path) + + +def execute_dbt_compile_archive(cfg: RecipeConfiguration, recipe_type='base'): + branch_or_commit = execute_dbt_archive(cfg, recipe_type) + + if recipe_type == 'target': + model = cfg.target + else: + model = cfg.base + if model.tmp_dir_path is None: model.tmp_dir_path = tool().git_archive(branch_or_commit) model.state_path = Path(os.path.join(model.tmp_dir_path, 'state')).as_posix() @@ -407,6 +440,23 @@ def execute_dbt_compile(model: RecipeModel, project_dir: str = None, profiles_di console.print() +def execute_dbt_parse(model: RecipeModel, project_dir: str = None, profiles_dir: str = None, target_path: str = None): + console.print("Run: \[dbt parse]") + cmd = 'dbt parse' + if project_dir: + cmd += f' --project-dir {project_dir}' + if target_path: + cmd += f' --target-path {target_path}' + if profiles_dir: + cmd += f' --profiles-dir {profiles_dir}' + exit_code = tool().execute_command_with_showing_output(cmd.strip(), model.dbt.envs()) + if exit_code != 0: + console.print( + f"[bold yellow]Warning: [/bold yellow] Dbt command failed: '{cmd}' with exit code: {exit_code}") + sys.exit(exit_code) + console.print() + + def execute_recipe_archive(cfg: RecipeConfiguration, recipe_type='base', debug=False, event_payload=CompareEventPayload()): """ We execute a recipe in the following steps: