Skip to content

Commit

Permalink
Add steps to prevent recreation of deployment entry when picking up o…
Browse files Browse the repository at this point in the history
…ld jobs
  • Loading branch information
deadlycoconuts committed Jan 15, 2024
1 parent 649eebb commit 1ef8e87
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 17 deletions.
31 changes: 21 additions & 10 deletions api/queue/work/model_service_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {

isRedeployment := false

// Need to reassign destionationURL cause it is ignored when marshalled and unmarshalled
// Need to reassign destionationURL because it is ignored when marshalled and unmarshalled
if endpoint.Logger != nil {
endpoint.Logger.DestinationURL = depl.LoggerDestinationURL
}
Expand All @@ -94,16 +94,27 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {

log.Infof("creating deployment for model %s version %s revision %s with endpoint id: %s", model.Name, endpoint.VersionID, endpoint.RevisionID, endpoint.ID)

// record the deployment process
deployment, err := depl.DeploymentStorage.Save(&models.Deployment{
ProjectID: model.ProjectID,
VersionModelID: model.ID,
VersionID: endpoint.VersionID,
VersionEndpointID: endpoint.ID,
Status: models.EndpointPending,
})
// check if the latest deployment entry in the deployments table is in the 'pending' state (aborted workflow)
deployment, err := depl.DeploymentStorage.GetLatestDeployment(model.ID, endpoint.VersionID)
if err != nil {
log.Warnf("unable to create deployment history", err)
if !errors.Is(err, gorm.ErrRecordNotFound) {
log.Errorf("failed retrieving from db the latest deployment with the error: %v", err)
return err
}
// do not create a new entry if the last deployment entry in the db is 'pending'
if deployment.Status != models.EndpointPending {
deployment, err = depl.DeploymentStorage.Save(&models.Deployment{
ProjectID: model.ProjectID,
VersionModelID: model.ID,
VersionID: endpoint.VersionID,
VersionEndpointID: endpoint.ID,
Status: models.EndpointPending,
})
// record the deployment process
if err != nil {
log.Warnf("unable to create deployment history", err)
}
}
}

defer func() {
Expand Down
141 changes: 141 additions & 0 deletions api/queue/work/model_service_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/caraml-dev/merlin/storage/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"gorm.io/gorm"
"k8s.io/apimachinery/pkg/api/resource"
)

Expand Down Expand Up @@ -83,6 +84,111 @@ func TestExecuteDeployment(t *testing.T) {
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(nil, gorm.ErrRecordNotFound)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
mockStorage := &mocks.VersionEndpointStorage{}
mockStorage.On("Save", mock.Anything).Return(nil)
mockStorage.On("Get", mock.Anything).Return(&models.VersionEndpoint{
Environment: env,
EnvironmentName: env.Name,
ResourceRequest: env.DefaultResourceRequest,
VersionID: version.ID,
Namespace: project.Name,
}, nil)
return mockStorage
},
controller: func() *clusterMock.Controller {
ctrl := &clusterMock.Controller{}
ctrl.On("Deploy", mock.Anything, mock.Anything).
Return(&models.Service{
Name: iSvcName,
Namespace: project.Name,
ServiceName: svcName,
URL: url,
Metadata: svcMetadata,
}, nil)
return ctrl
},
imageBuilder: func() *imageBuilderMock.ImageBuilder {
mockImgBuilder := &imageBuilderMock.ImageBuilder{}
return mockImgBuilder
},
},
{
name: "Success: Latest deployment entry in storage stuck in pending",
model: model,
version: version,
endpoint: &models.VersionEndpoint{
EnvironmentName: env.Name,
ResourceRequest: env.DefaultResourceRequest,
VersionID: version.ID,
Namespace: project.Name,
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(
&models.Deployment{
ProjectID: model.ProjectID,
VersionModelID: model.ID,
VersionID: version.ID,
Status: models.EndpointPending,
}, nil)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
mockStorage := &mocks.VersionEndpointStorage{}
mockStorage.On("Save", mock.Anything).Return(nil)
mockStorage.On("Get", mock.Anything).Return(&models.VersionEndpoint{
Environment: env,
EnvironmentName: env.Name,
ResourceRequest: env.DefaultResourceRequest,
VersionID: version.ID,
Namespace: project.Name,
}, nil)
return mockStorage
},
controller: func() *clusterMock.Controller {
ctrl := &clusterMock.Controller{}
ctrl.On("Deploy", mock.Anything, mock.Anything).
Return(&models.Service{
Name: iSvcName,
Namespace: project.Name,
ServiceName: svcName,
URL: url,
Metadata: svcMetadata,
}, nil)
return ctrl
},
imageBuilder: func() *imageBuilderMock.ImageBuilder {
mockImgBuilder := &imageBuilderMock.ImageBuilder{}
return mockImgBuilder
},
},
{
name: "Success: Latest deployment entry in storage not in pending state",
model: model,
version: version,
endpoint: &models.VersionEndpoint{
EnvironmentName: env.Name,
ResourceRequest: env.DefaultResourceRequest,
VersionID: version.ID,
Namespace: project.Name,
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(
&models.Deployment{
ProjectID: model.ProjectID,
VersionModelID: model.ID,
VersionID: version.ID,
Status: models.EndpointRunning,
}, nil)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -128,6 +234,7 @@ func TestExecuteDeployment(t *testing.T) {
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(nil, gorm.ErrRecordNotFound)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -173,6 +280,7 @@ func TestExecuteDeployment(t *testing.T) {
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(nil, gorm.ErrRecordNotFound)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -220,6 +328,7 @@ func TestExecuteDeployment(t *testing.T) {
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(nil, gorm.ErrRecordNotFound)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -267,6 +376,7 @@ func TestExecuteDeployment(t *testing.T) {
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(nil, gorm.ErrRecordNotFound)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -321,6 +431,7 @@ func TestExecuteDeployment(t *testing.T) {
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(nil, gorm.ErrRecordNotFound)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -374,6 +485,7 @@ func TestExecuteDeployment(t *testing.T) {
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(nil, gorm.ErrRecordNotFound)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
return mockStorage
},
Expand Down Expand Up @@ -413,6 +525,7 @@ func TestExecuteDeployment(t *testing.T) {
},
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(nil, gorm.ErrRecordNotFound)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
return mockStorage
},
Expand Down Expand Up @@ -573,6 +686,13 @@ func TestExecuteRedeployment(t *testing.T) {
expectedEndpointStatus: models.EndpointRunning,
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(
&models.Deployment{
ProjectID: model.ProjectID,
VersionModelID: model.ID,
VersionID: version.ID,
Status: models.EndpointRunning,
}, nil)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -638,6 +758,13 @@ func TestExecuteRedeployment(t *testing.T) {
expectedEndpointStatus: models.EndpointServing,
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(
&models.Deployment{
ProjectID: model.ProjectID,
VersionModelID: model.ID,
VersionID: version.ID,
Status: models.EndpointServing,
}, nil)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -703,6 +830,13 @@ func TestExecuteRedeployment(t *testing.T) {
expectedEndpointStatus: models.EndpointRunning,
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(
&models.Deployment{
ProjectID: model.ProjectID,
VersionModelID: model.ID,
VersionID: version.ID,
Status: models.EndpointFailed,
}, nil)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
Expand Down Expand Up @@ -769,6 +903,13 @@ func TestExecuteRedeployment(t *testing.T) {
expectedEndpointStatus: models.EndpointRunning,
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("GetLatestDeployment", mock.Anything, mock.Anything).Return(
&models.Deployment{
ProjectID: model.ProjectID,
VersionModelID: model.ID,
VersionID: version.ID,
Status: models.EndpointRunning,
}, nil)
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
return mockStorage
},
Expand Down
13 changes: 12 additions & 1 deletion api/storage/deployment_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ type DeploymentStorage interface {
ListInModel(model *models.Model) ([]*models.Deployment, error)
// ListInModelVersion return all deployment within a model
ListInModelVersion(modelID, versionID, endpointUUID string) ([]*models.Deployment, error)
// Save save the deployment to underlying storage
// Save saves the deployment to underlying storage
Save(deployment *models.Deployment) (*models.Deployment, error)
// GetLatestDeployment gets the latest deployment record
GetLatestDeployment(modelID models.ID, versionID models.ID) (*models.Deployment, error)
// OnDeploymentSuccess updates the new deployment status to successful on DB and update all previous deployment status for that version endpoint to terminated.
OnDeploymentSuccess(newDeployment *models.Deployment) error
// Undeploy updates all successful deployment status to terminated on DB
Expand Down Expand Up @@ -63,6 +65,15 @@ func (d *deploymentStorage) Save(deployment *models.Deployment) (*models.Deploym
return deployment, err
}

func (d *deploymentStorage) GetLatestDeployment(modelID models.ID, versionID models.ID) (*models.Deployment, error) {
deployment := &models.Deployment{}
if err := d.db.Where("version_id = ? AND version_model_id = ? ORDER BY updated_at DESC LIMIT 1;", versionID,
modelID).Find(deployment).Error; err != nil {
return nil, err
}
return deployment, nil
}

func (d *deploymentStorage) GetFirstSuccessModelVersionPerModel() (map[models.ID]models.ID, error) {
rows, err := d.db.Table("deployments").
Select("version_model_id , min(version_id)").
Expand Down
79 changes: 79 additions & 0 deletions api/storage/deployment_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,85 @@ func TestDeploymentStorage_List(t *testing.T) {
})
}

func TestGetLatestDeployment(t *testing.T) {
database.WithTestDatabase(t, func(t *testing.T, db *gorm.DB) {
deploymentStorage := NewDeploymentStorage(db)
isDefaultTrue := true

p := mlp.Project{
Name: "project",
MLFlowTrackingURL: "http://mlflow:5000",
}

m := models.Model{
ID: 1,
ProjectID: models.ID(p.ID),
ExperimentID: 1,
Name: "model",
Type: models.ModelTypeSkLearn,
}
db.Create(&m)

v := models.Version{
ModelID: m.ID,
RunID: "1",
ArtifactURI: "gcs:/mlp/1/1",
PythonVersion: "3.7.*",
}
db.Create(&v)

env1 := models.Environment{
Name: "env1",
Cluster: "k8s",
IsDefault: &isDefaultTrue,
}
db.Create(&env1)

endpoint := models.VersionEndpoint{
ID: uuid.New(),
VersionID: v.ID,
VersionModelID: m.ID,
Status: "pending",
EnvironmentName: env1.Name,
DeploymentMode: deployment.ServerlessDeploymentMode,
}
db.Create(&endpoint)

deploy1 := &models.Deployment{
ProjectID: models.ID(p.ID),
VersionID: v.ID,
VersionModelID: m.ID,
VersionEndpointID: endpoint.ID,
Status: models.EndpointServing,
Error: "",
CreatedUpdated: models.CreatedUpdated{
UpdatedAt: time.Date(2022, 4, 27, 16, 33, 41, 0, time.UTC),
},
}

deploy2 := &models.Deployment{
ProjectID: models.ID(p.ID),
VersionID: v.ID,
VersionModelID: m.ID,
VersionEndpointID: endpoint.ID,
Status: models.EndpointServing,
Error: "",
CreatedUpdated: models.CreatedUpdated{
UpdatedAt: time.Date(2021, 4, 27, 16, 33, 41, 0, time.UTC),
},
}

_, err := deploymentStorage.Save(deploy1)
assert.NoError(t, err)

_, err = deploymentStorage.Save(deploy2)
assert.NoError(t, err)

dep, err := deploymentStorage.GetLatestDeployment(m.ID, endpoint.VersionID)
assert.Equal(t, deploy1, dep)
})
}

func TestDeploymentStorage_GetFirstSuccessModelVersionPerModel(t *testing.T) {
database.WithTestDatabase(t, func(t *testing.T, db *gorm.DB) {
deploymentStorage := NewDeploymentStorage(db)
Expand Down
Loading

0 comments on commit 1ef8e87

Please sign in to comment.