Skip to content

Commit

Permalink
feat(Dataset): generate upload url independently (#717)
Browse files Browse the repository at this point in the history
* feat(Dataset): generate upload url independently

* test(Dataset): test generation of upload url

* feat(Dataset): create custom validation error when bucket object exists

* feat(Dataset): move deprecated field to a resolver

* fix(Datasets): rename uploadUrl mutation to refer to datasets
  • Loading branch information
nazarfil authored Jul 12, 2024
1 parent 0f7ec3f commit 66ba6e0
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 28 deletions.
19 changes: 10 additions & 9 deletions hexa/datasets/api.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from django.conf import settings
from google.api_core.exceptions import NotFound
from google.api_core import exceptions

from hexa.files import basefs
from hexa.files.api import get_storage


def generate_upload_url(file):
def generate_upload_url(uri, content_type):
return get_storage().generate_upload_url(
settings.WORKSPACE_DATASETS_BUCKET,
file.uri,
file.content_type,
uri,
content_type,
raise_if_exists=True,
)

Expand All @@ -19,10 +20,10 @@ def generate_download_url(file):
)


def get_blob(file):
def get_blob(uri):
try:
return get_storage().get_bucket_object(
settings.WORKSPACE_DATASETS_BUCKET, file.uri
)
except NotFound:
return get_storage().get_bucket_object(settings.WORKSPACE_DATASETS_BUCKET, uri)
except exceptions.NotFound:
return None
except basefs.NotFound:
return None
23 changes: 22 additions & 1 deletion hexa/datasets/graphql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,15 @@ input CreateDatasetVersionFileInput {
uri: String!
}

"""
Input for creating an upload link for the file
"""
input GenerateDatasetUploadUrlInput {
versionId: ID!
contentType: String!
uri: String!
}

"""
Errors that can occur when creating a dataset version file.
"""
Expand All @@ -288,7 +297,15 @@ enum CreateDatasetVersionFileError {
ALREADY_EXISTS
INVALID_URI
PERMISSION_DENIED
}

"""
Result of creating an upload url
"""
type GenerateDatasetUploadUrlResult {
uploadUrl: String
success: Boolean!
errors: [CreateDatasetVersionFileError!]!
}

