Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add manifest onboard cli #9

Merged
merged 12 commits into from
Feb 12, 2024
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def read(*names, **kwargs):
"dbt-artifacts-parser==0.5.1",
"configtree==0.6",
"tabulate==0.9.0",
"requests==2.31.0",
],
extras_require={
# eg:
Expand Down
Empty file.
Empty file.
85 changes: 85 additions & 0 deletions src/datapilot/clients/altimate/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging

import requests
from requests.exceptions import ConnectionError
from requests.exceptions import HTTPError
from requests.exceptions import RequestException
from requests.exceptions import Timeout


class APIClient:
    """Minimal HTTP client for the Altimate backend.

    Wraps ``requests`` with bearer-token / tenant headers and debug logging.
    ``get`` is best-effort: it logs request failures and returns ``None``
    instead of raising. ``post`` and ``put`` return the raw
    ``requests.Response`` and let request exceptions propagate.
    """

    def __init__(self, api_token="", base_url="", tenant=""):
        """Store connection settings.

        Args:
            api_token: Bearer token; the ``Authorization`` header is omitted when empty.
            base_url: Prefix prepended to every endpoint; may be empty when
                callers pass absolute URLs (e.g. a signed upload URL).
            tenant: Value for the ``x-tenant`` header; omitted when empty.
        """
        self.api_token = api_token
        self.base_url = base_url
        self.tenant = tenant
        self.logger = logging.getLogger(self.__class__.__name__)

    def _get_headers(self):
        """Return the JSON content-type header plus auth/tenant headers when configured."""
        headers = {
            "Content-Type": "application/json",
        }

        if self.api_token:
            headers["Authorization"] = f"Bearer {self.api_token}"

        if self.tenant:
            headers["x-tenant"] = self.tenant

        return headers

    def _build_url(self, endpoint):
        """Join ``base_url`` and ``endpoint`` with exactly one slash between them.

        When either part is empty the other is returned untouched, so an
        absolute URL passed as ``endpoint`` with an empty ``base_url`` is
        preserved byte-for-byte.
        """
        if not self.base_url:
            return endpoint
        if not endpoint:
            return self.base_url
        # A user-supplied backend URL may end with "/"; avoid producing "//".
        return f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"

    @staticmethod
    def _error_detail(response):
        """Return the decoded JSON body of ``response``, or its raw text if it is not JSON."""
        try:
            return response.json()
        except ValueError:
            # requests' JSON decode errors derive from ValueError; error pages
            # from proxies/gateways are frequently HTML rather than JSON.
            return response.text

    def log(self, message):
        """Emit ``message`` at DEBUG level on this client's logger."""
        self.logger.debug(message)

    def get(self, endpoint, params=None, timeout=None):
        """Send a GET request and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``base_url``.
            params: Optional query parameters.
            timeout: Passed straight to ``requests.get``.

        Returns:
            The decoded JSON response on success, or ``None`` when the request
            fails for any reason (the failure is logged at ERROR level).
        """
        url = self._build_url(endpoint)
        headers = self._get_headers()

        try:
            self.logger.debug("Sending GET request for tenant %s at url: %s", self.tenant, url)
            response = requests.get(url, headers=headers, params=params, timeout=timeout)

            # Turn 4xx/5xx responses into HTTPError so they are logged below.
            response.raise_for_status()
            self.logger.debug("Received GET response with status: %s", response.status_code)
            return response.json()

        except HTTPError as http_err:
            failed = http_err.response
            # Decoding the body must not raise from inside this handler.
            self.logger.error("%s - Status code: %s", self._error_detail(failed), failed.status_code)
        except Timeout as timeout_err:
            # Checked before ConnectionError: a connect timeout is both.
            self.logger.error("Timeout error occurred: %s", timeout_err)
        except ConnectionError as conn_err:
            self.logger.error("Connection error occurred: %s", conn_err)
        except RequestException as req_err:
            self.logger.error("Unexpected error occurred: %s", req_err)
        except Exception as err:
            self.logger.error("An error occurred: %s", err)

        return None

    def post(self, endpoint, data=None, timeout=None):
        """Send ``data`` as a JSON POST and return the raw ``requests.Response``.

        The status is not checked here; request exceptions propagate to the caller.
        """
        url = self._build_url(endpoint)
        headers = self._get_headers()

        self.logger.debug("Sending POST request for tenant %s at url: %s", self.tenant, url)
        response = requests.post(url, headers=headers, json=data, timeout=timeout)
        self.logger.debug("Received POST response with status: %s", response.status_code)

        return response

    def put(self, endpoint, data, timeout=None):
        """PUT raw ``data`` to ``endpoint`` and return the raw ``requests.Response``.

        No auth/tenant headers are attached to this request.
        """
        url = self._build_url(endpoint)

        self.logger.debug("Sending PUT request for tenant %s at url: %s", self.tenant, url)
        response = requests.put(url, data=data, timeout=timeout)
        self.logger.debug("Received PUT response with status: %s", response.status_code)
        return response

    def verify_upload(self, params=None):
        """POST ``params`` to the verify-upload endpoint and return the response."""
        endpoint = "/dbt/v1/verify_upload"
        return self.post(endpoint, data=params)

    def get_signed_url(self, params=None):
        """GET the signed-URL endpoint; returns decoded JSON or ``None`` on failure."""
        endpoint = "/dbt/v1/signed_url"
        return self.get(endpoint, params=params)

    def validate_credentials(self):
        """GET the validate-credentials endpoint; returns decoded JSON or ``None`` on failure."""
        endpoint = "/dbt/v3/validate-credentials"
        return self.get(endpoint)
74 changes: 74 additions & 0 deletions src/datapilot/clients/altimate/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os
from pathlib import Path
from typing import Optional

import click
from requests import Response

from datapilot.clients.altimate.client import APIClient


def check_token_and_instance(
    token: Optional[str],
    instance_name: Optional[str],
) -> Optional[tuple]:
    """Resolve the API token and instance name, falling back to the environment.

    Args:
        token: API token given by the caller; when falsy, ``ALTIMATE_API_KEY``
            is read from the environment instead.
        instance_name: Instance name given by the caller; when falsy,
            ``ALTIMATE_INSTANCE_NAME`` is read from the environment instead.

    Returns:
        ``(token, instance_name)`` with the resolved values when both are
        available, otherwise ``None`` after printing an error via ``click.echo``.
    """
    if not token:
        token = os.environ.get("ALTIMATE_API_KEY")

    if not instance_name:
        instance_name = os.environ.get("ALTIMATE_INSTANCE_NAME")

    if not token or not instance_name:
        click.echo(
            "Error: API TOKEN and instance name is required. Please provide a valid API token."
            " You can pass it as command line arguments or set it using environment variables like "
            "ALTIMATE_API_KEY and ALTIMATE_INSTANCE_NAME."
        )
        return None

    # Hand the resolved values back; rebinding the parameters alone never
    # reaches the caller.
    return token, instance_name


def upload_content_to_signed_url(file_path, signed_url) -> Response:
    """PUT the raw bytes of ``file_path`` to ``signed_url`` and return the response.

    A bare ``APIClient`` (no token, base URL or tenant) is used, and
    ``signed_url`` is passed to it as the endpoint.
    """
    payload = Path(file_path).read_bytes()
    uploader = APIClient()
    return uploader.put(signed_url, data=payload)


def validate_credentials(
    token,
    backend_url,
    tenant,
) -> Optional[dict]:
    """Ask the backend whether ``token`` is valid for ``tenant``.

    Args:
        token: API token sent as the bearer credential.
        backend_url: Base URL of the backend.
        tenant: Tenant / instance name sent with the request.

    Returns:
        Whatever ``APIClient.validate_credentials`` returns: the decoded JSON
        body on success, or ``None`` when the request failed. This is not a
        ``requests.Response``. Callers typically test the result for truthiness.
        NOTE(review): annotated as a dict on the assumption that the endpoint
        returns a JSON object -- confirm against the backend.
    """
    api_client = APIClient(api_token=token, base_url=backend_url, tenant=tenant)
    return api_client.validate_credentials()


def onboard_manifest(api_token, tenant, dbt_core_integration_id, manifest_path, backend_url) -> dict:
    """Upload a manifest file through a signed URL and ask the backend to verify it.

    Steps: request a signed URL for the integration, PUT the manifest file to
    it, then call the verify-upload endpoint with the returned file id.

    Args:
        api_token: API token for the backend.
        tenant: Tenant / instance name.
        dbt_core_integration_id: Integration the manifest belongs to.
        manifest_path: Path of the manifest file to upload.
        backend_url: Base URL of the backend.

    Returns:
        ``{"ok": True}`` on success, otherwise ``{"ok": False, "message": ...}``.
    """
    api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)

    params = {"dbt_core_integration_id": dbt_core_integration_id, "file_type": "manifest"}
    signed_url_data = api_client.get_signed_url(params)
    if not signed_url_data:
        api_client.log("Error getting signed URL.")
        return {
            "ok": False,
            "message": "Error in uploading the manifest. ",
        }

    signed_url = signed_url_data.get("url")
    file_id = signed_url_data.get("dbt_core_integration_file_id")
    api_client.log(f"Received signed URL: {signed_url}")
    api_client.log(f"Received File ID: {file_id}")

    if not signed_url:
        # Without a URL the upload below would fail on an invalid request.
        api_client.log("Signed URL response did not contain a URL.")
        return {
            "ok": False,
            "message": "Error in uploading the manifest. ",
        }

    upload_response = upload_content_to_signed_url(manifest_path, signed_url)

    # A requests.Response is truthy only when its status code is below 400.
    if not upload_response:
        message = f"Error uploading file: {upload_response.status_code}, {upload_response.text}"
        api_client.log(message)
        return {"ok": False, "message": message}

    verify_params = {"dbt_core_integration_file_id": file_id}
    verify_response = api_client.verify_upload(verify_params)

    # verify_upload may return nothing; only treat an actual error response
    # as a failure so a missing return value keeps the previous behaviour.
    if verify_response is not None and not verify_response:
        message = f"Error verifying upload: {verify_response.status_code}, {verify_response.text}"
        api_client.log(message)
        return {"ok": False, "message": message}

    return {"ok": True}
29 changes: 29 additions & 0 deletions src/datapilot/core/platforms/dbt/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import click

from datapilot.clients.altimate.utils import check_token_and_instance
from datapilot.clients.altimate.utils import onboard_manifest
from datapilot.clients.altimate.utils import validate_credentials
from datapilot.config.config import load_config
from datapilot.core.platforms.dbt.constants import MODEL
from datapilot.core.platforms.dbt.constants import PROJECT
from datapilot.core.platforms.dbt.executor import DBTInsightGenerator
from datapilot.core.platforms.dbt.formatting import generate_model_insights_table
from datapilot.core.platforms.dbt.formatting import generate_project_insights_table
from datapilot.core.platforms.dbt.utils import load_manifest
from datapilot.utils.formatting.utils import tabulate_data

logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -65,3 +69,28 @@ def project_health(manifest_path, catalog_path, config_path=None):
click.echo("Project Insights")
click.echo("--" * 50)
click.echo(tabulate_data(project_report, headers="keys"))


# Options read ALTIMATE_API_KEY / ALTIMATE_INSTANCE_NAME via click's envvar
# support, so the prompt only appears when neither flag nor variable is set.
@dbt.command("onboard")
@click.option(
    "--token",
    envvar="ALTIMATE_API_KEY",
    prompt="API Token",
    hide_input=True,  # keep the secret out of the terminal scrollback
    help="Your API token for authentication.",
)
@click.option("--instance-name", envvar="ALTIMATE_INSTANCE_NAME", prompt="Instance Name", help="Your tenant ID.")
@click.option("--dbt_core_integration_id", prompt="DBT Core Integration ID", help="DBT Core Integration ID")
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
@click.option(
    "--backend-url",
    required=False,
    # click always passes this option to the callback, so the default must
    # live here; a default on the Python parameter alone is never used.
    default="https://api.myaltimate.com",
    show_default=True,
    prompt="Altimate's Backend URL",
    help="Altimate's Backend URL",
)
def onboard(token, instance_name, dbt_core_integration_id, manifest_path, backend_url="https://api.myaltimate.com", env=None):
    """Onboard a manifest file to DBT."""
    # Prints an error when either credential is missing.
    check_token_and_instance(token, instance_name)
    if not token or not instance_name:
        # Nothing useful can be done without credentials; the error has
        # already been reported above.
        return

    if not validate_credentials(token, backend_url, instance_name):
        click.echo("Error: Invalid credentials.")
        return

    # This will throw error if manifest file is incorrect
    load_manifest(manifest_path)

    response = onboard_manifest(token, instance_name, dbt_core_integration_id, manifest_path, backend_url)

    if response["ok"]:
        click.echo("Manifest onboarded successfully!")
    else:
        click.echo(f"{response['message']}")