Skip to content

Commit

Permalink
Improve the telemetry events
Browse files Browse the repository at this point in the history
Signed-off-by: popcorny <[email protected]>
  • Loading branch information
popcornylu committed Sep 18, 2023
1 parent c514a8d commit b2d2633
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 190 deletions.
35 changes: 31 additions & 4 deletions piperider_cli/cli_utils/compare_with_recipe.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import time

from piperider_cli.event import CompareEventPayload, log_event


def compare_with_recipe(**kwargs):
"""
Generate the comparison report for your branch.
Expand All @@ -22,6 +27,7 @@ def compare_with_recipe(**kwargs):

base_branch = kwargs.get('base_branch')
skip_datasource_connection = kwargs.get('skip_datasource')
event_payload = CompareEventPayload()

# reconfigure recipe global flags
configure_recipe_execution_flags(dry_run=kwargs.get('dry_run'), interactive=kwargs.get('interactive'))
Expand All @@ -47,7 +53,9 @@ def compare_with_recipe(**kwargs):
elif is_piperider_workspace_exist() is False:
raise DbtProjectNotFoundError()

ret = 0
status = False
reason = None
start_time = time.time()
try:
# note: dry-run and interactive are set by configure_recipe_execution_flags
from piperider_cli.recipe_executor import RecipeExecutor
Expand All @@ -57,7 +65,8 @@ def compare_with_recipe(**kwargs):
modified=modified,
base_branch=base_branch,
skip_datasource_connection=skip_datasource_connection,
debug=debug)
debug=debug,
event_payload=event_payload)
last = False
base = target = None
if not recipe_config.base.is_file_specified() and not recipe_config.target.is_file_specified():
Expand All @@ -67,6 +76,7 @@ def compare_with_recipe(**kwargs):
target = recipe_config.target.get_run_report()

if not is_recipe_dry_run():
event_payload.step = "compare reports"
from piperider_cli.compare_report import CompareReport
CompareReport.exec(a=base, b=target, last=last, datasource=None,
output=kwargs.get('output'), tables_from="all",
Expand All @@ -77,7 +87,24 @@ def compare_with_recipe(**kwargs):
project_name=project_name,
show_progress=True,
debug=debug)

status = True
reason = 'ok'
event_payload.step = "done"
return 0
except SystemExit as e:
reason = 'error'
raise e
except KeyboardInterrupt as e:
reason = 'aborted'
raise e
except Exception as e:
reason = 'fatal'
raise e

return ret
finally:
end_time = time.time()
duration = end_time - start_time
event_payload.status = status
event_payload.reason = reason
event_payload.duration = duration
log_event(event_payload.to_dict(), 'compare')
221 changes: 126 additions & 95 deletions piperider_cli/cli_utils/run_cmd.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os
import sys
import time

from rich.console import Console

from piperider_cli.event import log_event


def run(**kwargs):
'Profile data source, run assertions, and generate report(s). By default, the raw results and reports are saved in ".piperider/outputs".'
Expand All @@ -11,101 +14,129 @@ def run(**kwargs):
from piperider_cli.cli_utils.cloud import CloudConnectorHelper
from piperider_cli.configuration import FileSystem, is_piperider_workspace_exist
from piperider_cli.error import DbtProjectNotFoundError, PipeRiderConflictOptionsError
from piperider_cli.exitcode import EC_ERR_TEST_FAILED, EC_WARN_NO_PROFILED_MODULES
from piperider_cli.exitcode import EC_WARN_NO_PROFILED_MODULES
from piperider_cli.generate_report import GenerateReport
from piperider_cli.guide import Guide
from piperider_cli.initializer import Initializer
from piperider_cli.runner import Runner

datasource = kwargs.get('datasource')
table = kwargs.get('table')
output = kwargs.get('output')
open_report = kwargs.get('open')
enable_share = kwargs.get('share')
skip_report = kwargs.get('skip_report')
dbt_target_path = kwargs.get('dbt_target_path')
dbt_list = kwargs.get('dbt_list')
force_upload = kwargs.get('upload')
project_name = kwargs.get('project')
select = kwargs.get('select')
state = kwargs.get('state')

if project_name is not None:
os.environ.get('PIPERIDER_API_PROJECT')

console = Console()
env_dbt_resources = os.environ.get('PIPERIDER_DBT_RESOURCES')

# True -> 1, False -> 0
if sum([True if table else False, dbt_list, env_dbt_resources is not None]) > 1:
console.print("[bold red]Error:[/bold red] "
"['--table', '--dbt-list'] are mutually exclusive")
sys.exit(1)

# Search dbt project config files
dbt_project_dir = kwargs.get('dbt_project_dir')
no_auto_search = kwargs.get('no_auto_search')
dbt_project_path = DbtUtil.get_dbt_project_path(dbt_project_dir, no_auto_search, recursive=False)
dbt_profiles_dir = kwargs.get('dbt_profiles_dir')
if dbt_project_path:
working_dir = os.path.dirname(dbt_project_path) if dbt_project_path.endswith('.yml') else dbt_project_path
FileSystem.set_working_directory(working_dir)
if dbt_profiles_dir:
FileSystem.set_dbt_profiles_dir(dbt_profiles_dir)
# Only run initializer when dbt project path is provided
Initializer.exec(dbt_project_path=dbt_project_path, dbt_profiles_dir=dbt_profiles_dir, interactive=False)
elif is_piperider_workspace_exist() is False:
raise DbtProjectNotFoundError()

dbt_resources = None
if select and dbt_list is True:
raise PipeRiderConflictOptionsError(
'Cannot use options "--select" with "--dbt-list"',
hint='Remove "--select" option and use "--dbt-list" instead.'
)

if dbt_list:
dbt_resources = DbtUtil.read_dbt_resources(sys.stdin)
if env_dbt_resources is not None:
dbt_resources = DbtUtil.read_dbt_resources(env_dbt_resources)

ret = Runner.exec(datasource=datasource,
table=table,
output=output,
skip_report=skip_report,
dbt_target_path=dbt_target_path,
dbt_resources=dbt_resources,
dbt_select=select,
dbt_state=state,
report_dir=kwargs.get('report_dir'),
skip_datasource_connection=kwargs.get('skip_datasource'))
if ret in (0, EC_ERR_TEST_FAILED, EC_WARN_NO_PROFILED_MODULES):
if enable_share:
force_upload = True

auto_upload = CloudConnectorHelper.is_auto_upload()
is_cloud_view = (force_upload or auto_upload)

if not skip_report:
GenerateReport.exec(None, kwargs.get('report_dir'), output, open_report, is_cloud_view)

if ret == EC_WARN_NO_PROFILED_MODULES:
# No module was profiled
if dbt_list or dbt_resources or select:
Guide().show('No resources was profiled. Please check given "--select", "--dbt-list" option or '
'environment variable "PIPERIDER_DBT_RESOURCES" to choose the resources to profile.')
ret = 0

if CloudConnectorHelper.is_login() and is_cloud_view:
ret = CloudConnectorHelper.upload_latest_report(report_dir=kwargs.get('report_dir'),
debug=kwargs.get('debug'),
open_report=open_report, enable_share=enable_share,
project_name=project_name)
elif not CloudConnectorHelper.is_login() and is_cloud_view:
console = Console()
console.print('[bold yellow]Warning: [/bold yellow]The report is not uploaded due to not logged in.')

if ret != 0:
if ret != EC_WARN_NO_PROFILED_MODULES:
sys.exit(ret)
return ret
from piperider_cli.runner import Runner, RunEventPayload

status = False
reason = None
start_time = time.time()
event_payload = RunEventPayload()
event_payload.step = 'start'
try:
datasource = kwargs.get('datasource')
table = kwargs.get('table')
output = kwargs.get('output')
open_report = kwargs.get('open')
enable_share = kwargs.get('share')
skip_report = kwargs.get('skip_report')
dbt_target_path = kwargs.get('dbt_target_path')
dbt_list = kwargs.get('dbt_list')
force_upload = kwargs.get('upload')
project_name = kwargs.get('project')
select = kwargs.get('select')
state = kwargs.get('state')

if project_name is not None:
os.environ.get('PIPERIDER_API_PROJECT')

console = Console()
env_dbt_resources = os.environ.get('PIPERIDER_DBT_RESOURCES')

# True -> 1, False -> 0
if sum([True if table else False, dbt_list, env_dbt_resources is not None]) > 1:
console.print("[bold red]Error:[/bold red] "
"['--table', '--dbt-list'] are mutually exclusive")
sys.exit(1)

# Search dbt project config files
dbt_project_dir = kwargs.get('dbt_project_dir')
no_auto_search = kwargs.get('no_auto_search')
dbt_project_path = DbtUtil.get_dbt_project_path(dbt_project_dir, no_auto_search, recursive=False)
dbt_profiles_dir = kwargs.get('dbt_profiles_dir')
if dbt_project_path:
working_dir = os.path.dirname(dbt_project_path) if dbt_project_path.endswith('.yml') else dbt_project_path
FileSystem.set_working_directory(working_dir)
if dbt_profiles_dir:
FileSystem.set_dbt_profiles_dir(dbt_profiles_dir)
# Only run initializer when dbt project path is provided
Initializer.exec(dbt_project_path=dbt_project_path, dbt_profiles_dir=dbt_profiles_dir, interactive=False)
elif is_piperider_workspace_exist() is False:
raise DbtProjectNotFoundError()

dbt_resources = None
if select and dbt_list is True:
raise PipeRiderConflictOptionsError(
'Cannot use options "--select" with "--dbt-list"',
hint='Remove "--select" option and use "--dbt-list" instead.'
)

if dbt_list:
dbt_resources = DbtUtil.read_dbt_resources(sys.stdin)
if env_dbt_resources is not None:
dbt_resources = DbtUtil.read_dbt_resources(env_dbt_resources)

ret = Runner.exec(datasource=datasource,
table=table,
output=output,
skip_report=skip_report,
dbt_target_path=dbt_target_path,
dbt_resources=dbt_resources,
dbt_select=select,
dbt_state=state,
report_dir=kwargs.get('report_dir'),
skip_datasource_connection=kwargs.get('skip_datasource'),
event_payload=event_payload)
if ret in (0, EC_WARN_NO_PROFILED_MODULES):
if enable_share:
force_upload = True

auto_upload = CloudConnectorHelper.is_auto_upload()
is_cloud_view = (force_upload or auto_upload)

if not skip_report:
GenerateReport.exec(None, kwargs.get('report_dir'), output, open_report, is_cloud_view)

if ret == EC_WARN_NO_PROFILED_MODULES:
# No module was profiled
if dbt_list or dbt_resources or select:
Guide().show('No resources was profiled. Please check given "--select", "--dbt-list" option or '
'environment variable "PIPERIDER_DBT_RESOURCES" to choose the resources to profile.')
ret = 0

event_payload.step = 'upload'
if CloudConnectorHelper.is_login() and is_cloud_view:
ret = CloudConnectorHelper.upload_latest_report(report_dir=kwargs.get('report_dir'),
debug=kwargs.get('debug'),
open_report=open_report, enable_share=enable_share,
project_name=project_name)
elif not CloudConnectorHelper.is_login() and is_cloud_view:
console = Console()
console.print('[bold yellow]Warning: [/bold yellow]The report is not uploaded due to not logged in.')

if ret != 0:
reason = 'error'
return ret

status = True
reason = 'ok'
event_payload.step = 'done'
return 0
except SystemExit as e:
reason = 'error'
raise e
except KeyboardInterrupt as e:
reason = 'aborted'
raise e
except Exception as e:
reason = 'fatal'
raise e
finally:
end_time = time.time()
duration = end_time - start_time
event_payload.status = status
event_payload.reason = reason
event_payload.duration = duration
log_event(event_payload.to_dict(), 'run')
25 changes: 25 additions & 0 deletions piperider_cli/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,31 @@ def _verify_input_config(self):
if not isinstance(self.excludes, List):
raise PipeRiderConfigTypeError("'excludes' should be a list of tables' name")

def get_datasource(self, datasource: str = None):
datasources = {}
datasource_names = []
for ds in self.dataSources:
datasource_names.append(ds.name)
datasources[ds.name] = ds

if len(datasource_names) == 0:
return None

if datasource:
ds_name = datasource
else:
# if global dbt config exists, use dbt profile target
# else use the first datasource
ds_name = self.dbt.get('target') if self.dbt else datasource_names[0]

if ds_name not in datasource_names:
console = Console()
console.print(f"[[bold red]Error[/bold red]] Datasource '{ds_name}' doesn't exist")
console.print(f"Available datasources: {', '.join(datasource_names)}")
return None

return datasources[ds_name]

def get_telemetry_id(self):
if self.telemetry_id is not None:
return self.telemetry_id
Expand Down
Loading

0 comments on commit b2d2633

Please sign in to comment.