Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add manifest onboard cli #9

Merged
merged 12 commits into from
Feb 12, 2024
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def read(*names, **kwargs):
"dbt-artifacts-parser==0.5.1",
"configtree==0.6",
"tabulate==0.9.0",
"requests==2.31.0",
],
extras_require={
# eg:
Expand Down
Empty file.
Empty file.
66 changes: 66 additions & 0 deletions src/datapilot/clients/altimate/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging

import requests


class APIClient:
def __init__(self, api_token="", base_url="", tenant=""):
self.api_token = api_token
self.base_url = base_url
self.tenant = tenant
self.logger = logging.getLogger(self.__class__.__name__)

def _get_headers(self):
headers = {
"Content-Type": "application/json",
}

if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"

if self.tenant:
headers["x-tenant"] = self.tenant

return headers

def log(self, message):
self.logger.debug(message)

def get(self, endpoint, params=None, timeout=None):
url = f"{self.base_url}{endpoint}"
headers = self._get_headers()

self.logger.debug(f"Sending GET request for tenant {self.tenant} at url: {url}")
response = requests.get(url, headers=headers, params=params, timeout=timeout)
self.logger.debug(f"Received GET response with status: {response.status_code }")
return response.json() if response.status_code == 200 else None

def post(self, endpoint, data=None, timeout=None):
url = f"{self.base_url}{endpoint}"
headers = self._get_headers()

self.logger.debug(f"Sending POST request for tenant {self.tenant} at url: {url}")
response = requests.post(url, headers=headers, json=data, timeout=timeout)
self.logger.debug(f"Received POST response with status: {response.status_code }")

return response
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved

def put(self, endpoint, data, timeout=None):
url = f"{self.base_url}{endpoint}"

self.logger.debug(f"Sending PUT request for tenant {self.tenant} at url: {url}")
response = requests.put(url, data=data, timeout=timeout)
self.logger.debug(f"Received PUT response with status: {response.status_code}")
return response

def verify_upload(self, params=None):
endpoint = "/dbt/v1/verify_upload"
self.post(endpoint, data=params)

def get_signed_url(self, params=None):
endpoint = "/dbt/v1/signed_url"
return self.get(endpoint, params=params)

def validate_credentials(self):
endpoint = "/dbt/v3/validate-credentials"
return self.get(endpoint)
37 changes: 37 additions & 0 deletions src/datapilot/core/platforms/dbt/cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os

import click

Expand All @@ -8,7 +9,10 @@
from datapilot.core.platforms.dbt.executor import DBTInsightGenerator
from datapilot.core.platforms.dbt.formatting import generate_model_insights_table
from datapilot.core.platforms.dbt.formatting import generate_project_insights_table
from datapilot.core.platforms.dbt.utils import load_manifest
from datapilot.utils.formatting.utils import tabulate_data
from datapilot.utils.utils import onboard_manifest
from datapilot.utils.utils import validate_credentials

logging.basicConfig(level=logging.INFO)

Expand Down Expand Up @@ -65,3 +69,36 @@ def project_health(manifest_path, catalog_path, config_path=None):
click.echo("Project Insights")
click.echo("--" * 50)
click.echo(tabulate_data(project_report, headers="keys"))


@dbt.command("onboard")
@click.option("--token", prompt="API Token", help="Your API token for authentication.")
@click.option("--tenant", prompt="Tenant", help="Your tenant ID.")
@click.option("--dbt_core_integration_id", prompt="DBT Core Integration ID", help="DBT Core Integration ID")
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
@click.option("--backend-url", required=False, prompt="Altimate's Backend URL", help="Altimate's Backend URL")
def onboard(token, tenant, dbt_core_integration_id, manifest_path, backend_url="https://api.myaltimate.com", env=None):
"""Onboard a manifest file to DBT."""
if not token and env:
token = os.environ.get("ALTIMATE_API_KEY")

if not tenant and env:
tenant = os.environ.get("ALTIMATE_INSTANCE_NAME")

if not token or not tenant:
click.echo("Error: API Token is required.")
return

if not validate_credentials(token, backend_url, tenant):
click.echo("Error: Invalid credentials.")
return

# This will throw error if manifest file is incorrect
load_manifest(manifest_path)

response = onboard_manifest(token, tenant, dbt_core_integration_id, manifest_path, backend_url)

if response["ok"]:
click.echo("Manifest onboarded successfully!")
else:
click.echo(f"{response['message']}")
49 changes: 49 additions & 0 deletions src/datapilot/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from pathlib import Path
from typing import Dict

from requests import Response

from datapilot.clients.altimate.client import APIClient


def load_json(file_path: str) -> Dict:
try:
Expand Down Expand Up @@ -40,3 +44,48 @@ def get_dir_path(path: str) -> str:
:return:
"""
return Path(path).parent


def upload_content_to_signed_url(file_path, signed_url) -> Response:
api_client = APIClient()

with Path(file_path).open("rb") as file:
file_content = file.read()

return api_client.put(signed_url, data=file_content)


def validate_credentials(
token,
backend_url,
tenant,
) -> Response:
api_client = APIClient(api_token=token, base_url=backend_url, tenant=tenant)
return api_client.validate_credentials()


def onboard_manifest(api_token, tenant, dbt_core_integration_id, manifest_path, backend_url) -> dict:
api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)

params = {"dbt_core_integration_id": dbt_core_integration_id, "file_type": "manifest"}
signed_url_data = api_client.get_signed_url(params)

if signed_url_data:
signed_url = signed_url_data.get("url")
file_id = signed_url_data.get("dbt_core_integration_file_id")
api_client.log(f"Received signed URL: {signed_url}")
api_client.log(f"Received File ID: {file_id}")

upload_response = upload_content_to_signed_url(manifest_path, signed_url)

if upload_response:
verify_params = {"dbt_core_integration_file_id": file_id}
api_client.verify_upload(verify_params)
return {"ok": True}
else:
api_client.log(f"Error uploading file: {upload_response.status_code}, {upload_response.text}")
return {"ok": False, "message": f"Error uploading file: {upload_response.status_code}, {upload_response.text}"}

else:
api_client.log("Error getting signed URL.")
return {"ok": False, "message": "Error getting signed URL."}
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved