Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add manifest onboard cli #9

Merged
merged 12 commits into from
Feb 12, 2024
Empty file.
Empty file.
51 changes: 51 additions & 0 deletions src/datapilot/clients/altimate/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import logging

import requests


class APIClient:
def __init__(self, api_token="", base_url="", tenant=""):
self.api_token = api_token
self.base_url = base_url
self.tenant = tenant
self.logger = logging.getLogger(self.__class__.__name__)

def _get_headers(self):
headers = {
"Content-Type": "application/json",
}

if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"

if self.tenant:
headers["x-tenant"] = self.tenant

return headers

def get(self, endpoint, params=None, timeout=None):
url = f"{self.base_url}{endpoint}"
headers = self._get_headers()

self.logger.debug(f"Sending GET request for tenant {self.tenant} at url: {url}")
response = requests.get(url, headers=headers, params=params, timeout=timeout)
self.logger.debug(f"Received GET response with status: {response.status_code }")
return response.json() if response.status_code == 200 else None

def post(self, endpoint, data=None, timeout=None):
url = f"{self.base_url}{endpoint}"
headers = self._get_headers()

self.logger.debug(f"Sending POST request for tenant {self.tenant} at url: {url}")
response = requests.post(url, headers=headers, json=data, timeout=timeout)
self.logger.debug(f"Received POST response with status: {response.status_code }")

return response
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved

def put(self, endpoint, data, timeout=None):
url = f"{self.base_url}{endpoint}"

self.logger.debug(f"Sending PUT request for tenant {self.tenant} at url: {url}")
response = requests.put(url, data=data, timeout=timeout)
self.logger.debug(f"Received PUT response with status: {response.status_code}")
return response
29 changes: 29 additions & 0 deletions src/datapilot/core/platforms/dbt/cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os

import click

Expand All @@ -9,6 +10,7 @@
from datapilot.core.platforms.dbt.formatting import generate_model_insights_table
from datapilot.core.platforms.dbt.formatting import generate_project_insights_table
from datapilot.utils.formatting.utils import tabulate_data
from datapilot.utils.utils import onboard_manifest

logging.basicConfig(level=logging.INFO)

Expand Down Expand Up @@ -65,3 +67,30 @@ def project_health(manifest_path, catalog_path, config_path=None):
click.echo("Project Insights")
click.echo("--" * 50)
click.echo(tabulate_data(project_report, headers="keys"))


@dbt.command("onboard")
@click.option("--token", prompt="API Token", help="Your API token for authentication.")
@click.option("--tenant", prompt="Tenant", help="Your tenant ID.")
@click.option("--dbt_core_integration_id", prompt="DBT Core Integration ID", help="DBT Core Integration ID")
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
def onboard(token, tenant, dbt_core_integration_id, manifest_path, env=None):
"""Onboard a manifest file to DBT."""
if not token and env:
token = os.environ.get("ALTIMATE_API_KEY")

if not tenant and env:
tenant = os.environ.get("ALTIMATE_INSTANCE_NAME")

if not token or not tenant:
click.echo("Error: API Token is required.")
return

# if not validate_credentials(token, tenant):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# print("Error: Invalid credentials.")
# return

# This will throw error if manifest file is incorrect
# load_manifest(manifest_path)
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved

onboard_manifest(token, tenant, dbt_core_integration_id, manifest_path)
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved
48 changes: 48 additions & 0 deletions src/datapilot/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from pathlib import Path
from typing import Dict

from datapilot.clients.altimate.client import APIClient


def load_json(file_path: str) -> Dict:
try:
Expand Down Expand Up @@ -40,3 +42,49 @@ def get_dir_path(path: str) -> str:
:return:
"""
return Path(path).parent


# Will need to change this
base_url = "http://localhost:5001"
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved


def upload_content_to_signed_url(file_path, signed_url):
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved
api_client = APIClient()

with Path(file_path).open("rb") as file:
file_content = file.read()

return api_client.put(signed_url, data=file_content)


def onboard_manifest(api_token, tenant, dbt_core_integration_id, manifest_path):
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved
api_client = APIClient(api_token, base_url, tenant)

endpoint = "/dbt/v1/signed_url"
params = {"dbt_core_integration_id": dbt_core_integration_id, "file_type": "manifest"}
signed_url_data = api_client.get(endpoint, params=params)
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved

if signed_url_data:
signed_url = signed_url_data.get("url")
file_id = signed_url_data.get("dbt_core_integration_file_id")
print(f"Received signed URL: {signed_url}")
print(f"Received File ID: {file_id}")
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved

upload_response = upload_content_to_signed_url(manifest_path, signed_url)

if upload_response:
endpoint = "/dbt/v1/verify_upload"
suryaiyer95 marked this conversation as resolved.
Show resolved Hide resolved
verify_params = {"dbt_core_integration_file_id": file_id}
verify_response = api_client.post(endpoint, data=verify_params)

if verify_response:
print("File successfully uploaded and verified.")
return
else:
print(f"Error verifying upload: {verify_response.status_code}, {verify_response.text}")

else:
print(f"Error uploading file: {upload_response.status_code}, {upload_response.text}")

else:
print("Error getting signed URL.")
Loading