From 2e80fbaa49fa7de21a3a12da04f6dd7cb04d765f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Thu, 19 Jan 2023 13:12:19 +0100 Subject: [PATCH] Implement acquiring/releasing of reserverations as bulk operation Implement deleting of artifacts as bulk operation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- go.mod | 2 +- go.sum | 4 +- pkg/errors/errors.go | 2 +- pkg/manager/impl/artifact_manager.go | 97 ++- pkg/manager/impl/artifact_manager_test.go | 400 ++++++++++ pkg/manager/impl/reservation_manager.go | 111 ++- pkg/manager/impl/reservation_manager_test.go | 723 +++++++++++++++++- .../impl/validators/artifact_validator.go | 22 + .../impl/validators/reservation_validator.go | 86 +++ pkg/manager/interfaces/artifact.go | 1 + pkg/manager/interfaces/reservation.go | 2 + pkg/manager/mocks/artifact_manager.go | 41 + pkg/manager/mocks/reservation_manager.go | 82 ++ pkg/rpc/datacatalogservice/service.go | 12 + 14 files changed, 1538 insertions(+), 47 deletions(-) create mode 100644 pkg/manager/impl/validators/reservation_validator.go diff --git a/go.mod b/go.mod index 8ec063f0..aa855797 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/flyteorg/datacatalog go 1.18 replace ( - github.com/flyteorg/flyteidl => github.com/blackshark-ai/flyteidl v0.24.22-0.20230104143947-9cc1f12a643f + github.com/flyteorg/flyteidl => github.com/blackshark-ai/flyteidl v0.24.22-0.20230119104851-e9fb728f4733 github.com/flyteorg/flytestdlib => github.com/blackshark-ai/flytestdlib v1.0.1-0.20230104151410-d6ec6dba8697 ) diff --git a/go.sum b/go.sum index 15a235ca..bf24e52b 100644 --- a/go.sum +++ b/go.sum @@ -185,8 +185,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= -github.com/blackshark-ai/flyteidl v0.24.22-0.20230104143947-9cc1f12a643f h1:BFpozetWgkWdrOz2wDQY/2pcvm8Wx60uI4mJup+0yrg= -github.com/blackshark-ai/flyteidl v0.24.22-0.20230104143947-9cc1f12a643f/go.mod h1:sgOlQA2lnugarwSN8M+9gWoCZmzYNFI8gpShZrm+wmo= +github.com/blackshark-ai/flyteidl v0.24.22-0.20230119104851-e9fb728f4733 h1:xvr9DovH3m3S10+U71+lJ6zmJDFoOHPLSZfXSRtZxEo= +github.com/blackshark-ai/flyteidl v0.24.22-0.20230119104851-e9fb728f4733/go.mod h1:sgOlQA2lnugarwSN8M+9gWoCZmzYNFI8gpShZrm+wmo= github.com/blackshark-ai/flytestdlib v1.0.1-0.20230104151410-d6ec6dba8697 h1:N3ch1D89Hhe72LK9zVuSwZ9DrRl9G3M5fsntD6ydZEY= github.com/blackshark-ai/flytestdlib v1.0.1-0.20230104151410-d6ec6dba8697/go.mod h1:ojJnMQ9sDf1VW6zrHv8TauKrMV0+Pf+6n+uProvhzac= github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index ac778f00..37643a23 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -51,7 +51,7 @@ func NewCollectedErrors(code codes.Code, errors []error) error { errorCollection[idx] = err.Error() } - return NewDataCatalogError(code, strings.Join((errorCollection), ", ")) + return NewDataCatalogError(code, strings.Join(errorCollection, ", ")) } func IsAlreadyExistsError(err error) bool { diff --git a/pkg/manager/impl/artifact_manager.go b/pkg/manager/impl/artifact_manager.go index f7389195..0d3aeec1 100644 --- a/pkg/manager/impl/artifact_manager.go +++ b/pkg/manager/impl/artifact_manager.go @@ -49,6 +49,9 @@ type artifactMetrics struct { deleteResponseTime labeled.StopWatch deleteSuccessCounter labeled.Counter deleteFailureCounter labeled.Counter + bulkDeleteResponseTime labeled.StopWatch + bulkDeleteSuccessCounter labeled.Counter + bulkDeleteFailureCounter labeled.Counter } type artifactManager struct { @@ -57,7 +60,8 @@ type artifactManager struct { systemMetrics artifactMetrics } -// Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location. +// CreateArtifact creates an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an +// offloaded location. func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatalog.CreateArtifactRequest) (*datacatalog.CreateArtifactResponse, error) { timer := m.systemMetrics.createResponseTime.Start(ctx) defer timer.Stop() @@ -133,7 +137,7 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatal return &datacatalog.CreateArtifactResponse{}, nil } -// Get the Artifact and its associated ArtifactData. The request can query by ArtifactID or TagName. +// GetArtifact retrieves the Artifact and its associated ArtifactData. The request can query by ArtifactID or TagName. func (m *artifactManager) GetArtifact(ctx context.Context, request *datacatalog.GetArtifactRequest) (*datacatalog.GetArtifactResponse, error) { timer := m.systemMetrics.getResponseTime.Start(ctx) defer timer.Stop() @@ -244,6 +248,8 @@ func (m *artifactManager) getArtifactDataList(ctx context.Context, artifactDataM return artifactDataList, nil } +// ListArtifacts returns a paginated list of artifacts matching the provided filter expression, including their +// associated artifact data. func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalog.ListArtifactsRequest) (*datacatalog.ListArtifactsResponse, error) { err := validators.ValidateListArtifactRequest(request) if err != nil { @@ -408,36 +414,22 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal }, nil } -// DeleteArtifact deletes the given artifact, removing all stored artifact data from the underlying blob storage. -func (m *artifactManager) DeleteArtifact(ctx context.Context, request *datacatalog.DeleteArtifactRequest) (*datacatalog.DeleteArtifactResponse, error) { - ctx = contextutils.WithProjectDomain(ctx, request.Dataset.Project, request.Dataset.Domain) - - timer := m.systemMetrics.deleteResponseTime.Start(ctx) - defer timer.Stop() - - err := validators.ValidateDeleteArtifactRequest(request) - if err != nil { - logger.Warningf(ctx, "Invalid delete artifact request %v, err: %v", request, err) - m.systemMetrics.validationErrorCounter.Inc(ctx) - m.systemMetrics.deleteFailureCounter.Inc(ctx) - return nil, err - } +func (m *artifactManager) deleteArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, queryHandle artifactQueryHandle) error { + ctx = contextutils.WithProjectDomain(ctx, datasetID.Project, datasetID.Domain) // artifact must already exist, verify first - artifactModel, err := m.findArtifact(ctx, request.GetDataset(), request) + artifactModel, err := m.findArtifact(ctx, datasetID, queryHandle) if err != nil { - logger.Errorf(ctx, "Failed to get artifact for delete artifact request %v, err: %v", request, err) - m.systemMetrics.deleteFailureCounter.Inc(ctx) - return nil, err + logger.Errorf(ctx, "Failed to get artifact while trying to delete [%v], err: %v", queryHandle, err) + return err } // delete all artifact data from the blob storage for _, artifactData := range artifactModel.ArtifactData { if err := m.artifactStore.DeleteData(ctx, artifactData); err != nil { - logger.Errorf(ctx, "Failed to delete artifact data [%v] during delete, err: %v", artifactData.Name, err) + logger.Errorf(ctx, "Failed to delete artifact data [%v] while deleting artifact [%v], err: %v", artifactData.Name, artifactModel.ArtifactID, err) m.systemMetrics.deleteDataFailureCounter.Inc(ctx) - m.systemMetrics.deleteFailureCounter.Inc(ctx) - return nil, err + return err } m.systemMetrics.deleteDataSuccessCounter.Inc(ctx) @@ -447,21 +439,71 @@ func (m *artifactManager) DeleteArtifact(ctx context.Context, request *datacatal err = m.repo.ArtifactRepo().Delete(ctx, artifactModel) if err != nil { if errors.IsDoesNotExistError(err) { - logger.Warnf(ctx, "Artifact does not exist key: %+v, err %v", artifactModel.ArtifactID, err) + logger.Warnf(ctx, "Artifact [%v] does not exist, err %v", artifactModel.ArtifactID, err) m.systemMetrics.doesNotExistCounter.Inc(ctx) } else { - logger.Errorf(ctx, "Failed to delete artifact %v, err: %v", artifactModel, err) + logger.Errorf(ctx, "Failed to delete artifact [%v], err: %v", artifactModel, err) } + return err + } + + logger.Debugf(ctx, "Successfully deleted artifact [%v]", artifactModel.ArtifactID) + return nil +} + +// DeleteArtifact deletes the given artifact, removing all stored artifact data from the underlying blob storage. +func (m *artifactManager) DeleteArtifact(ctx context.Context, request *datacatalog.DeleteArtifactRequest) (*datacatalog.DeleteArtifactResponse, error) { + ctx = contextutils.WithProjectDomain(ctx, request.Dataset.Project, request.Dataset.Domain) + + timer := m.systemMetrics.deleteResponseTime.Start(ctx) + defer timer.Stop() + + err := validators.ValidateDeleteArtifactRequest(request) + if err != nil { + logger.Warningf(ctx, "Invalid delete artifacts request %v, err: %v", request, err) + m.systemMetrics.validationErrorCounter.Inc(ctx) m.systemMetrics.deleteFailureCounter.Inc(ctx) return nil, err } - logger.Debugf(ctx, "Successfully deleted artifact id: %v", artifactModel.ArtifactID) + if err := m.deleteArtifact(ctx, request.GetDataset(), request); err != nil { + m.systemMetrics.deleteFailureCounter.Inc(ctx) + return nil, err + } m.systemMetrics.deleteSuccessCounter.Inc(ctx) return &datacatalog.DeleteArtifactResponse{}, nil } +// DeleteArtifacts deletes the given artifacts, removing all stored artifact data from the underlying blob storage. +func (m *artifactManager) DeleteArtifacts(ctx context.Context, request *datacatalog.DeleteArtifactsRequest) (*datacatalog.DeleteArtifactResponse, error) { + timer := m.systemMetrics.bulkDeleteResponseTime.Start(ctx) + defer timer.Stop() + + err := validators.ValidateDeleteArtifactsRequest(request) + if err != nil { + logger.Warningf(ctx, "Invalid delete artifacts request %v, err: %v", request, err) + m.systemMetrics.validationErrorCounter.Inc(ctx) + m.systemMetrics.bulkDeleteFailureCounter.Inc(ctx) + return nil, err + } + + for _, deleteArtifactReq := range request.Artifacts { + if err := m.deleteArtifact(ctx, deleteArtifactReq.GetDataset(), deleteArtifactReq); err != nil { + // bulk delete endpoint is idempotent, ignore errors regarding missing artifacts as they might've already + // been deleted by a previous call. + if errors.IsDoesNotExistError(err) { + continue + } + m.systemMetrics.bulkDeleteFailureCounter.Inc(ctx) + return nil, err + } + } + + m.systemMetrics.bulkDeleteSuccessCounter.Inc(ctx) + return &datacatalog.DeleteArtifactResponse{}, nil +} + func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager { artifactMetrics := artifactMetrics{ scope: artifactScope, @@ -489,6 +531,9 @@ func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.Da deleteResponseTime: labeled.NewStopWatch("delete_duration", "The duration of the delete artifact calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric), deleteSuccessCounter: labeled.NewCounter("delete_success_count", "The number of times delete artifact succeeded", artifactScope, labeled.EmitUnlabeledMetric), deleteFailureCounter: labeled.NewCounter("delete_failure_count", "The number of times delete artifact failed", artifactScope, labeled.EmitUnlabeledMetric), + bulkDeleteResponseTime: labeled.NewStopWatch("bulk_delete_duration", "The duration of the bulk delete artifacts calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric), + bulkDeleteSuccessCounter: labeled.NewCounter("bulk_delete_success_count", "The number of times bulk delete artifacts succeeded", artifactScope, labeled.EmitUnlabeledMetric), + bulkDeleteFailureCounter: labeled.NewCounter("bulk_delete_failure_count", "The number of times bulk delete artifacts failed", artifactScope, labeled.EmitUnlabeledMetric), } return &artifactManager{ diff --git a/pkg/manager/impl/artifact_manager_test.go b/pkg/manager/impl/artifact_manager_test.go index a0b355a4..4bf8f1e3 100644 --- a/pkg/manager/impl/artifact_manager_test.go +++ b/pkg/manager/impl/artifact_manager_test.go @@ -1238,3 +1238,403 @@ func TestDeleteArtifact(t *testing.T) { assert.Nil(t, artifactResponse) }) } + +func TestDeleteArtifacts(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + testStoragePrefix, err := datastore.ConstructReference(ctx, datastore.GetBaseContainerFQN(ctx), "test") + assert.NoError(t, err) + + expectedDataset := getTestDataset() + expectedArtifact := getTestArtifact() + expectedTag := getTestTag() + + t.Run("Delete by ID", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact) + + dcRepo := newMockDataCatalogRepo() + dcRepo.MockArtifactRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifactKey models.ArtifactKey) bool { + return artifactKey.ArtifactID == expectedArtifact.Id && + artifactKey.DatasetProject == expectedArtifact.Dataset.Project && + artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain && + artifactKey.DatasetName == expectedArtifact.Dataset.Name && + artifactKey.DatasetVersion == expectedArtifact.Dataset.Version + })).Return(mockArtifactModel, nil) + + dcRepo.MockArtifactRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifact models.Artifact) bool { + return artifact.ArtifactID == expectedArtifact.Id && + artifact.DatasetProject == expectedArtifact.Dataset.Project && + artifact.DatasetDomain == expectedArtifact.Dataset.Domain && + artifact.DatasetName == expectedArtifact.Dataset.Name && + artifact.DatasetVersion == expectedArtifact.Dataset.Version + })).Return(nil) + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_ArtifactId{ + ArtifactId: expectedArtifact.Id, + }, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.NoError(t, err) + assert.NotNil(t, artifactResponse) + + // check that the datastore no longer has artifactData available + dataRef, err := getExpectedDatastoreLocationFromName(ctx, datastore, testStoragePrefix, expectedArtifact, "data1") + assert.NoError(t, err) + var value core.Literal + err = datastore.ReadProtobuf(ctx, dataRef, &value) + assert.Error(t, err) + assert.True(t, stdErrors.Is(err, os.ErrNotExist)) + }) + + t.Run("Delete by artifact tag", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact) + + dcRepo := newMockDataCatalogRepo() + dcRepo.MockArtifactRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifact models.Artifact) bool { + return artifact.ArtifactID == expectedArtifact.Id && + artifact.DatasetProject == expectedArtifact.Dataset.Project && + artifact.DatasetDomain == expectedArtifact.Dataset.Domain && + artifact.DatasetName == expectedArtifact.Dataset.Name && + artifact.DatasetVersion == expectedArtifact.Dataset.Version + })).Return(nil) + + dcRepo.MockTagRepo.On("Get", mock.Anything, + mock.MatchedBy(func(tag models.TagKey) bool { + return tag.TagName == expectedTag.TagName && + tag.DatasetProject == expectedTag.DatasetProject && + tag.DatasetDomain == expectedTag.DatasetDomain && + tag.DatasetVersion == expectedTag.DatasetVersion && + tag.DatasetName == expectedTag.DatasetName + })).Return(models.Tag{ + TagKey: models.TagKey{ + DatasetProject: expectedTag.DatasetProject, + DatasetDomain: expectedTag.DatasetDomain, + DatasetName: expectedTag.DatasetName, + DatasetVersion: expectedTag.DatasetVersion, + TagName: expectedTag.TagName, + }, + DatasetUUID: expectedTag.DatasetUUID, + Artifact: mockArtifactModel, + ArtifactID: mockArtifactModel.ArtifactID, + }, nil) + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_TagName{ + TagName: expectedTag.TagName, + }, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.NoError(t, err) + assert.NotNil(t, artifactResponse) + + // check that the datastore no longer has artifactData available + dataRef, err := getExpectedDatastoreLocationFromName(ctx, datastore, testStoragePrefix, expectedArtifact, "data1") + assert.NoError(t, err) + var value core.Literal + err = datastore.ReadProtobuf(ctx, dataRef, &value) + assert.Error(t, err) + assert.True(t, stdErrors.Is(err, os.ErrNotExist)) + }) + + t.Run("Artifact not found", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + + dcRepo := newMockDataCatalogRepo() + dcRepo.MockArtifactRepo.On("Get", mock.Anything, mock.Anything).Return(models.Artifact{}, repoErrors.GetMissingEntityError("Artifact", &datacatalog.Artifact{ + Dataset: expectedDataset.Id, + Id: expectedArtifact.Id, + })) + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_ArtifactId{ + ArtifactId: expectedArtifact.Id, + }, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.NoError(t, err) + assert.NotNil(t, artifactResponse) + }) + + t.Run("Artifact not found during delete", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact) + + dcRepo := newMockDataCatalogRepo() + dcRepo.MockArtifactRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifactKey models.ArtifactKey) bool { + return artifactKey.ArtifactID == expectedArtifact.Id && + artifactKey.DatasetProject == expectedArtifact.Dataset.Project && + artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain && + artifactKey.DatasetName == expectedArtifact.Dataset.Name && + artifactKey.DatasetVersion == expectedArtifact.Dataset.Version + })).Return(mockArtifactModel, nil) + dcRepo.MockArtifactRepo.On("Delete", mock.Anything, mock.Anything).Return(repoErrors.GetMissingEntityError("Artifact", &datacatalog.Artifact{ + Dataset: expectedDataset.Id, + Id: expectedArtifact.Id, + })) + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_ArtifactId{ + ArtifactId: expectedArtifact.Id, + }, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.NoError(t, err) + assert.NotNil(t, artifactResponse) + }) + + t.Run("Artifact delete failed", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact) + + dcRepo := newMockDataCatalogRepo() + dcRepo.MockArtifactRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifactKey models.ArtifactKey) bool { + return artifactKey.ArtifactID == expectedArtifact.Id && + artifactKey.DatasetProject == expectedArtifact.Dataset.Project && + artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain && + artifactKey.DatasetName == expectedArtifact.Dataset.Name && + artifactKey.DatasetVersion == expectedArtifact.Dataset.Version + })).Return(mockArtifactModel, nil) + dcRepo.MockArtifactRepo.On("Delete", mock.Anything, mock.Anything).Return(errors.NewDataCatalogError(codes.Internal, "failed")) + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_ArtifactId{ + ArtifactId: expectedArtifact.Id, + }, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.Error(t, err) + assert.Equal(t, codes.Internal, status.Code(err)) + assert.Nil(t, artifactResponse) + }) + + t.Run("Missing artifact ID", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + + dcRepo := newMockDataCatalogRepo() + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_ArtifactId{}, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.Error(t, err) + assert.Equal(t, codes.InvalidArgument, status.Code(err)) + assert.Nil(t, artifactResponse) + }) + + t.Run("Missing artifact tag", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + + dcRepo := newMockDataCatalogRepo() + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_TagName{}, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.Error(t, err) + assert.Equal(t, codes.InvalidArgument, status.Code(err)) + assert.Nil(t, artifactResponse) + }) + + t.Run("Idempotency", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact) + + dcRepo := newMockDataCatalogRepo() + dcRepo.MockArtifactRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifactKey models.ArtifactKey) bool { + return artifactKey.ArtifactID == expectedArtifact.Id && + artifactKey.DatasetProject == expectedArtifact.Dataset.Project && + artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain && + artifactKey.DatasetName == expectedArtifact.Dataset.Name && + artifactKey.DatasetVersion == expectedArtifact.Dataset.Version + })).Once().Return(mockArtifactModel, nil) + dcRepo.MockArtifactRepo.On("Get", mock.Anything, mock.Anything).Return(models.Artifact{}, + repoErrors.GetMissingEntityError("Artifact", &datacatalog.Artifact{ + Dataset: expectedDataset.Id, + Id: expectedArtifact.Id, + })) + + dcRepo.MockArtifactRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifact models.Artifact) bool { + return artifact.ArtifactID == expectedArtifact.Id && + artifact.DatasetProject == expectedArtifact.Dataset.Project && + artifact.DatasetDomain == expectedArtifact.Dataset.Domain && + artifact.DatasetName == expectedArtifact.Dataset.Name && + artifact.DatasetVersion == expectedArtifact.Dataset.Version + })).Once().Return(nil) + dcRepo.MockArtifactRepo.On("Delete", mock.Anything, mock.Anything). + Return(repoErrors.GetMissingEntityError("Artifact", &datacatalog.Artifact{ + Dataset: expectedDataset.Id, + Id: expectedArtifact.Id, + })) + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_ArtifactId{ + ArtifactId: expectedArtifact.Id, + }, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.NoError(t, err) + assert.NotNil(t, artifactResponse) + + // check that the datastore no longer has artifactData available + dataRef, err := getExpectedDatastoreLocationFromName(ctx, datastore, testStoragePrefix, expectedArtifact, "data1") + assert.NoError(t, err) + var value core.Literal + err = datastore.ReadProtobuf(ctx, dataRef, &value) + assert.Error(t, err) + assert.True(t, stdErrors.Is(err, os.ErrNotExist)) + + artifactResponse, err = artifactManager.DeleteArtifacts(ctx, request) + assert.NoError(t, err) + assert.NotNil(t, artifactResponse) + }) + + t.Run("Multiple states", func(t *testing.T) { + ctx := context.Background() + datastore := createInmemoryDataStore(t, mockScope.NewTestScope()) + mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact) + + dcRepo := newMockDataCatalogRepo() + dcRepo.MockArtifactRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifactKey models.ArtifactKey) bool { + return artifactKey.ArtifactID == expectedArtifact.Id && + artifactKey.DatasetProject == expectedArtifact.Dataset.Project && + artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain && + artifactKey.DatasetName == expectedArtifact.Dataset.Name && + artifactKey.DatasetVersion == expectedArtifact.Dataset.Version + })).Return(mockArtifactModel, nil) + dcRepo.MockArtifactRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifactKey models.ArtifactKey) bool { + return artifactKey.ArtifactID == "notFound" && + artifactKey.DatasetProject == expectedArtifact.Dataset.Project && + artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain && + artifactKey.DatasetName == expectedArtifact.Dataset.Name && + artifactKey.DatasetVersion == expectedArtifact.Dataset.Version + })).Return(models.Artifact{}, repoErrors.GetMissingEntityError("Artifact", &datacatalog.Artifact{ + Dataset: expectedDataset.Id, + Id: "notFound", + })) + + dcRepo.MockArtifactRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(artifact models.Artifact) bool { + return artifact.ArtifactID == expectedArtifact.Id && + artifact.DatasetProject == expectedArtifact.Dataset.Project && + artifact.DatasetDomain == expectedArtifact.Dataset.Domain && + artifact.DatasetName == expectedArtifact.Dataset.Name && + artifact.DatasetVersion == expectedArtifact.Dataset.Version + })).Return(nil) + + request := &datacatalog.DeleteArtifactsRequest{ + Artifacts: []*datacatalog.DeleteArtifactRequest{ + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_ArtifactId{ + ArtifactId: expectedArtifact.Id, + }, + }, + { + Dataset: expectedDataset.Id, + QueryHandle: &datacatalog.DeleteArtifactRequest_ArtifactId{ + ArtifactId: "notFound", + }, + }, + }, + } + + artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope()) + artifactResponse, err := artifactManager.DeleteArtifacts(ctx, request) + assert.NoError(t, err) + assert.NotNil(t, artifactResponse) + + // check that the datastore no longer has artifactData available + dataRef, err := getExpectedDatastoreLocationFromName(ctx, datastore, testStoragePrefix, expectedArtifact, "data1") + assert.NoError(t, err) + var value core.Literal + err = datastore.ReadProtobuf(ctx, dataRef, &value) + assert.Error(t, err) + assert.True(t, stdErrors.Is(err, os.ErrNotExist)) + }) +} diff --git a/pkg/manager/impl/reservation_manager.go b/pkg/manager/impl/reservation_manager.go index 3adee114..60a69962 100644 --- a/pkg/manager/impl/reservation_manager.go +++ b/pkg/manager/impl/reservation_manager.go @@ -9,13 +9,13 @@ import ( "github.com/flyteorg/flytestdlib/promutils/labeled" "github.com/flyteorg/datacatalog/pkg/errors" + "github.com/flyteorg/datacatalog/pkg/manager/impl/validators" + "github.com/flyteorg/datacatalog/pkg/manager/interfaces" "github.com/flyteorg/datacatalog/pkg/repositories" - repo_errors "github.com/flyteorg/datacatalog/pkg/repositories/errors" + repoerrors "github.com/flyteorg/datacatalog/pkg/repositories/errors" "github.com/flyteorg/datacatalog/pkg/repositories/models" "github.com/flyteorg/datacatalog/pkg/repositories/transformers" - "github.com/flyteorg/datacatalog/pkg/manager/interfaces" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" ) @@ -25,7 +25,9 @@ type reservationMetrics struct { reservationReleased labeled.Counter reservationAlreadyInProgress labeled.Counter acquireReservationFailure labeled.Counter + acquireReservationsFailure labeled.Counter releaseReservationFailure labeled.Counter + releaseReservationsFailure labeled.Counter reservationDoesNotExist labeled.Counter } @@ -39,7 +41,7 @@ type reservationManager struct { systemMetrics reservationMetrics } -// Creates a new reservation manager with the specified properties +// NewReservationManager creates a new reservation manager with the specified properties func NewReservationManager( repo repositories.RepositoryInterface, heartbeatGracePeriodMultiplier time.Duration, @@ -67,11 +69,21 @@ func NewReservationManager( "Number of times we failed to acquire reservation", reservationScope, ), + acquireReservationsFailure: labeled.NewCounter( + "acquire_reservations_failure", + "Number of times we failed to acquire multiple reservations", + reservationScope, + ), releaseReservationFailure: labeled.NewCounter( "release_reservation_failure", "Number of times we failed to release a reservation", reservationScope, ), + releaseReservationsFailure: labeled.NewCounter( + "release_reservations_failure", + "Number of times we failed to release multiple reservations", + reservationScope, + ), reservationDoesNotExist: labeled.NewCounter( "reservation_does_not_exist", "Number of times we attempt to modify a reservation that does not exist", @@ -88,9 +100,15 @@ func NewReservationManager( } } -// Attempt to acquire a reservation for the specified artifact. If there is not active reservation, successfully -// acquire it. If you are the owner of the active reservation, extend it. If another owner, return the existing reservation. +// GetOrExtendReservation attempts to acquire a reservation for the specified artifact. If there is no active +// reservation, successfully acquire it. If you are the owner of the active reservation, extend it. If another owner, +// return the existing reservation. func (r *reservationManager) GetOrExtendReservation(ctx context.Context, request *datacatalog.GetOrExtendReservationRequest) (*datacatalog.GetOrExtendReservationResponse, error) { + if err := validators.ValidateGetOrExtendReservationRequest(request); err != nil { + r.systemMetrics.acquireReservationFailure.Inc(ctx) + return nil, err + } + reservationID := request.ReservationId // Use minimum of maxHeartbeatInterval and requested heartbeat interval @@ -111,6 +129,36 @@ func (r *reservationManager) GetOrExtendReservation(ctx context.Context, request }, nil } +// GetOrExtendReservations attempts to get or extend reservations for multiple artifacts in a single operation. +func (r *reservationManager) GetOrExtendReservations(ctx context.Context, request *datacatalog.GetOrExtendReservationsRequest) (*datacatalog.GetOrExtendReservationsResponse, error) { + if err := validators.ValidateGetOrExtendReservationsRequest(request); err != nil { + r.systemMetrics.acquireReservationsFailure.Inc(ctx) + return nil, err + } + + var reservations []*datacatalog.Reservation + for _, req := range request.GetReservations() { + // Use minimum of maxHeartbeatInterval and requested heartbeat interval + heartbeatInterval := r.maxHeartbeatInterval + requestHeartbeatInterval := req.GetHeartbeatInterval() + if requestHeartbeatInterval != nil && requestHeartbeatInterval.AsDuration() < heartbeatInterval { + heartbeatInterval = requestHeartbeatInterval.AsDuration() + } + + reservation, err := r.tryAcquireReservation(ctx, req.ReservationId, req.OwnerId, heartbeatInterval) + if err != nil { + r.systemMetrics.acquireReservationsFailure.Inc(ctx) + return nil, err + } + + reservations = append(reservations, &reservation) + } + + return &datacatalog.GetOrExtendReservationsResponse{ + Reservations: reservations, + }, nil +} + // tryAcquireReservation will fetch the reservation first and only create/update // the reservation if it does not exist or has expired. // This is an optimization to reduce the number of writes to db. We always need @@ -159,7 +207,7 @@ func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservat } if repoErr != nil { - if repoErr.Error() == repo_errors.AlreadyExists { + if repoErr.Error() == repoerrors.AlreadyExists { // Looks like someone else tried to obtain the reservation // at the same time and they won. Let's find out who won. rsv1, err := repo.Get(ctx, reservationKey) @@ -189,24 +237,57 @@ func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservat return reservation, nil } -// Release an active reservation with the specified owner. If one does not exist, gracefully return. +// ReleaseReservation releases an active reservation with the specified owner. If one does not exist, gracefully return. func (r *reservationManager) ReleaseReservation(ctx context.Context, request *datacatalog.ReleaseReservationRequest) (*datacatalog.ReleaseReservationResponse, error) { + if err := validators.ValidateReleaseReservationRequest(request); err != nil { + return nil, err + } + + if err := r.releaseReservation(ctx, request.ReservationId, request.OwnerId); err != nil { + r.systemMetrics.releaseReservationFailure.Inc(ctx) + return nil, err + } + + return &datacatalog.ReleaseReservationResponse{}, nil +} + +// ReleaseReservations releases reservations for multiple artifacts in a single operation. +// This is an idempotent operation, releasing reservations multiple times or trying to release an unknown reservation +// will not result in an error being returned. +func (r *reservationManager) ReleaseReservations(ctx context.Context, request *datacatalog.ReleaseReservationsRequest) (*datacatalog.ReleaseReservationResponse, error) { + if err := validators.ValidateReleaseReservationsRequest(request); err != nil { + return nil, err + } + + for _, req := range request.GetReservations() { + if err := r.releaseReservation(ctx, req.ReservationId, req.OwnerId); err != nil { + r.systemMetrics.releaseReservationsFailure.Inc(ctx) + return nil, err + } + } + + return &datacatalog.ReleaseReservationResponse{}, nil +} + +// releaseReservation performs the actual reservation release operation, deleting the respective object from +// datacatalog's database, thus freeing the associated artifact for other entities. If the specified reservation was not +// found, no error will be returned. +func (r *reservationManager) releaseReservation(ctx context.Context, reservationID *datacatalog.ReservationID, ownerID string) error { repo := r.repo.ReservationRepo() - reservationKey := transformers.FromReservationID(request.ReservationId) + reservationKey := transformers.FromReservationID(reservationID) - err := repo.Delete(ctx, reservationKey, request.OwnerId) + err := repo.Delete(ctx, reservationKey, ownerID) if err != nil { if errors.IsDoesNotExistError(err) { - logger.Warnf(ctx, "Reservation does not exist id: %+v, err %v", request.ReservationId, err) + logger.Warnf(ctx, "Reservation does not exist id: %+v, err %v", reservationID, err) r.systemMetrics.reservationDoesNotExist.Inc(ctx) - return &datacatalog.ReleaseReservationResponse{}, nil + return nil } logger.Errorf(ctx, "Failed to release reservation: %+v, err: %v", reservationKey, err) - r.systemMetrics.releaseReservationFailure.Inc(ctx) - return nil, err + return err } r.systemMetrics.reservationReleased.Inc(ctx) - return &datacatalog.ReleaseReservationResponse{}, nil + return nil } diff --git a/pkg/manager/impl/reservation_manager_test.go b/pkg/manager/impl/reservation_manager_test.go index 207ce813..bf9d1983 100644 --- a/pkg/manager/impl/reservation_manager_test.go +++ b/pkg/manager/impl/reservation_manager_test.go @@ -17,6 +17,7 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" ) @@ -91,6 +92,78 @@ func TestGetOrExtendReservation_CreateReservation(t *testing.T) { assert.Equal(t, heartbeatIntervalPb, resp.GetReservation().HeartbeatInterval) } +func TestGetOrExtendReservations_CreateReservations(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + tagNames := map[string]bool{tagName: true, "tag2": true, "nonexistence": true} + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + tagNames[key.TagName] + })).Return(models.Reservation{}, errors2.NewDataCatalogErrorf(codes.NotFound, "entry not found")) + + now := time.Now() + dcRepo.MockReservationRepo.On("Create", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + tagNames[reservation.TagName] && + reservation.OwnerID == currentOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.GetOrExtendReservationsRequest{ + Reservations: []*datacatalog.GetOrExtendReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "tag2", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "nonexistence", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + }, + } + + resp, err := reservationManager.GetOrExtendReservations(context.Background(), req) + + assert.Nil(t, err) + require.NotNil(t, resp.GetReservations()) + require.Len(t, resp.GetReservations(), 3) + for _, reservation := range resp.GetReservations() { + assert.Equal(t, currentOwner, reservation.OwnerId) + assert.Equal(t, heartbeatIntervalPb, reservation.HeartbeatInterval) + } +} + func TestGetOrExtendReservation_MaxHeartbeatInterval(t *testing.T) { dcRepo := getDatacatalogRepo() @@ -139,6 +212,79 @@ func TestGetOrExtendReservation_MaxHeartbeatInterval(t *testing.T) { assert.Equal(t, heartbeatIntervalPb, resp.GetReservation().HeartbeatInterval) } +func TestGetOrExtendReservations_MaxHeartbeatInterval(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + tagNames := map[string]bool{tagName: true, "tag2": true, "nonexistence": true} + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + tagNames[key.TagName] + })).Return(models.Reservation{}, errors2.NewDataCatalogErrorf(codes.NotFound, "entry not found")) + + now := time.Now() + + dcRepo.MockReservationRepo.On("Create", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + tagNames[reservation.TagName] && + reservation.OwnerID == currentOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, heartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.GetOrExtendReservationsRequest{ + Reservations: []*datacatalog.GetOrExtendReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: maxHeartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "tag2", + }, + OwnerId: currentOwner, + HeartbeatInterval: maxHeartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "nonexistence", + }, + OwnerId: currentOwner, + HeartbeatInterval: maxHeartbeatIntervalPb, + }, + }, + } + + resp, err := reservationManager.GetOrExtendReservations(context.Background(), req) + + assert.Nil(t, err) + require.NotNil(t, resp.GetReservations()) + require.Len(t, resp.GetReservations(), 3) + for _, reservation := range resp.GetReservations() { + assert.Equal(t, currentOwner, reservation.OwnerId) + assert.Equal(t, heartbeatIntervalPb, reservation.HeartbeatInterval) + } +} + func TestGetOrExtendReservation_ExtendReservation(t *testing.T) { dcRepo := getDatacatalogRepo() @@ -179,6 +325,71 @@ func TestGetOrExtendReservation_ExtendReservation(t *testing.T) { assert.Equal(t, prevOwner, resp.GetReservation().OwnerId) } +func TestGetOrExtendReservations_ExtendReservations(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + now := time.Now() + prevExpiresAt := now.Add(time.Second * 10) + + tagNames := map[string]bool{tagName: true, "tag2": true, "nonexistence": true} + setUpReservationRepoGet(&dcRepo, prevExpiresAt, tagName, "tag2", "nonexistence") + + dcRepo.MockReservationRepo.On("Update", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + tagNames[reservation.TagName] && + reservation.OwnerID == prevOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.GetOrExtendReservationsRequest{ + Reservations: []*datacatalog.GetOrExtendReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: prevOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "tag2", + }, + OwnerId: prevOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "nonexistence", + }, + OwnerId: prevOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + }, + } + + resp, err := reservationManager.GetOrExtendReservations(context.Background(), req) + + assert.Nil(t, err) + require.NotNil(t, resp.GetReservations()) + require.Len(t, resp.GetReservations(), 3) + for _, reservation := range resp.GetReservations() { + assert.Equal(t, prevOwner, reservation.OwnerId) + } +} + func TestGetOrExtendReservation_TakeOverReservation(t *testing.T) { dcRepo := getDatacatalogRepo() @@ -219,6 +430,71 @@ func TestGetOrExtendReservation_TakeOverReservation(t *testing.T) { assert.Equal(t, currentOwner, resp.GetReservation().OwnerId) } +func TestGetOrExtendReservations_TakeOverReservations(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + now := time.Now() + prevExpiresAt := now.Add(time.Second * 10 * time.Duration(-1)) + + tagNames := map[string]bool{tagName: true, "tag2": true, "nonexistence": true} + setUpReservationRepoGet(&dcRepo, prevExpiresAt, tagName, "tag2", "nonexistence") + + dcRepo.MockReservationRepo.On("Update", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + tagNames[reservation.TagName] && + reservation.OwnerID == currentOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.GetOrExtendReservationsRequest{ + Reservations: []*datacatalog.GetOrExtendReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "tag2", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "nonexistence", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + }, + } + + resp, err := reservationManager.GetOrExtendReservations(context.Background(), req) + + assert.Nil(t, err) + require.NotNil(t, resp.GetReservations()) + require.Len(t, resp.GetReservations(), 3) + for _, reservation := range resp.GetReservations() { + assert.Equal(t, currentOwner, reservation.OwnerId) + } +} + func TestGetOrExtendReservation_ReservationExists(t *testing.T) { dcRepo := getDatacatalogRepo() @@ -245,6 +521,209 @@ func TestGetOrExtendReservation_ReservationExists(t *testing.T) { assert.Equal(t, prevOwner, resp.GetReservation().OwnerId) } +func TestGetOrExtendReservations_ReservationsExist(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + now := time.Now() + prevExpiresAt := now.Add(time.Second * 10) + + setUpReservationRepoGet(&dcRepo, prevExpiresAt, tagName, "tag2", "nonexistence") + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.GetOrExtendReservationsRequest{ + Reservations: []*datacatalog.GetOrExtendReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "tag2", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "nonexistence", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + }, + } + + resp, err := reservationManager.GetOrExtendReservations(context.Background(), req) + + assert.Nil(t, err) + require.NotNil(t, resp.GetReservations()) + require.Len(t, resp.GetReservations(), 3) + for _, reservation := range resp.GetReservations() { + assert.Equal(t, prevOwner, reservation.OwnerId) + } +} + +func TestGetOrExtendReservations_MultipleStates(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + now := time.Now() + prevExpiresAt := now.Add(time.Second * 10) + + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + key.TagName == tagName + })).Return(models.Reservation{}, errors2.NewDataCatalogErrorf(codes.NotFound, "entry not found")) + + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + key.TagName == "prevOwner" + })).Return( + models.Reservation{ + ReservationKey: models.ReservationKey{ + DatasetProject: project, + DatasetName: name, + DatasetDomain: domain, + DatasetVersion: version, + TagName: "prevOwner", + }, + OwnerID: prevOwner, + ExpiresAt: prevExpiresAt, + }, nil, + ) + + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + key.TagName == "currentOwner" + })).Return( + models.Reservation{ + ReservationKey: getReservationKey(), + OwnerID: currentOwner, + ExpiresAt: prevExpiresAt, + }, nil, + ) + + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + key.TagName == "currentOwnerExpired" + })).Return( + models.Reservation{ + ReservationKey: getReservationKey(), + OwnerID: currentOwner, + ExpiresAt: now.Add(-10 * time.Second), + }, nil, + ) + + dcRepo.MockReservationRepo.On("Create", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + reservation.TagName == tagName && + reservation.OwnerID == currentOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + updateTagNames := map[string]bool{"currentOwner": true, "currentOwnerExpired": true} + dcRepo.MockReservationRepo.On("Update", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + updateTagNames[reservation.TagName] && + reservation.OwnerID == currentOwner + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.GetOrExtendReservationsRequest{ + Reservations: []*datacatalog.GetOrExtendReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "prevOwner", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "currentOwner", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "currentOwnerExpired", + }, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + }, + }, + } + + resp, err := reservationManager.GetOrExtendReservations(context.Background(), req) + + assert.Nil(t, err) + require.NotNil(t, resp.GetReservations()) + require.Len(t, resp.GetReservations(), 4) + for _, reservation := range resp.GetReservations() { + if reservation.ReservationId.TagName == "prevOwner" { + assert.Equal(t, prevOwner, reservation.OwnerId) + } else { + assert.Equal(t, currentOwner, reservation.OwnerId) + } + assert.Equal(t, heartbeatIntervalPb, reservation.HeartbeatInterval) + } +} + func TestReleaseReservation(t *testing.T) { dcRepo := getDatacatalogRepo() @@ -278,6 +757,58 @@ func TestReleaseReservation(t *testing.T) { assert.Nil(t, err) } +func TestReleaseReservations(t *testing.T) { + dcRepo := getDatacatalogRepo() + + now := time.Now() + + tagNames := map[string]bool{tagName: true, "tag2": true, "nonexistence": true} + dcRepo.MockReservationRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservationKey models.ReservationKey) bool { + return reservationKey.DatasetProject == datasetID.Project && + reservationKey.DatasetDomain == datasetID.Domain && + reservationKey.DatasetName == datasetID.Name && + reservationKey.DatasetVersion == datasetID.Version && + tagNames[reservationKey.TagName] + }), + mock.MatchedBy(func(ownerID string) bool { + return ownerID == currentOwner + }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.ReleaseReservationsRequest{ + Reservations: []*datacatalog.ReleaseReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "tag2", + }, + OwnerId: currentOwner, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "nonexistence", + }, + OwnerId: currentOwner, + }, + }, + } + + _, err := reservationManager.ReleaseReservations(context.Background(), req) + + assert.Nil(t, err) +} + func TestReleaseReservation_Failure(t *testing.T) { dcRepo := getDatacatalogRepo() @@ -312,6 +843,59 @@ func TestReleaseReservation_Failure(t *testing.T) { assert.Equal(t, reservationErr, err) } +func TestReleaseReservations_Failure(t *testing.T) { + dcRepo := getDatacatalogRepo() + + now := time.Now() + reservationErr := fmt.Errorf("unknown error") + + tagNames := map[string]bool{tagName: true, "tag2": true, "nonexistence": true} + dcRepo.MockReservationRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservationKey models.ReservationKey) bool { + return reservationKey.DatasetProject == datasetID.Project && + reservationKey.DatasetDomain == datasetID.Domain && + reservationKey.DatasetName == datasetID.Name && + reservationKey.DatasetVersion == datasetID.Version && + tagNames[reservationKey.TagName] + }), + mock.MatchedBy(func(ownerID string) bool { + return ownerID == currentOwner + }), + ).Return(reservationErr) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.ReleaseReservationsRequest{ + Reservations: []*datacatalog.ReleaseReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "tag2", + }, + OwnerId: currentOwner, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "nonexistence", + }, + OwnerId: currentOwner, + }, + }, + } + + _, err := reservationManager.ReleaseReservations(context.Background(), req) + + assert.Equal(t, reservationErr, err) +} + func TestReleaseReservation_GracefulFailure(t *testing.T) { dcRepo := getDatacatalogRepo() @@ -350,6 +934,133 @@ func TestReleaseReservation_GracefulFailure(t *testing.T) { assert.Nil(t, err) } +func TestReleaseReservations_GracefulFailure(t *testing.T) { + dcRepo := getDatacatalogRepo() + + now := time.Now() + reservationErr := errors3.GetMissingEntityError("Reservation", + &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: tagName, + }) + + tagNames := map[string]bool{tagName: true, "tag2": true, "nonexistence": true} + dcRepo.MockReservationRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservationKey models.ReservationKey) bool { + return reservationKey.DatasetProject == datasetID.Project && + reservationKey.DatasetDomain == datasetID.Domain && + reservationKey.DatasetName == datasetID.Name && + reservationKey.DatasetVersion == datasetID.Version && + tagNames[reservationKey.TagName] + }), + mock.MatchedBy(func(ownerID string) bool { + return ownerID == currentOwner + }), + ).Return(reservationErr) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.ReleaseReservationsRequest{ + Reservations: []*datacatalog.ReleaseReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "tag2", + }, + OwnerId: currentOwner, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "nonexistence", + }, + OwnerId: currentOwner, + }, + }, + } + + _, err := reservationManager.ReleaseReservations(context.Background(), req) + + assert.Nil(t, err) +} + +func TestReleaseReservation_MultipleStates(t *testing.T) { + dcRepo := getDatacatalogRepo() + + now := time.Now() + notFoundErr := errors3.GetMissingEntityError("Reservation", + &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: tagName, + }) + + dcRepo.MockReservationRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservationKey models.ReservationKey) bool { + return reservationKey.DatasetProject == datasetID.Project && + reservationKey.DatasetDomain == datasetID.Domain && + reservationKey.DatasetName == datasetID.Name && + reservationKey.DatasetVersion == datasetID.Version && + (reservationKey.TagName == tagName || reservationKey.TagName == prevOwner) + }), + mock.MatchedBy(func(ownerID string) bool { + return ownerID == currentOwner + }), + ).Return(notFoundErr) + + dcRepo.MockReservationRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservationKey models.ReservationKey) bool { + return reservationKey.DatasetProject == datasetID.Project && + reservationKey.DatasetDomain == datasetID.Domain && + reservationKey.DatasetName == datasetID.Name && + reservationKey.DatasetVersion == datasetID.Version && + reservationKey.TagName == "currentOwner" + }), + mock.MatchedBy(func(ownerID string) bool { + return ownerID == currentOwner + }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := &datacatalog.ReleaseReservationsRequest{ + Reservations: []*datacatalog.ReleaseReservationRequest{ + { + ReservationId: &reservationID, + OwnerId: currentOwner, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "prevOwner", + }, + OwnerId: currentOwner, + }, + { + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: "currentOwner", + }, + OwnerId: currentOwner, + }, + }, + } + + _, err := reservationManager.ReleaseReservations(context.Background(), req) + + assert.Nil(t, err) +} + func getDatacatalogRepo() mocks.DataCatalogRepo { return mocks.DataCatalogRepo{ MockReservationRepo: &mocks.ReservationRepo{}, @@ -357,7 +1068,15 @@ func getDatacatalogRepo() mocks.DataCatalogRepo { } } -func setUpReservationRepoGet(dcRepo *mocks.DataCatalogRepo, prevExpiresAt time.Time) { +func setUpReservationRepoGet(dcRepo *mocks.DataCatalogRepo, prevExpiresAt time.Time, tagNames ...string) { + if len(tagNames) == 0 { + tagNames = []string{tagName} + } + tags := make(map[string]bool) + for _, tn := range tagNames { + tags[tn] = true + } + dcRepo.MockReservationRepo.On("Get", mock.MatchedBy(func(ctx context.Context) bool { return true }), mock.MatchedBy(func(key models.ReservationKey) bool { @@ -365,7 +1084,7 @@ func setUpReservationRepoGet(dcRepo *mocks.DataCatalogRepo, prevExpiresAt time.T key.DatasetDomain == datasetID.Domain && key.DatasetVersion == datasetID.Version && key.DatasetName == datasetID.Name && - key.TagName == tagName + tags[key.TagName] })).Return( models.Reservation{ ReservationKey: getReservationKey(), diff --git a/pkg/manager/impl/validators/artifact_validator.go b/pkg/manager/impl/validators/artifact_validator.go index d38609a1..49a7d686 100644 --- a/pkg/manager/impl/validators/artifact_validator.go +++ b/pkg/manager/impl/validators/artifact_validator.go @@ -3,7 +3,10 @@ package validators import ( "fmt" + "google.golang.org/grpc/codes" + "github.com/flyteorg/datacatalog/pkg/common" + "github.com/flyteorg/datacatalog/pkg/errors" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" ) @@ -170,3 +173,22 @@ func ValidateDeleteArtifactRequest(request *datacatalog.DeleteArtifactRequest) e return nil } + +func ValidateDeleteArtifactsRequest(request *datacatalog.DeleteArtifactsRequest) error { + if request.GetArtifacts() == nil { + return NewMissingArgumentError("artifacts") + } + + var errs []error + for _, deleteArtifactReq := range request.GetArtifacts() { + if err := ValidateDeleteArtifactRequest(deleteArtifactReq); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return errors.NewCollectedErrors(codes.InvalidArgument, errs) + } + + return nil +} diff --git a/pkg/manager/impl/validators/reservation_validator.go b/pkg/manager/impl/validators/reservation_validator.go new file mode 100644 index 00000000..e12e6f90 --- /dev/null +++ b/pkg/manager/impl/validators/reservation_validator.go @@ -0,0 +1,86 @@ +package validators + +import ( + "google.golang.org/grpc/codes" + + "github.com/flyteorg/datacatalog/pkg/errors" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" +) + +func ValidateGetOrExtendReservationRequest(request *datacatalog.GetOrExtendReservationRequest) error { + if request.GetReservationId() == nil { + return NewMissingArgumentError("reservationID") + } + if err := ValidateDatasetID(request.GetReservationId().GetDatasetId()); err != nil { + return err + } + if err := ValidateEmptyStringField(request.GetReservationId().GetTagName(), tagName); err != nil { + return err + } + + if err := ValidateEmptyStringField(request.GetOwnerId(), "ownerID"); err != nil { + return err + } + + if request.GetHeartbeatInterval() == nil { + return NewMissingArgumentError("heartbeatInterval") + } + + return nil +} + +func ValidateGetOrExtendReservationsRequest(request *datacatalog.GetOrExtendReservationsRequest) error { + if request.GetReservations() == nil { + return NewMissingArgumentError("reservations") + } + + var errs []error + for _, req := range request.GetReservations() { + if err := ValidateGetOrExtendReservationRequest(req); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return errors.NewCollectedErrors(codes.InvalidArgument, errs) + } + + return nil +} + +func ValidateReleaseReservationRequest(request *datacatalog.ReleaseReservationRequest) error { + if request.GetReservationId() == nil { + return NewMissingArgumentError("reservationID") + } + if err := ValidateDatasetID(request.GetReservationId().GetDatasetId()); err != nil { + return err + } + if err := ValidateEmptyStringField(request.GetReservationId().GetTagName(), tagName); err != nil { + return err + } + + if err := ValidateEmptyStringField(request.GetOwnerId(), "ownerID"); err != nil { + return err + } + + return nil +} + +func ValidateReleaseReservationsRequest(request *datacatalog.ReleaseReservationsRequest) error { + if request.GetReservations() == nil { + return NewMissingArgumentError("reservations") + } + + var errs []error + for _, req := range request.GetReservations() { + if err := ValidateReleaseReservationRequest(req); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return errors.NewCollectedErrors(codes.InvalidArgument, errs) + } + + return nil +} diff --git a/pkg/manager/interfaces/artifact.go b/pkg/manager/interfaces/artifact.go index a51ddd0b..73c32202 100644 --- a/pkg/manager/interfaces/artifact.go +++ b/pkg/manager/interfaces/artifact.go @@ -14,4 +14,5 @@ type ArtifactManager interface { ListArtifacts(ctx context.Context, request *idl_datacatalog.ListArtifactsRequest) (*idl_datacatalog.ListArtifactsResponse, error) UpdateArtifact(ctx context.Context, request *idl_datacatalog.UpdateArtifactRequest) (*idl_datacatalog.UpdateArtifactResponse, error) DeleteArtifact(ctx context.Context, request *idl_datacatalog.DeleteArtifactRequest) (*idl_datacatalog.DeleteArtifactResponse, error) + DeleteArtifacts(ctx context.Context, request *idl_datacatalog.DeleteArtifactsRequest) (*idl_datacatalog.DeleteArtifactResponse, error) } diff --git a/pkg/manager/interfaces/reservation.go b/pkg/manager/interfaces/reservation.go index 42d95b8c..a0c99e49 100644 --- a/pkg/manager/interfaces/reservation.go +++ b/pkg/manager/interfaces/reservation.go @@ -11,5 +11,7 @@ import ( // in flyteidl type ReservationManager interface { GetOrExtendReservation(context.Context, *datacatalog.GetOrExtendReservationRequest) (*datacatalog.GetOrExtendReservationResponse, error) + GetOrExtendReservations(context.Context, *datacatalog.GetOrExtendReservationsRequest) (*datacatalog.GetOrExtendReservationsResponse, error) ReleaseReservation(context.Context, *datacatalog.ReleaseReservationRequest) (*datacatalog.ReleaseReservationResponse, error) + ReleaseReservations(context.Context, *datacatalog.ReleaseReservationsRequest) (*datacatalog.ReleaseReservationResponse, error) } diff --git a/pkg/manager/mocks/artifact_manager.go b/pkg/manager/mocks/artifact_manager.go index fb4d5c4b..f61f809e 100644 --- a/pkg/manager/mocks/artifact_manager.go +++ b/pkg/manager/mocks/artifact_manager.go @@ -97,6 +97,47 @@ func (_m *ArtifactManager) DeleteArtifact(ctx context.Context, request *datacata return r0, r1 } +type ArtifactManager_DeleteArtifacts struct { + *mock.Call +} + +func (_m ArtifactManager_DeleteArtifacts) Return(_a0 *datacatalog.DeleteArtifactResponse, _a1 error) *ArtifactManager_DeleteArtifacts { + return &ArtifactManager_DeleteArtifacts{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *ArtifactManager) OnDeleteArtifacts(ctx context.Context, request *datacatalog.DeleteArtifactsRequest) *ArtifactManager_DeleteArtifacts { + c_call := _m.On("DeleteArtifacts", ctx, request) + return &ArtifactManager_DeleteArtifacts{Call: c_call} +} + +func (_m *ArtifactManager) OnDeleteArtifactsMatch(matchers ...interface{}) *ArtifactManager_DeleteArtifacts { + c_call := _m.On("DeleteArtifacts", matchers...) + return &ArtifactManager_DeleteArtifacts{Call: c_call} +} + +// DeleteArtifacts provides a mock function with given fields: ctx, request +func (_m *ArtifactManager) DeleteArtifacts(ctx context.Context, request *datacatalog.DeleteArtifactsRequest) (*datacatalog.DeleteArtifactResponse, error) { + ret := _m.Called(ctx, request) + + var r0 *datacatalog.DeleteArtifactResponse + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.DeleteArtifactsRequest) *datacatalog.DeleteArtifactResponse); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.DeleteArtifactResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.DeleteArtifactsRequest) error); ok { + r1 = rf(ctx, request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type ArtifactManager_GetArtifact struct { *mock.Call } diff --git a/pkg/manager/mocks/reservation_manager.go b/pkg/manager/mocks/reservation_manager.go index edde09ad..10385dfa 100644 --- a/pkg/manager/mocks/reservation_manager.go +++ b/pkg/manager/mocks/reservation_manager.go @@ -56,6 +56,47 @@ func (_m *ReservationManager) GetOrExtendReservation(_a0 context.Context, _a1 *d return r0, r1 } +type ReservationManager_GetOrExtendReservations struct { + *mock.Call +} + +func (_m ReservationManager_GetOrExtendReservations) Return(_a0 *datacatalog.GetOrExtendReservationsResponse, _a1 error) *ReservationManager_GetOrExtendReservations { + return &ReservationManager_GetOrExtendReservations{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *ReservationManager) OnGetOrExtendReservations(_a0 context.Context, _a1 *datacatalog.GetOrExtendReservationsRequest) *ReservationManager_GetOrExtendReservations { + c_call := _m.On("GetOrExtendReservations", _a0, _a1) + return &ReservationManager_GetOrExtendReservations{Call: c_call} +} + +func (_m *ReservationManager) OnGetOrExtendReservationsMatch(matchers ...interface{}) *ReservationManager_GetOrExtendReservations { + c_call := _m.On("GetOrExtendReservations", matchers...) + return &ReservationManager_GetOrExtendReservations{Call: c_call} +} + +// GetOrExtendReservations provides a mock function with given fields: _a0, _a1 +func (_m *ReservationManager) GetOrExtendReservations(_a0 context.Context, _a1 *datacatalog.GetOrExtendReservationsRequest) (*datacatalog.GetOrExtendReservationsResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *datacatalog.GetOrExtendReservationsResponse + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.GetOrExtendReservationsRequest) *datacatalog.GetOrExtendReservationsResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.GetOrExtendReservationsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.GetOrExtendReservationsRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type ReservationManager_ReleaseReservation struct { *mock.Call } @@ -96,3 +137,44 @@ func (_m *ReservationManager) ReleaseReservation(_a0 context.Context, _a1 *datac return r0, r1 } + +type ReservationManager_ReleaseReservations struct { + *mock.Call +} + +func (_m ReservationManager_ReleaseReservations) Return(_a0 *datacatalog.ReleaseReservationResponse, _a1 error) *ReservationManager_ReleaseReservations { + return &ReservationManager_ReleaseReservations{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *ReservationManager) OnReleaseReservations(_a0 context.Context, _a1 *datacatalog.ReleaseReservationsRequest) *ReservationManager_ReleaseReservations { + c_call := _m.On("ReleaseReservations", _a0, _a1) + return &ReservationManager_ReleaseReservations{Call: c_call} +} + +func (_m *ReservationManager) OnReleaseReservationsMatch(matchers ...interface{}) *ReservationManager_ReleaseReservations { + c_call := _m.On("ReleaseReservations", matchers...) + return &ReservationManager_ReleaseReservations{Call: c_call} +} + +// ReleaseReservations provides a mock function with given fields: _a0, _a1 +func (_m *ReservationManager) ReleaseReservations(_a0 context.Context, _a1 *datacatalog.ReleaseReservationsRequest) (*datacatalog.ReleaseReservationResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *datacatalog.ReleaseReservationResponse + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.ReleaseReservationsRequest) *datacatalog.ReleaseReservationResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.ReleaseReservationResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.ReleaseReservationsRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/rpc/datacatalogservice/service.go b/pkg/rpc/datacatalogservice/service.go index 6c4e69c5..8910ef45 100644 --- a/pkg/rpc/datacatalogservice/service.go +++ b/pkg/rpc/datacatalogservice/service.go @@ -69,14 +69,26 @@ func (s *DataCatalogService) DeleteArtifact(ctx context.Context, request *catalo return s.ArtifactManager.DeleteArtifact(ctx, request) } +func (s *DataCatalogService) DeleteArtifacts(ctx context.Context, request *catalog.DeleteArtifactsRequest) (*catalog.DeleteArtifactResponse, error) { + return s.ArtifactManager.DeleteArtifacts(ctx, request) +} + func (s *DataCatalogService) GetOrExtendReservation(ctx context.Context, request *catalog.GetOrExtendReservationRequest) (*catalog.GetOrExtendReservationResponse, error) { return s.ReservationManager.GetOrExtendReservation(ctx, request) } +func (s *DataCatalogService) GetOrExtendReservations(ctx context.Context, request *catalog.GetOrExtendReservationsRequest) (*catalog.GetOrExtendReservationsResponse, error) { + return s.ReservationManager.GetOrExtendReservations(ctx, request) +} + func (s *DataCatalogService) ReleaseReservation(ctx context.Context, request *catalog.ReleaseReservationRequest) (*catalog.ReleaseReservationResponse, error) { return s.ReservationManager.ReleaseReservation(ctx, request) } +func (s *DataCatalogService) ReleaseReservations(ctx context.Context, request *catalog.ReleaseReservationsRequest) (*catalog.ReleaseReservationResponse, error) { + return s.ReservationManager.ReleaseReservations(ctx, request) +} + func NewDataCatalogService() *DataCatalogService { configProvider := runtime.NewConfigurationProvider() dataCatalogConfig := configProvider.ApplicationConfiguration().GetDataCatalogConfig()