Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Implement acquiring/releasing of reserverations as bulk operation
Browse files Browse the repository at this point in the history
Implement deleting of artifacts as bulk operation

Signed-off-by: Nick Müller <[email protected]>
  • Loading branch information
Nick Müller committed Jan 19, 2023
1 parent b6aaa3c commit 2e80fba
Show file tree
Hide file tree
Showing 14 changed files with 1,538 additions and 47 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/flyteorg/datacatalog
go 1.18

replace (
github.com/flyteorg/flyteidl => github.com/blackshark-ai/flyteidl v0.24.22-0.20230104143947-9cc1f12a643f
github.com/flyteorg/flyteidl => github.com/blackshark-ai/flyteidl v0.24.22-0.20230119104851-e9fb728f4733
github.com/flyteorg/flytestdlib => github.com/blackshark-ai/flytestdlib v1.0.1-0.20230104151410-d6ec6dba8697
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/blackshark-ai/flyteidl v0.24.22-0.20230104143947-9cc1f12a643f h1:BFpozetWgkWdrOz2wDQY/2pcvm8Wx60uI4mJup+0yrg=
github.com/blackshark-ai/flyteidl v0.24.22-0.20230104143947-9cc1f12a643f/go.mod h1:sgOlQA2lnugarwSN8M+9gWoCZmzYNFI8gpShZrm+wmo=
github.com/blackshark-ai/flyteidl v0.24.22-0.20230119104851-e9fb728f4733 h1:xvr9DovH3m3S10+U71+lJ6zmJDFoOHPLSZfXSRtZxEo=
github.com/blackshark-ai/flyteidl v0.24.22-0.20230119104851-e9fb728f4733/go.mod h1:sgOlQA2lnugarwSN8M+9gWoCZmzYNFI8gpShZrm+wmo=
github.com/blackshark-ai/flytestdlib v1.0.1-0.20230104151410-d6ec6dba8697 h1:N3ch1D89Hhe72LK9zVuSwZ9DrRl9G3M5fsntD6ydZEY=
github.com/blackshark-ai/flytestdlib v1.0.1-0.20230104151410-d6ec6dba8697/go.mod h1:ojJnMQ9sDf1VW6zrHv8TauKrMV0+Pf+6n+uProvhzac=
github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewCollectedErrors(code codes.Code, errors []error) error {
errorCollection[idx] = err.Error()
}

return NewDataCatalogError(code, strings.Join((errorCollection), ", "))
return NewDataCatalogError(code, strings.Join(errorCollection, ", "))
}

