
Feat background task nf #765

Merged (37 commits) on Aug 9, 2024

Changes from all commits

Commits
a86f0ac
feat(Dataset): add initial background queue, task, work and trigger t…
nazarfil Jun 18, 2024
be8abab
feat(Dataset): job task to generate snapshot
nazarfil Jun 23, 2024
e974508
fix(Datasets): fixes migrations
nazarfil Jul 4, 2024
1c0b5f0
feat(Dataset): adds worker to docker compose
nazarfil Jul 5, 2024
a607afc
chore: renamed snapshot to file_metadata
nazarfil Jul 12, 2024
7eff6e7
remove unneeded methods
nazarfil Jul 12, 2024
ac411c5
chore: rename worker to dataset-worker
nazarfil Jul 17, 2024
c81a551
feat(Dataset): add initial background queue, task, work and trigger t…
nazarfil Jun 18, 2024
1a86b91
feat(Dataset): job task to generate snapshot
nazarfil Jun 23, 2024
af08699
chore: renamed snapshot to file_metadata
nazarfil Jul 12, 2024
3a18c0f
feat(Dataset): add dataset file snapshot model
nazarfil Jul 9, 2024
835df3b
chore: adds todos
nazarfil Jul 10, 2024
fd103df
fix: removes upload to bucket and adds status
nazarfil Jul 10, 2024
3713283
fix: read parquet file in dataframe
nazarfil Jul 11, 2024
4ba35b0
fix: fixes api call parameters
nazarfil Jul 11, 2024
31ed9af
test(Dataset): attempt to test the version file
nazarfil Jul 12, 2024
a5951dc
chore: rename snapshot to file_metadata
nazarfil Jul 12, 2024
af85aad
refactor: removed unneeded file metadata query
nazarfil Jul 12, 2024
9526eaa
test(Dataset): add unittest for the task flow
nazarfil Jul 16, 2024
06b427c
fix(Dataset): fixed parquet file reading
nazarfil Jul 16, 2024
0901809
fix(Dataset): added corner cases for failures
nazarfil Jul 16, 2024
5817d6f
fix(Dataset): move from head to sample
nazarfil Jul 16, 2024
d03d879
fix(Dataset): tests handled exceptions
nazarfil Jul 16, 2024
e2f1ad7
fix(Dataset): tests file with no columns and no rows
nazarfil Jul 16, 2024
ab38d4d
fix(Dataset): fixes graphql exposed api
nazarfil Jul 16, 2024
a9be661
test(Dataset): add unittest for xlsx file
nazarfil Jul 16, 2024
aa1b240
chore: rename worker to dataset-worker
nazarfil Jul 16, 2024
9aaef85
chore: ignore local files
nazarfil Jul 17, 2024
fe817e8
chore: document how to run dataset worker locally
nazarfil Jul 17, 2024
95f6b49
fix(Dataset): fixes graphql exposed metadata and exceptions
nazarfil Jul 18, 2024
57e05c3
fix(Dataset): fix typo, add internationalisation
nazarfil Jul 30, 2024
a7abfdc
fix(Dataset): adds property to fetch latest metadata for a file
nazarfil Jul 30, 2024
6f7b71c
fix(Dataset): fixes typo and dict argument
nazarfil Aug 6, 2024
be80552
fix: renames sample in metadata
nazarfil Aug 7, 2024
a347e39
chore: move error to warning
nazarfil Aug 7, 2024
0f7fd7c
fix(Dataset): not creating metadata if format not supported
nazarfil Aug 7, 2024
2cf1a6d
fix: fixes issue with upload url
nazarfil Aug 8, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -5,6 +5,9 @@ venv
k8s/*
!k8s/sample_app.yaml

#for mac and idea
*.DS_Store
.idea/
# TODO: remove
credentials
.terraform*
3 changes: 3 additions & 0 deletions config/settings/base.py
@@ -378,6 +378,9 @@

# Datasets config
WORKSPACE_DATASETS_BUCKET = os.environ.get("WORKSPACE_DATASETS_BUCKET")
WORKSPACE_DATASETS_FILE_SNAPSHOT_SIZE = os.environ.get(
    "WORKSPACE_DATASETS_FILE_SNAPSHOT_SIZE", 50
)

# Base64 encoded service account key
# To generate a service account key, follow the instructions here:
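Side note on this setting: os.environ.get returns a string whenever the variable is set, so WORKSPACE_DATASETS_FILE_SNAPSHOT_SIZE can arrive as "50" rather than 50 by the time queue.py passes it to DataFrame.sample(n=...). A minimal defensive sketch, not part of this diff, assuming the same variable name:

import os

# Hypothetical hardening: coerce the env value to int so that
# DataFrame.sample(n=...) always receives an integer sample size.
WORKSPACE_DATASETS_FILE_SNAPSHOT_SIZE = int(
    os.environ.get("WORKSPACE_DATASETS_FILE_SNAPSHOT_SIZE", "50")
)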
7 changes: 6 additions & 1 deletion hexa/datasets/admin.py
@@ -1,6 +1,11 @@
from django.contrib import admin

from .models import (
    Dataset,
    DatasetLink,
    DatasetVersion,
    DatasetVersionFile,
)


@admin.register(Dataset)
26 changes: 24 additions & 2 deletions hexa/datasets/graphql/schema.graphql
@@ -91,6 +91,23 @@ type DatasetVersionPermissions {
  download: Boolean!
}

"""
Statuses that can occur when generating file metadata
"""
enum FileMetadataStatus {
  PROCESSING,
  FAILED,
  FINISHED
}

"""
Metadata for a dataset file
"""
type DatasetFileMetadata {
  sample: JSON!
  status: FileMetadataStatus!
}

"""
A file in a dataset version.
"""
@@ -101,6 +118,7 @@ type DatasetVersionFile {
  id: ID!
  uri: String!
  filename: String!
  createdAt: DateTime!
  createdBy: User
  contentType: String!
  fileMetadata: DatasetFileMetadata
}

"""
@@ -315,7 +333,7 @@ type CreateDatasetVersionFileResult {
  "The created file object"
  file: DatasetVersionFile
  "The URL to upload the file to"
  uploadUrl: String! @deprecated(reason: "moved to dedicated generateDatasetUploadUrl mutation")
  success: Boolean!
  errors: [CreateDatasetVersionFileError!]!
}
@@ -429,6 +447,8 @@ extend type Query {
  dataset(id: ID!): Dataset
  "Get a dataset by its slug."
  datasetVersion(id: ID!): DatasetVersion
  "Get a dataset file by its id."
  datasetVersionFile(id: ID!): DatasetVersionFile
  "Get a dataset link by its id."
  datasetLink(id: ID!): DatasetLink
  "Get a dataset link by its slug."
@@ -437,6 +457,7 @@ extend type Query {
  datasets(query: String, page: Int = 1, perPage: Int = 15): DatasetPage!
}


extend type Mutation {
  "Create a new dataset."
  createDataset(input: CreateDatasetInput!): CreateDatasetResult! @loginRequired
@@ -460,4 +481,5 @@
  deleteDatasetLink(input: DeleteDatasetLinkInput!): DeleteDatasetLinkResult! @loginRequired
  "Pin or unpin a dataset for a workspace."
  pinDataset(input: PinDatasetInput!): PinDatasetResult! @loginRequired
}

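For illustration, the query shape enabled by these schema additions looks like the following. This is a usage sketch rather than part of the diff; the constant name is made up and the query relies only on fields defined above:

# Hypothetical client-side query exercising the new fileMetadata field.
DATASET_VERSION_FILE_QUERY = """
query datasetVersionFile($id: ID!) {
  datasetVersionFile(id: $id) {
    filename
    fileMetadata {
      status
      sample
    }
  }
}
"""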
56 changes: 56 additions & 0 deletions hexa/datasets/migrations/0006_datasetfilemetadata.py
@@ -0,0 +1,56 @@
# Generated by Django 5.0.7 on 2024-08-07 09:02

import uuid

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
    dependencies = [
        ("datasets", "0005_datasetfilemetadatajob"),
    ]

    operations = [
        migrations.CreateModel(
            name="DatasetFileMetadata",
            fields=[
                (
                    "id",
                    models.UUIDField(
                        default=uuid.uuid4,
                        editable=False,
                        primary_key=True,
                        serialize=False,
                    ),
                ),
                ("created_at", models.DateTimeField(auto_now_add=True)),
                ("updated_at", models.DateTimeField(auto_now=True)),
                ("sample", models.JSONField(blank=True, default=list, null=True)),
                (
                    "status",
                    models.CharField(
                        choices=[
                            ("PROCESSING", "Processing"),
                            ("FAILED", "Failed"),
                            ("FINISHED", "Finished"),
                        ],
                        default="PROCESSING",
                        max_length=10,
                    ),
                ),
                ("status_reason", models.TextField(blank=True, null=True)),
                (
                    "dataset_version_file",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        related_name="metadata_entries",
                        to="datasets.datasetversionfile",
                    ),
                ),
            ],
            options={
                "abstract": False,
            },
        ),
    ]
33 changes: 33 additions & 0 deletions hexa/datasets/models.py
@@ -3,6 +3,8 @@
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import PermissionDenied
from django.db import models
from django.db.models import JSONField
from django.utils.translation import gettext_lazy as _
from dpq.models import BaseJob
from slugify import slugify

@@ -251,10 +253,41 @@ class DatasetVersionFile(Base):
    def filename(self):
        return self.uri.split("/")[-1]

    @property
    def latest_metadata(self):
        return self.metadata_entries.order_by("-created_at").first()

    class Meta:
        ordering = ["uri"]


class DatasetFileMetadata(Base):
    STATUS_PROCESSING = "PROCESSING"
    STATUS_FAILED = "FAILED"
    STATUS_FINISHED = "FINISHED"

    STATUS_CHOICES = [
        (STATUS_PROCESSING, _("Processing")),
        (STATUS_FAILED, _("Failed")),
        (STATUS_FINISHED, _("Finished")),
    ]

    sample = JSONField(blank=True, default=list, null=True)
    status = models.CharField(
        max_length=10,
        choices=STATUS_CHOICES,
        default=STATUS_PROCESSING,
    )
    status_reason = models.TextField(blank=True, null=True)
    dataset_version_file = models.ForeignKey(
        DatasetVersionFile,
        null=False,
        blank=False,
        on_delete=models.CASCADE,
        related_name="metadata_entries",
    )


class DatasetLinkQuerySet(BaseQuerySet):
    def filter_for_user(self, user: AnonymousUser | User):
        # FIXME: Use a generic permission system instead of differentiating between User and PipelineRunUser
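Since every processing attempt adds a row to metadata_entries, the new latest_metadata property resolves to the newest row by created_at, or None when the background task has not run yet. A hedged usage sketch follows; the helper function is illustrative, not from this diff:

from hexa.datasets.models import DatasetFileMetadata, DatasetVersionFile


def latest_finished_sample(version_file: DatasetVersionFile):
    # latest_metadata is None until the worker has created at least one
    # DatasetFileMetadata row for this file; skip unfinished attempts too.
    metadata = version_file.latest_metadata
    if metadata is None or metadata.status != DatasetFileMetadata.STATUS_FINISHED:
        return None
    return metadata.sample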
6 changes: 5 additions & 1 deletion hexa/datasets/permissions.py
@@ -1,4 +1,8 @@
from hexa.datasets.models import (
    Dataset,
    DatasetLink,
    DatasetVersion,
)
from hexa.user_management.models import User
from hexa.workspaces.models import (
    Workspace,
108 changes: 98 additions & 10 deletions hexa/datasets/queue.py
@@ -1,18 +1,108 @@
import json
from logging import getLogger

import pandas as pd
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import DatabaseError, IntegrityError
from dpq.queue import AtLeastOnceQueue

from hexa.core import mimetypes
from hexa.datasets.api import generate_download_url
from hexa.datasets.models import (
    DatasetFileMetadata,
    DatasetFileMetadataJob,
    DatasetVersionFile,
)

logger = getLogger(__name__)


def is_supported_mimetype(filename: str) -> bool:
    supported_mimetypes = [
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-excel",
        "application/vnd.apache.parquet",
        "text/csv",
    ]
    supported_extensions = ["parquet"]
    suffix = filename.split(".")[-1]
    mime_type, encoding = mimetypes.guess_type(filename, strict=False)
    return mime_type in supported_mimetypes or suffix in supported_extensions


def download_file_as_dataframe(
    dataset_version_file: DatasetVersionFile,
) -> pd.DataFrame | None:
    mime_type, encoding = mimetypes.guess_type(
        dataset_version_file.filename, strict=False
    )
    download_url = generate_download_url(dataset_version_file)
    if mime_type == "text/csv":
        return pd.read_csv(download_url)
    elif (
        mime_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        or mime_type == "application/vnd.ms-excel"
    ):
        return pd.read_excel(download_url)
    elif (
        mime_type == "application/vnd.apache.parquet"
        or dataset_version_file.filename.split(".")[-1] == "parquet"
    ):
        return pd.read_parquet(download_url)


def generate_dataset_file_sample_task(
    queue: AtLeastOnceQueue, job: DatasetFileMetadataJob
):
    dataset_version_file_id = job.args["file_id"]
    try:
        dataset_version_file = DatasetVersionFile.objects.get(
            id=dataset_version_file_id
        )
    except ObjectDoesNotExist as e:
        logger.error(
            f"DatasetVersionFile with id {dataset_version_file_id} does not exist: {e}"
        )
        return

    if not is_supported_mimetype(dataset_version_file.filename):
        logger.info(f"Unsupported file format: {dataset_version_file.filename}")
        return

    logger.info(f"Creating dataset sample for version file {dataset_version_file.id}")
    try:
        dataset_file_metadata = DatasetFileMetadata.objects.create(
            dataset_version_file=dataset_version_file,
            status=DatasetFileMetadata.STATUS_PROCESSING,
        )
    except (IntegrityError, DatabaseError, ValidationError) as e:
        logger.error(f"Error creating DatasetFileMetadata: {e}")
        return

    try:
        file_content = download_file_as_dataframe(dataset_version_file)
        if not file_content.empty:
            random_seed = 22
            file_sample = file_content.sample(
                settings.WORKSPACE_DATASETS_FILE_SNAPSHOT_SIZE,
                random_state=random_seed,
                replace=True,
            )
            dataset_file_metadata.sample = file_sample.to_json(orient="records")
        else:
            dataset_file_metadata.sample = json.dumps([])
        logger.info(f"Dataset sample saved for file {dataset_version_file_id}")
        dataset_file_metadata.status = DatasetFileMetadata.STATUS_FINISHED
        dataset_file_metadata.save()
        logger.info(f"Dataset sample created for file {dataset_version_file_id}")
    except Exception as e:
        dataset_file_metadata.status = DatasetFileMetadata.STATUS_FAILED
        dataset_file_metadata.status_reason = str(e)
        dataset_file_metadata.save()
        logger.exception(
            f"Dataset file sample creation failed for file {dataset_version_file_id}: {e}"
        )


class DatasetsFileMetadataQueue(AtLeastOnceQueue):
@@ -21,18 +111,16 @@ class DatasetsFileMetadataQueue(AtLeastOnceQueue):

dataset_file_metadata_queue = DatasetsFileMetadataQueue(
    tasks={
        "generate_file_metadata": generate_dataset_file_sample_task,
    },
    notify_channel="dataset_file_metadata_queue",
)


def load_file_metadata(file_id: str):
    dataset_file_metadata_queue.enqueue(
        "generate_file_metadata",
        {
            "file_id": str(file_id),
        },
    )
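Taken together, the flow is: a caller invokes load_file_metadata, which enqueues a generate_file_metadata job; the dataset-worker listens on the dataset_file_metadata_queue channel and runs generate_dataset_file_sample_task, which writes the sampled snapshot to a DatasetFileMetadata row. A sketch of the trigger side, with an assumed call site that is not shown in this diff:

from hexa.datasets.queue import load_file_metadata

# Hypothetical call site, e.g. right after a dataset version file is
# created: enqueueing returns immediately and the dataset-worker flips
# the metadata status from PROCESSING to FINISHED or FAILED later.
load_file_metadata(file_id=version_file.id)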
18 changes: 17 additions & 1 deletion hexa/datasets/schema/queries.py
@@ -2,7 +2,12 @@

from hexa.core.graphql import result_page

from ..models import (
    Dataset,
    DatasetLink,
    DatasetVersion,
    DatasetVersionFile,
)

datasets_queries = QueryType()

@@ -37,6 +42,17 @@ def resolve_dataset_version(_, info, **kwargs):
    return None


@datasets_queries.field("datasetVersionFile")
def resolve_dataset_version_file(_, info, **kwargs):
request = info.context["request"]
try:
return DatasetVersionFile.objects.filter_for_user(request.user).get(
id=kwargs["id"]
)
except DatasetVersionFile.DoesNotExist:
return None


@datasets_queries.field("datasetLink")
def resolve_dataset_link(_, info, **kwargs):
request = info.context["request"]
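Note that the resolver returns None instead of raising when the file is missing or not visible through filter_for_user, so clients receive null rather than a GraphQL error. A test-style sketch of that behavior; run_query and the fixtures are assumed from the project's test conventions:

# Hypothetical assertion: a user without access to the file's workspace
# gets null back because filter_for_user(...).get(...) raises DoesNotExist,
# which the resolver swallows.
response = self.run_query(
    """
    query ($id: ID!) {
      datasetVersionFile(id: $id) {
        id
      }
    }
    """,
    {"id": str(version_file.id)},
)
assert response["data"]["datasetVersionFile"] is None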