Skip to content

Commit

Permalink
feat: Add manifest onboard cli (#9)
Browse files Browse the repository at this point in the history
* Add manifest onboard cli

* api client changes

* changes

* more changes

* add creds validation

* remove error handling

* Add requests library dependency

* Update src/datapilot/utils/utils.py

* fix:ruff

* fix: ruff

* fix: ruff

---------

Co-authored-by: Michiel De Smet <[email protected]>
Co-authored-by: surya <[email protected]>
Co-authored-by: suryaiyer95 <[email protected]>
  • Loading branch information
4 people authored Feb 12, 2024
1 parent fa28327 commit 8885392
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 0 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def read(*names, **kwargs):
"dbt-artifacts-parser==0.5.1",
"configtree==0.6",
"tabulate==0.9.0",
"requests==2.31.0",
],
extras_require={
# eg:
Expand Down
Empty file.
Empty file.
85 changes: 85 additions & 0 deletions src/datapilot/clients/altimate/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging

import requests
from requests.exceptions import ConnectionError
from requests.exceptions import HTTPError
from requests.exceptions import RequestException
from requests.exceptions import Timeout


class APIClient:
def __init__(self, api_token="", base_url="", tenant=""):
self.api_token = api_token
self.base_url = base_url
self.tenant = tenant
self.logger = logging.getLogger(self.__class__.__name__)

def _get_headers(self):
headers = {
"Content-Type": "application/json",
}

if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"

if self.tenant:
headers["x-tenant"] = self.tenant

return headers

def log(self, message):
self.logger.debug(message)

def get(self, endpoint, params=None, timeout=None):
url = f"{self.base_url}{endpoint}"
headers = self._get_headers()

try:
self.logger.debug(f"Sending GET request for tenant {self.tenant} at url: {url}")
response = requests.get(url, headers=headers, params=params, timeout=timeout)

# Check if the response was successful
response.raise_for_status()
self.logger.debug(f"Received GET response with status: {response.status_code}")
return response.json()

except HTTPError as http_err:
self.logger.error(f"{http_err.response.json()} - Status code: {response.status_code}")
except ConnectionError as conn_err:
self.logger.error(f"Connection error occurred: {conn_err}")
except Timeout as timeout_err:
self.logger.error(f"Timeout error occurred: {timeout_err}")
except RequestException as req_err:
self.logger.error(f"Unexpected error occurred: {req_err}")
except Exception as err:
self.logger.error(f"An error occurred: {err}")

def post(self, endpoint, data=None, timeout=None):
url = f"{self.base_url}{endpoint}"
headers = self._get_headers()

self.logger.debug(f"Sending POST request for tenant {self.tenant} at url: {url}")
response = requests.post(url, headers=headers, json=data, timeout=timeout)
self.logger.debug(f"Received POST response with status: {response.status_code }")

return response

def put(self, endpoint, data, timeout=None):
url = f"{self.base_url}{endpoint}"

self.logger.debug(f"Sending PUT request for tenant {self.tenant} at url: {url}")
response = requests.put(url, data=data, timeout=timeout)
self.logger.debug(f"Received PUT response with status: {response.status_code}")
return response

def verify_upload(self, params=None):
endpoint = "/dbt/v1/verify_upload"
self.post(endpoint, data=params)

def get_signed_url(self, params=None):
endpoint = "/dbt/v1/signed_url"
return self.get(endpoint, params=params)

def validate_credentials(self):
endpoint = "/dbt/v3/validate-credentials"
return self.get(endpoint)
74 changes: 74 additions & 0 deletions src/datapilot/clients/altimate/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os
from pathlib import Path
from typing import Optional

import click
from requests import Response

from datapilot.clients.altimate.client import APIClient


def check_token_and_instance(
token: Optional[str],
instance_name: Optional[str],
):
if not token:
token = os.environ.get("ALTIMATE_API_KEY")

if not instance_name:
instance_name = os.environ.get("ALTIMATE_INSTANCE_NAME")

if not token or not instance_name:
click.echo(
"Error: API TOKEN and instance name is required. Please provide a valid API token."
" You can pass it as command line arguments or set it using environment variables like "
"ALTIMATE_API_KEY and ALTIMATE_INSTANCE_NAME."
)
return


def upload_content_to_signed_url(file_path, signed_url) -> Response:
api_client = APIClient()

with Path(file_path).open("rb") as file:
file_content = file.read()

return api_client.put(signed_url, data=file_content)


def validate_credentials(
token,
backend_url,
tenant,
) -> Response:
api_client = APIClient(api_token=token, base_url=backend_url, tenant=tenant)
return api_client.validate_credentials()


def onboard_manifest(api_token, tenant, dbt_core_integration_id, manifest_path, backend_url) -> dict:
api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)