func IsAlreadyExistsError(err error) bool {
Expand Down
97 changes: 71 additions & 26 deletions pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type artifactMetrics struct {
deleteResponseTime labeled.StopWatch
deleteSuccessCounter labeled.Counter
deleteFailureCounter labeled.Counter
bulkDeleteResponseTime labeled.StopWatch
bulkDeleteSuccessCounter labeled.Counter
bulkDeleteFailureCounter labeled.Counter
}

type artifactManager struct {
Expand All @@ -57,7 +60,8 @@ type artifactManager struct {
systemMetrics artifactMetrics
}

// Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location.
// CreateArtifact creates an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an
// offloaded location.
func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatalog.CreateArtifactRequest) (*datacatalog.CreateArtifactResponse, error) {
timer := m.systemMetrics.createResponseTime.Start(ctx)
defer timer.Stop()
Expand Down Expand Up @@ -133,7 +137,7 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatal
return &datacatalog.CreateArtifactResponse{}, nil
}

// Get the Artifact and its associated ArtifactData. The request can query by ArtifactID or TagName.
// GetArtifact retrieves the Artifact and its associated ArtifactData. The request can query by ArtifactID or TagName.
func (m *artifactManager) GetArtifact(ctx context.Context, request *datacatalog.GetArtifactRequest) (*datacatalog.GetArtifactResponse, error) {
timer := m.systemMetrics.getResponseTime.Start(ctx)
defer timer.Stop()
Expand Down Expand Up @@ -244,6 +248,8 @@ func (m *artifactManager) getArtifactDataList(ctx context.Context, artifactDataM
return artifactDataList, nil
}

// ListArtifacts returns a paginated list of artifacts matching the provided filter expression, including their
// associated artifact data.
func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalog.ListArtifactsRequest) (*datacatalog.ListArtifactsResponse, error) {
err := validators.ValidateListArtifactRequest(request)
if err != nil {
Expand Down Expand Up @@ -408,36 +414,22 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
}, nil
}

// DeleteArtifact deletes the given artifact, removing all stored artifact data from the underlying blob storage.
func (m *artifactManager) DeleteArtifact(ctx context.Context, request *datacatalog.DeleteArtifactRequest) (*datacatalog.DeleteArtifactResponse, error) {
ctx = contextutils.WithProjectDomain(ctx, request.Dataset.Project, request.Dataset.Domain)

timer := m.systemMetrics.deleteResponseTime.Start(ctx)
defer timer.Stop()

err := validators.ValidateDeleteArtifactRequest(request)
if err != nil {
logger.Warningf(ctx, "Invalid delete artifact request %v, err: %v", request, err)
m.systemMetrics.validationErrorCounter.Inc(ctx)
m.systemMetrics.deleteFailureCounter.Inc(ctx)
return nil, err
}
func (m *artifactManager) deleteArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, queryHandle artifactQueryHandle) error {
ctx = contextutils.WithProjectDomain(ctx, datasetID.Project, datasetID.Domain)

// artifact must already exist, verify first
artifactModel, err := m.findArtifact(ctx, request.GetDataset(), request)
artifactModel, err := m.findArtifact(ctx, datasetID, queryHandle)
if err != nil {
logger.Errorf(ctx, "Failed to get artifact for delete artifact request %v, err: %v", request, err)
m.systemMetrics.deleteFailureCounter.Inc(ctx)
return nil, err
logger.Errorf(ctx, "Failed to get artifact while trying to delete [%v], err: %v", queryHandle, err)
return err
}

// delete all artifact data from the blob storage
for _, artifactData := range artifactModel.ArtifactData {
if err := m.artifactStore.DeleteData(ctx, artifactData); err != nil {
logger.Errorf(ctx, "Failed to delete artifact data [%v] during delete, err: %v", artifactData.Name, err)
logger.Errorf(ctx, "Failed to delete artifact data [%v] while deleting artifact [%v], err: %v", artifactData.Name, artifactModel.ArtifactID, err)
m.systemMetrics.deleteDataFailureCounter.Inc(ctx)
m.systemMetrics.deleteFailureCounter.Inc(ctx)
return nil, err
return err
}

m.systemMetrics.deleteDataSuccessCounter.Inc(ctx)
Expand All @@ -447,21 +439,71 @@ func (m *artifactManager) DeleteArtifact(ctx context.Context, request *datacatal
err = m.repo.ArtifactRepo().Delete(ctx, artifactModel)
if err != nil {
if errors.IsDoesNotExistError(err) {
logger.Warnf(ctx, "Artifact does not exist key: %+v, err %v", artifactModel.ArtifactID, err)
logger.Warnf(ctx, "Artifact [%v] does not exist, err %v", artifactModel.ArtifactID, err)
m.systemMetrics.doesNotExistCounter.Inc(ctx)
} else {
logger.Errorf(ctx, "Failed to delete artifact %v, err: %v", artifactModel, err)
logger.Errorf(ctx, "Failed to delete artifact [%v], err: %v", artifactModel, err)
}
return err
}

logger.Debugf(ctx, "Successfully deleted artifact [%v]", artifactModel.ArtifactID)
return nil
}

// DeleteArtifact deletes the given artifact, removing all stored artifact data from the underlying blob storage.
func (m *artifactManager) DeleteArtifact(ctx context.Context, request *datacatalog.DeleteArtifactRequest) (*datacatalog.DeleteArtifactResponse, error) {
ctx = contextutils.WithProjectDomain(ctx, request.Dataset.Project, request.Dataset.Domain)

timer := m.systemMetrics.deleteResponseTime.Start(ctx)
defer timer.Stop()

err := validators.ValidateDeleteArtifactRequest(request)
if err != nil {
logger.Warningf(ctx, "Invalid delete artifacts request %v, err: %v", request, err)
m.systemMetrics.validationErrorCounter.Inc(ctx)
m.systemMetrics.deleteFailureCounter.Inc(ctx)
return nil, err
}

logger.Debugf(ctx, "Successfully deleted artifact id: %v", artifactModel.ArtifactID)
if err := m.deleteArtifact(ctx, request.GetDataset(), request); err != nil {
m.systemMetrics.deleteFailureCounter.Inc(ctx)
return nil, err
}

m.systemMetrics.deleteSuccessCounter.Inc(ctx)
return &datacatalog.DeleteArtifactResponse{}, nil
}

// DeleteArtifacts deletes the given artifacts, removing all stored artifact data from the underlying blob storage.
func (m *artifactManager) DeleteArtifacts(ctx context.Context, request *datacatalog.DeleteArtifactsRequest) (*datacatalog.DeleteArtifactResponse, error) {
timer := m.systemMetrics.bulkDeleteResponseTime.Start(ctx)
defer timer.Stop()

err := validators.ValidateDeleteArtifactsRequest(request)
if err != nil {
logger.Warningf(ctx, "Invalid delete artifacts request %v, err: %v", request, err)
m.systemMetrics.validationErrorCounter.Inc(ctx)
m.systemMetrics.bulkDeleteFailureCounter.Inc(ctx)
return nil, err
}

for _, deleteArtifactReq := range request.Artifacts {
if err := m.deleteArtifact(ctx, deleteArtifactReq.GetDataset(), deleteArtifactReq); err != nil {
// bulk delete endpoint is idempotent, ignore errors regarding missing artifacts as they might've already
// been deleted by a previous call.
if errors.IsDoesNotExistError(err) {
continue
}
m.systemMetrics.bulkDeleteFailureCounter.Inc(ctx)
return nil, err
}
}

m.systemMetrics.bulkDeleteSuccessCounter.Inc(ctx)
return &datacatalog.DeleteArtifactResponse{}, nil
}

func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager {
artifactMetrics := artifactMetrics{
scope: artifactScope,
Expand Down Expand Up @@ -489,6 +531,9 @@ func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.Da
deleteResponseTime: labeled.NewStopWatch("delete_duration", "The duration of the delete artifact calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric),
deleteSuccessCounter: labeled.NewCounter("delete_success_count", "The number of times delete artifact succeeded", artifactScope, labeled.EmitUnlabeledMetric),
deleteFailureCounter: labeled.NewCounter("delete_failure_count", "The number of times delete artifact failed", artifactScope, labeled.EmitUnlabeledMetric),
bulkDeleteResponseTime: labeled.NewStopWatch("bulk_delete_duration", "The duration of the bulk delete artifacts calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric),
bulkDeleteSuccessCounter: labeled.NewCounter("bulk_delete_success_count", "The number of times bulk delete artifacts succeeded", artifactScope, labeled.EmitUnlabeledMetric),
bulkDeleteFailureCounter: labeled.NewCounter("bulk_delete_failure_count", "The number of times bulk delete artifacts failed", artifactScope, labeled.EmitUnlabeledMetric),
}

return &artifactManager{
Expand Down
Loading

0 comments on commit 2e80fba

Please sign in to comment.