"""
Expand All @@ -298,7 +315,7 @@ type CreateDatasetVersionFileResult {
"The created file object"
file: DatasetVersionFile
"The URL to upload the file to"
uploadUrl: String
uploadUrl: String @deprecated(reason: "moved to dedicated generateDatasetUploadUrl mutation")
success: Boolean!
errors: [CreateDatasetVersionFileError!]!
}
Expand All @@ -312,6 +329,8 @@ input LinkDatasetInput {
}

"""
Errors that can occur when linking a dataset with a workspace.
"""
enum LinkDatasetError {
Expand Down Expand Up @@ -429,6 +448,8 @@ extend type Mutation {
createDatasetVersion(input: CreateDatasetVersionInput!): CreateDatasetVersionResult! @loginRequired
"Delete a dataset version."
deleteDatasetVersion(input: DeleteDatasetVersionInput!): DeleteDatasetVersionResult! @loginRequired
"Create dataset version file upload url."
generateDatasetUploadUrl(input: GenerateDatasetUploadUrlInput!): GenerateDatasetUploadUrlResult! @loginRequired
"Create a new file in a dataset version."
createDatasetVersionFile(input: CreateDatasetVersionFileInput!): CreateDatasetVersionFileResult! @loginRequired
"Prepare to download a file in a dataset version."
Expand Down
41 changes: 33 additions & 8 deletions hexa/datasets/schema/mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,33 @@ def resolve_delete_dataset_share(_, info, **kwargs):
return {"success": False, "errors": ["PERMISSION_DENIED"]}


@mutations.field("generateDatasetUploadUrl")
def resolve_generate_upload_url(_, info, **kwargs):
    """Resolve the ``generateDatasetUploadUrl`` mutation.

    Looks up the dataset version among those visible to the requesting user,
    refuses to proceed unless it is the dataset's latest version, checks that
    no object already exists at the target URI, and then asks the storage
    backend for an upload URL.

    Args:
        _: parent value (unused).
        info: GraphQL resolve info; ``info.context["request"]`` carries the
            request whose ``user`` is used for filtering.
        **kwargs: must contain ``input`` with ``versionId``, ``uri`` and
            ``contentType``.

    Returns:
        A dict with ``success`` and ``errors`` and, on success, ``upload_url``.
    """
    # Imported locally so the resolver does not rely on the module-level
    # import block for this name.
    from hexa.files.basefs import BucketObjectAlreadyExists

    request = info.context["request"]
    mutation_input = kwargs["input"]

    try:
        version = DatasetVersion.objects.filter_for_user(request.user).get(
            id=mutation_input["versionId"]
        )
        if version.id != version.dataset.latest_version.id:
            return {"success": False, "errors": ["LOCKED_VERSION"]}

        full_uri = version.get_full_uri(mutation_input["uri"])
        if get_blob(full_uri) is not None:
            return {"success": False, "errors": ["ALREADY_EXISTS"]}

        upload_url = generate_upload_url(full_uri, mutation_input["contentType"])

        return {"success": True, "errors": [], "upload_url": upload_url}
    except BucketObjectAlreadyExists:
        # The storage backend re-checks existence when generating the URL
        # (raise_if_exists=True); an object created after the get_blob check
        # above must be reported the same way as one found by that check.
        return {"success": False, "errors": ["ALREADY_EXISTS"]}
    except ValidationError:
        return {"success": False, "errors": ["INVALID_URI"]}
    except DatasetVersion.DoesNotExist:
        return {"success": False, "errors": ["VERSION_NOT_FOUND"]}
    except PermissionDenied:
        return {"success": False, "errors": ["PERMISSION_DENIED"]}


@mutations.field("createDatasetVersionFile")
def resolve_create_version_file(_, info, **kwargs):
request = info.context["request"]
Expand All @@ -216,7 +243,7 @@ def resolve_create_version_file(_, info, **kwargs):
file = None
try:
file = version.get_file_by_name(mutation_input["uri"])
if get_blob(file) is not None:
if get_blob(file.uri) is not None:
return {"success": False, "errors": ["ALREADY_EXISTS"]}
except DatasetVersionFile.DoesNotExist:
file = DatasetVersionFile.objects.create_if_has_perm(
Expand All @@ -225,13 +252,11 @@ def resolve_create_version_file(_, info, **kwargs):
uri=version.get_full_uri(mutation_input["uri"]),
content_type=mutation_input["contentType"],
)
upload_url = generate_upload_url(file)
return {
"success": True,
"errors": [],
"file": file,
"upload_url": upload_url,
}
return {
"success": True,
"errors": [],
"file": file,
}
except ValidationError:
return {"success": False, "errors": ["INVALID_URI"]}
except DatasetVersion.DoesNotExist:
Expand Down
23 changes: 21 additions & 2 deletions hexa/datasets/schema/types.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
import logging

from ariadne import ObjectType
from django.db.models import Q
from django.http import HttpRequest

from hexa.core.graphql import result_page
from hexa.datasets.api import generate_upload_url
from hexa.datasets.models import (
Dataset,
DatasetLink,
DatasetVersion,
DatasetVersionFile,
)
from hexa.files.basefs import BucketObjectAlreadyExists
from hexa.workspaces.models import Workspace
from hexa.workspaces.schema.types import workspace_object, workspace_permissions

from ..models import Dataset, DatasetLink, DatasetVersion, DatasetVersionFile

dataset_object = ObjectType("Dataset")
dataset_permissions = ObjectType("DatasetPermissions")
dataset_version_object = ObjectType("DatasetVersion")
dataset_version_permissions = ObjectType("DatasetVersionPermissions")
dataset_version_file_object = ObjectType("DatasetVersionFile")
dataset_version_file_result_object = ObjectType("CreateDatasetVersionFileResult")
dataset_link_object = ObjectType("DatasetLink")
dataset_link_permissions = ObjectType("DatasetLinkPermissions")

Expand Down Expand Up @@ -198,6 +207,16 @@ def resolve_version_permissions_delete(obj: DatasetVersion, info, **kwargs):
)


@dataset_version_file_result_object.field("uploadUrl")
def resolve_upload_url(obj, info, **kwargs):
    """Resolve the deprecated ``CreateDatasetVersionFileResult.uploadUrl`` field.

    Args:
        obj: the parent result. The ``createDatasetVersionFile`` resolver
            returns a plain dict (``success``, ``errors`` and, on success,
            ``file``), so the file is read from that payload; an object that
            exposes ``uri`` / ``content_type`` directly is still accepted.
        info: GraphQL resolve info (unused).
        **kwargs: field arguments (unused).

    Returns:
        The upload URL, or ``None`` when the result carries no file or the
        target object already exists in the bucket.
    """
    file = obj.get("file") if isinstance(obj, dict) else obj
    if file is None:
        # Error results have no file, hence nothing to upload to.
        return None

    try:
        return generate_upload_url(file.uri, file.content_type)
    except BucketObjectAlreadyExists as exc:
        logging.error(f"Upload URL generation failed: {exc.message}")
        return None


bindables = [
dataset_object,
dataset_permissions,
Expand Down
40 changes: 40 additions & 0 deletions hexa/datasets/tests/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,46 @@ def test_create_duplicate(self):
with self.assertRaises(IntegrityError):
dataset.create_version(principal=superuser, name="Version 1")

@mock_gcp_storage
def test_generate_upload_url(self):
superuser = self.create_user("[email protected]", is_superuser=True)
workspace = self.create_workspace(
superuser,
name="My Workspace",
description="Test workspace",
)
dataset = self.create_dataset(
superuser, workspace, "Dataset", "Dataset description"
)
dataset_version = self.create_dataset_version(superuser, dataset=dataset)
self.client.force_login(superuser)
r = self.run_query(
"""
mutation generateDatasetUploadUrl ($input: GenerateDatasetUploadUrlInput!) {
generateDatasetUploadUrl(input: $input) {
uploadUrl
success
errors
}
}
""",
{
"input": {
"versionId": str(dataset_version.id),
"uri": "uri_file.csv",
"contentType": "text/csv",
}
},
)
self.assertEqual(
r["data"]["generateDatasetUploadUrl"],
{
"uploadUrl": f"http://signed-url/{str(dataset.id)}/{str(dataset_version.id)}/uri_file.csv",
"success": True,
"errors": [],
},
)

def test_get_file_by_name(self):
self.test_create_dataset_version()
superuser = User.objects.get(email="[email protected]")
Expand Down
8 changes: 8 additions & 0 deletions hexa/files/basefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ class NotFound(Exception):
pass


class BucketObjectAlreadyExists(Exception):
    """Raised when an upload targets an object key that already exists.

    Attributes:
        target_key: the object key that was found to exist already.
        message: human-readable description; also passed to ``Exception``
            so ``str(exc)`` yields the same text.
    """

    def __init__(self, target_key):
        # Keep the raw key so callers can inspect it without parsing the message.
        self.target_key = target_key
        self.message = (
            f"File already exists. Choose a different object key {target_key}."
        )
        super().__init__(self.message)


@dataclass
class ObjectsPage:
items: typing.List[any]
Expand Down
10 changes: 8 additions & 2 deletions hexa/files/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
from google.oauth2 import service_account
from google.protobuf import duration_pb2

from .basefs import BaseClient, NotFound, ObjectsPage, load_bucket_sample_data_with
from .basefs import (
BaseClient,
BucketObjectAlreadyExists,
NotFound,
ObjectsPage,
load_bucket_sample_data_with,
)


def get_credentials():
Expand Down Expand Up @@ -198,7 +204,7 @@ def generate_upload_url(
client = get_storage_client()
gcs_bucket = client.get_bucket(bucket_name)
if raise_if_exists and gcs_bucket.get_blob(target_key) is not None:
raise ValidationError(f"GCS: Object {target_key} already exists!")
raise BucketObjectAlreadyExists(target_key)
blob = gcs_bucket.blob(target_key)
return blob.generate_signed_url(
expiration=3600, version="v4", method="PUT", content_type=content_type
Expand Down
12 changes: 8 additions & 4 deletions hexa/files/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from django.conf import settings
from django.core.exceptions import ValidationError

from .basefs import BaseClient, NotFound, ObjectsPage, load_bucket_sample_data_with
from .basefs import (
BaseClient,
BucketObjectAlreadyExists,
NotFound,
ObjectsPage,
load_bucket_sample_data_with,
)

default_region = "eu-central-1"

Expand Down Expand Up @@ -232,9 +238,7 @@ def generate_upload_url(
if raise_if_exists:
try:
s3_client.head_object(Bucket=bucket_name, Key=target_key)
raise ValidationError(
f"File already exists. Choose a different object key Object {target_key}."
)
raise BucketObjectAlreadyExists(target_key)
except s3_client.exceptions.ClientError as e:
if e.response["Error"]["Code"] != "404":
# don't hide non "not found errors"
Expand Down
8 changes: 7 additions & 1 deletion hexa/files/tests/mocks/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ def list_blobs(self, *args, **kwargs):
return self.client.list_blobs(self, *args, **kwargs)

def get_blob(self, blob_name, *args, **kwargs):
return MockBlob(blob_name, self)
if any(
filename in blob_name
for filename in ["test", "demo", "mock", "data", "some-uri"]
):
return MockBlob(blob_name, self)
else:
return None

def blob(self, *args, **kwargs):
b = MockBlob(*args, bucket=self, **kwargs)
Expand Down
3 changes: 2 additions & 1 deletion hexa/files/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from hexa.core.test import TestCase

from ..api import NotFound, get_storage
from ..basefs import BucketObjectAlreadyExists
from .mocks.mockgcp import backend


Expand Down Expand Up @@ -362,7 +363,7 @@ def test_generate_upload_url_raise_existing(self):
size=123,
content_type="text/plain",
)
with self.assertRaises(ValidationError):
with self.assertRaises(BucketObjectAlreadyExists):
self.get_client().generate_upload_url(
bucket_name="bucket", target_key="demo.txt", raise_if_exists=True
)
Expand Down

0 comments on commit 66ba6e0

Please sign in to comment.