params = {"dbt_core_integration_id": dbt_core_integration_id, "file_type": "manifest"}
signed_url_data = api_client.get_signed_url(params)
if signed_url_data:
signed_url = signed_url_data.get("url")
file_id = signed_url_data.get("dbt_core_integration_file_id")
api_client.log(f"Received signed URL: {signed_url}")
api_client.log(f"Received File ID: {file_id}")

upload_response = upload_content_to_signed_url(manifest_path, signed_url)

if upload_response:
verify_params = {"dbt_core_integration_file_id": file_id}
api_client.verify_upload(verify_params)
return {"ok": True}
else:
api_client.log(f"Error uploading file: {upload_response.status_code}, {upload_response.text}")
return {"ok": False, "message": f"Error uploading file: {upload_response.status_code}, {upload_response.text}"}

else:
api_client.log("Error getting signed URL.")
return {
"ok": False,
"message": "Error in uploading the manifest. ",
}
29 changes: 29 additions & 0 deletions src/datapilot/core/platforms/dbt/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import click

from datapilot.clients.altimate.utils import check_token_and_instance
from datapilot.clients.altimate.utils import onboard_manifest
from datapilot.clients.altimate.utils import validate_credentials
from datapilot.config.config import load_config
from datapilot.core.platforms.dbt.constants import MODEL
from datapilot.core.platforms.dbt.constants import PROJECT
from datapilot.core.platforms.dbt.executor import DBTInsightGenerator
from datapilot.core.platforms.dbt.formatting import generate_model_insights_table
from datapilot.core.platforms.dbt.formatting import generate_project_insights_table
from datapilot.core.platforms.dbt.utils import load_manifest
from datapilot.utils.formatting.utils import tabulate_data

logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -65,3 +69,28 @@ def project_health(manifest_path, catalog_path, config_path=None):
click.echo("Project Insights")
click.echo("--" * 50)
click.echo(tabulate_data(project_report, headers="keys"))


@dbt.command("onboard")
@click.option("--token", prompt="API Token", help="Your API token for authentication.")
@click.option("--instance-name", prompt="Instance Name", help="Your tenant ID.")
@click.option("--dbt_core_integration_id", prompt="DBT Core Integration ID", help="DBT Core Integration ID")
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
@click.option("--backend-url", required=False, prompt="Altimate's Backend URL", help="Altimate's Backend URL")
def onboard(token, instance_name, dbt_core_integration_id, manifest_path, backend_url="https://api.myaltimate.com", env=None):
"""Onboard a manifest file to DBT."""
check_token_and_instance(token, instance_name)

if not validate_credentials(token, backend_url, instance_name):
click.echo("Error: Invalid credentials.")
return

# This will throw error if manifest file is incorrect
load_manifest(manifest_path)

response = onboard_manifest(token, instance_name, dbt_core_integration_id, manifest_path, backend_url)

if response["ok"]:
click.echo("Manifest onboarded successfully!")
else:
click.echo(f"{response['message']}")

0 comments on commit 8885392

Please sign in to comment.