diff --git a/setup.py b/setup.py index 7a489430..e13b349f 100755 --- a/setup.py +++ b/setup.py @@ -67,6 +67,7 @@ def read(*names, **kwargs): "dbt-artifacts-parser==0.5.1", "configtree==0.6", "tabulate==0.9.0", + "requests==2.31.0", ], extras_require={ # eg: diff --git a/src/datapilot/clients/__init__.py b/src/datapilot/clients/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/datapilot/clients/altimate/__init__.py b/src/datapilot/clients/altimate/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/datapilot/clients/altimate/client.py b/src/datapilot/clients/altimate/client.py new file mode 100644 index 00000000..4ad9e5b1 --- /dev/null +++ b/src/datapilot/clients/altimate/client.py @@ -0,0 +1,85 @@ +import logging + +import requests +from requests.exceptions import ConnectionError +from requests.exceptions import HTTPError +from requests.exceptions import RequestException +from requests.exceptions import Timeout + + +class APIClient: + def __init__(self, api_token="", base_url="", tenant=""): + self.api_token = api_token + self.base_url = base_url + self.tenant = tenant + self.logger = logging.getLogger(self.__class__.__name__) + + def _get_headers(self): + headers = { + "Content-Type": "application/json", + } + + if self.api_token: + headers["Authorization"] = f"Bearer {self.api_token}" + + if self.tenant: + headers["x-tenant"] = self.tenant + + return headers + + def log(self, message): + self.logger.debug(message) + + def get(self, endpoint, params=None, timeout=None): + url = f"{self.base_url}{endpoint}" + headers = self._get_headers() + + try: + self.logger.debug(f"Sending GET request for tenant {self.tenant} at url: {url}") + response = requests.get(url, headers=headers, params=params, timeout=timeout) + + # Check if the response was successful + response.raise_for_status() + self.logger.debug(f"Received GET response with status: {response.status_code}") + return response.json() + + except HTTPError as http_err: + self.logger.error(f"{http_err.response.json()} - Status code: {response.status_code}") + except ConnectionError as conn_err: + self.logger.error(f"Connection error occurred: {conn_err}") + except Timeout as timeout_err: + self.logger.error(f"Timeout error occurred: {timeout_err}") + except RequestException as req_err: + self.logger.error(f"Unexpected error occurred: {req_err}") + except Exception as err: + self.logger.error(f"An error occurred: {err}") + + def post(self, endpoint, data=None, timeout=None): + url = f"{self.base_url}{endpoint}" + headers = self._get_headers() + + self.logger.debug(f"Sending POST request for tenant {self.tenant} at url: {url}") + response = requests.post(url, headers=headers, json=data, timeout=timeout) + self.logger.debug(f"Received POST response with status: {response.status_code }") + + return response + + def put(self, endpoint, data, timeout=None): + url = f"{self.base_url}{endpoint}" + + self.logger.debug(f"Sending PUT request for tenant {self.tenant} at url: {url}") + response = requests.put(url, data=data, timeout=timeout) + self.logger.debug(f"Received PUT response with status: {response.status_code}") + return response + + def verify_upload(self, params=None): + endpoint = "/dbt/v1/verify_upload" + self.post(endpoint, data=params) + + def get_signed_url(self, params=None): + endpoint = "/dbt/v1/signed_url" + return self.get(endpoint, params=params) + + def validate_credentials(self): + endpoint = "/dbt/v3/validate-credentials" + return self.get(endpoint) diff --git a/src/datapilot/clients/altimate/utils.py b/src/datapilot/clients/altimate/utils.py new file mode 100644 index 00000000..9b0e8550 --- /dev/null +++ b/src/datapilot/clients/altimate/utils.py @@ -0,0 +1,74 @@ +import os +from pathlib import Path +from typing import Optional + +import click +from requests import Response + +from datapilot.clients.altimate.client import APIClient + + +def check_token_and_instance( + token: Optional[str], + instance_name: Optional[str], +): + if not token: + token = os.environ.get("ALTIMATE_API_KEY") + + if not instance_name: + instance_name = os.environ.get("ALTIMATE_INSTANCE_NAME") + + if not token or not instance_name: + click.echo( + "Error: API TOKEN and instance name is required. Please provide a valid API token." + " You can pass it as command line arguments or set it using environment variables like " + "ALTIMATE_API_KEY and ALTIMATE_INSTANCE_NAME." + ) + return + + +def upload_content_to_signed_url(file_path, signed_url) -> Response: + api_client = APIClient() + + with Path(file_path).open("rb") as file: + file_content = file.read() + + return api_client.put(signed_url, data=file_content) + + +def validate_credentials( + token, + backend_url, + tenant, +) -> Response: + api_client = APIClient(api_token=token, base_url=backend_url, tenant=tenant) + return api_client.validate_credentials() + + +def onboard_manifest(api_token, tenant, dbt_core_integration_id, manifest_path, backend_url) -> dict: + api_client = APIClient(api_token, base_url=backend_url, tenant=tenant) + + params = {"dbt_core_integration_id": dbt_core_integration_id, "file_type": "manifest"} + signed_url_data = api_client.get_signed_url(params) + if signed_url_data: + signed_url = signed_url_data.get("url") + file_id = signed_url_data.get("dbt_core_integration_file_id") + api_client.log(f"Received signed URL: {signed_url}") + api_client.log(f"Received File ID: {file_id}") + + upload_response = upload_content_to_signed_url(manifest_path, signed_url) + + if upload_response: + verify_params = {"dbt_core_integration_file_id": file_id} + api_client.verify_upload(verify_params) + return {"ok": True} + else: + api_client.log(f"Error uploading file: {upload_response.status_code}, {upload_response.text}") + return {"ok": False, "message": f"Error uploading file: {upload_response.status_code}, {upload_response.text}"} + + else: + api_client.log("Error getting signed URL.") + return { + "ok": False, + "message": "Error in uploading the manifest. ", + } diff --git a/src/datapilot/core/platforms/dbt/cli/cli.py b/src/datapilot/core/platforms/dbt/cli/cli.py index e5a18ff7..e3cc0a22 100644 --- a/src/datapilot/core/platforms/dbt/cli/cli.py +++ b/src/datapilot/core/platforms/dbt/cli/cli.py @@ -2,12 +2,16 @@ import click +from datapilot.clients.altimate.utils import check_token_and_instance +from datapilot.clients.altimate.utils import onboard_manifest +from datapilot.clients.altimate.utils import validate_credentials from datapilot.config.config import load_config from datapilot.core.platforms.dbt.constants import MODEL from datapilot.core.platforms.dbt.constants import PROJECT from datapilot.core.platforms.dbt.executor import DBTInsightGenerator from datapilot.core.platforms.dbt.formatting import generate_model_insights_table from datapilot.core.platforms.dbt.formatting import generate_project_insights_table +from datapilot.core.platforms.dbt.utils import load_manifest from datapilot.utils.formatting.utils import tabulate_data logging.basicConfig(level=logging.INFO) @@ -65,3 +69,28 @@ def project_health(manifest_path, catalog_path, config_path=None): click.echo("Project Insights") click.echo("--" * 50) click.echo(tabulate_data(project_report, headers="keys")) + + +@dbt.command("onboard") +@click.option("--token", prompt="API Token", help="Your API token for authentication.") +@click.option("--instance-name", prompt="Instance Name", help="Your tenant ID.") +@click.option("--dbt_core_integration_id", prompt="DBT Core Integration ID", help="DBT Core Integration ID") +@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.") +@click.option("--backend-url", required=False, prompt="Altimate's Backend URL", help="Altimate's Backend URL") +def onboard(token, instance_name, dbt_core_integration_id, manifest_path, backend_url="https://api.myaltimate.com", env=None): + """Onboard a manifest file to DBT.""" + check_token_and_instance(token, instance_name) + + if not validate_credentials(token, backend_url, instance_name): + click.echo("Error: Invalid credentials.") + return + + # This will throw error if manifest file is incorrect + load_manifest(manifest_path) + + response = onboard_manifest(token, instance_name, dbt_core_integration_id, manifest_path, backend_url) + + if response["ok"]: + click.echo("Manifest onboarded successfully!") + else: + click.echo(f"{response['message']}")