Skip to content

Commit

Permalink
feat: claim endpoints (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmorency authored Apr 5, 2024
1 parent 42a7f8d commit 7105829
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 257 deletions.
21 changes: 7 additions & 14 deletions cmd/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ func ClaimCmdRunE(cmd *cobra.Command, args []string) error {

claimConfig := LoadClaimConfigFromCLI()
slog.Debug("args", "claim-c", claimConfig)
if err := claimConfig.Validate(); err != nil {
return err
}

authConfig := LoadAuthConfigFromCLI()
slog.Debug("args", "auth-c", authConfig)
Expand Down Expand Up @@ -76,11 +73,6 @@ func SetupClaimCmdFlags(command *cobra.Command) {
slog.Error(ErrorBindingFlag, "error", err)
}

command.Flags().UintP("jobs", "j", 1, "Number of parallel jobs to claim")
if err := viper.BindPFlag("jobs", command.Flags().Lookup("jobs")); err != nil {
slog.Error(ErrorBindingFlag, "error", err)
}

command.Flags().String("uuid", "", "UUID of the work item to claim")
if err := viper.BindPFlag("claim-uuid", command.Flags().Lookup("uuid")); err != nil {
slog.Error(ErrorBindingFlag, "error", err)
Expand All @@ -95,14 +87,15 @@ func claimWorkItem(r *resty.Client, uuidStr string, config config.ClaimConfig) (
if uuidStr != "" {
var item *store.WorkItem
item, err = store.ClaimWorkItemFromUUID(r, uuid.MustParse(uuidStr), config.Force)
if err != nil {
return nil, errors.WithMessage(err, "could not claim work item")
}
items = append(items, item)
} else {
items, err = store.ClaimWorkItemFromQueue(r, config.Jobs)
}

// An error occurred during the claim
if err != nil {
return nil, errors.WithMessage(err, "could not claim work item")
items, err = store.ClaimWorkItemFromQueue(r)
if err != nil {
return nil, errors.WithMessage(err, "could not claim work item")
}
}

for _, item := range items {
Expand Down
32 changes: 6 additions & 26 deletions cmd/claim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,46 +42,27 @@ func TestClaimCmd(t *testing.T) {
{name: "password missing", args: usernameArg, err: "password is required"},
{name: "claim from queue (default neighborhood)", args: passwordArg, endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.AuthResponder},
{Method: "GET", Url: testutils.DefaultMigrationsUrl, Responder: testutils.MustAllMigrationsGetResponder(1, store.CREATED)},
{Method: "GET", Url: "=~^" + testutils.DefaultMigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CREATED)},
{Method: "PUT", Url: "=~^" + testutils.DefaultMigrationUrl, Responder: testutils.MigrationUpdateResponder},
{Method: "PUT", Url: testutils.DefaultClaimUrl, Responder: testutils.MigrationClaimResponder(1, store.CLAIMED)},
}, expected: "Work item claimed"},
{name: "claim from queue (neighborhood == 1)", args: neighborhoodArg, endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.AuthResponder},
{Method: "GET", Url: testutils.MigrationsUrl, Responder: testutils.MustAllMigrationsGetResponder(1, store.CREATED)},
{Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CREATED)},
{Method: "PUT", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MigrationUpdateResponder},
{Method: "PUT", Url: "=~^" + testutils.ClaimUrl, Responder: testutils.MigrationClaimResponder(1, store.CLAIMED)},
}, expected: "Work item claimed"},
{name: "auth endpoint not found", args: neighborhoodArg, endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.NotFoundResponder},
}, err: "response status code: 404"},
{name: "unable to claim from queue (invalid state)", args: neighborhoodArg, endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.AuthResponder},
{Method: "GET", Url: testutils.MigrationsUrl, Responder: testutils.MustAllMigrationsGetResponder(1, store.CLAIMED)},
{Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CLAIMED)},
}, expected: "invalid state"},
{name: "unable to claim from queue (no work items available)", args: neighborhoodArg, endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.AuthResponder},
{Method: "GET", Url: testutils.MigrationsUrl, Responder: testutils.MustAllMigrationsGetResponder(0, store.CREATED)},
{Method: "PUT", Url: testutils.ClaimUrl, Responder: testutils.MigrationClaimResponder(0, store.CLAIMED)},
}, expected: "No work items available"},
{name: "claim from uuid", args: append(neighborhoodArg, []string{"--uuid", testutils.DummyUUIDStr}...), endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.AuthResponder},
{Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CREATED)},
{Method: "PUT", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MigrationUpdateResponder},
{Method: "PUT", Url: "=~^" + testutils.ClaimUuidUrl, Responder: testutils.MigrationClaimOneResponder(store.CLAIMED)},
}, expected: "Work item claimed"},
{name: "unable to claim from uuid (invalid state)", args: append(neighborhoodArg, []string{"--uuid", testutils.DummyUUIDStr}...), endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.AuthResponder},
{Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.CLAIMED)},
}, err: "invalid state"},
{name: "unable to claim from uuid (not found)", args: append(neighborhoodArg, []string{"--uuid", testutils.DummyUUIDStr}...), endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.AuthResponder},
{Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.NotFoundResponder},
{Method: "PUT", Url: "=~^" + testutils.ClaimUuidUrl, Responder: testutils.NotFoundResponder},
}, err: "response status code: 404"},
{name: "force claim from uuid", args: append(neighborhoodArg, []string{"--uuid", testutils.DummyUUIDStr, "--force"}...), endpoints: []testutils.HttpResponder{
{Method: "POST", Url: testutils.LoginUrl, Responder: testutils.AuthResponder},
{Method: "GET", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MustMigrationGetResponder(store.FAILED)},
{Method: "PUT", Url: "=~^" + testutils.MigrationUrl, Responder: testutils.MigrationUpdateResponder},
}, expected: "forcing re-claim of work item"},
}
for _, tc := range tt {
command := &cobra.Command{Use: "claim", PersistentPreRunE: cmd.RootCmdPersistentPreRunE, RunE: cmd.ClaimCmdRunE}
Expand All @@ -93,8 +74,6 @@ func TestClaimCmd(t *testing.T) {

// Enable http mocking on the resty client
httpmock.ActivateNonDefault(client.GetClient())
defer httpmock.DeactivateAndReset()

cmd.SetupRootCmdFlags(command)
cmd.SetupClaimCmdFlags(command)

Expand All @@ -111,6 +90,7 @@ func TestClaimCmd(t *testing.T) {
} else {
require.ErrorContains(t, err, tc.err)
}
httpmock.Reset()
})

// Remove the work item file if it exists
Expand Down
1 change: 0 additions & 1 deletion cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func LoadAuthConfigFromCLI() config.AuthConfig {
func LoadClaimConfigFromCLI() config.ClaimConfig {
return config.ClaimConfig{
Force: viper.GetBool("force"),
Jobs: viper.GetUint("jobs"),
}
}

Expand Down
1 change: 1 addition & 0 deletions interchaintest/migrate_on_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func TestMigrateOnChain(t *testing.T) {
require.NoError(t, err)
require.Equal(t, balanceUN, tc.expected.User.New)
}
httpmock.Reset()
})

// Remove the work item file if it exists
Expand Down
9 changes: 0 additions & 9 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,6 @@ func (c AuthConfig) Validate() error {

type ClaimConfig struct {
Force bool // Force re-claiming of a failed work item
Jobs uint // Number of parallel jobs to claim
}

func (c ClaimConfig) Validate() error {
if c.Jobs == 0 {
return fmt.Errorf("jobs > 0 is required")
}

return nil
}

type MigrateConfig struct {
Expand Down
121 changes: 58 additions & 63 deletions internal/store/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,95 +2,90 @@ package store

import (
"fmt"
"log/slog"
"net/http"

"github.com/go-resty/resty/v2"
"github.com/google/uuid"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

// ClaimWorkItemFromQueue retrieves a work item from the remote database work queue.
// Items will be claimed in parallel using goroutine.
// The maximum number of items that can be claimed in parallel is defined by the pagination of the remote database.
func ClaimWorkItemFromQueue(r *resty.Client, jobs uint) ([]*WorkItem, error) {
// 1. Get all work items from remote
status := CREATED
items, err := GetAllWorkItems(r, &status)
func ClaimWorkItemFromQueue(r *resty.Client) ([]*WorkItem, error) {
// 1. Claim work items
items, err := claimWorkItems(r)
if err != nil {
return nil, errors.WithMessage(err, ErrorGettingWorkItems)
return nil, errors.WithMessage(err, "error claiming work items")
}

var g errgroup.Group
g.SetLimit(int(jobs))
claimedItems := make([]*WorkItem, 0)

// 2. Loop over all work items
for _, item := range items.Items {
item := item
g.Go(func() error {
// 2.0 Check if the work item is in the correct state to be claimed
if !itemCanBeClaimed(&item, false) {
slog.Warn("unable to claim work item, invalid state", "uuid", item.UUID, "status", item.Status.String())
return nil
}

claimedItem, err := claimItem(r, &item)
if err != nil {
return errors.WithMessage(err, ErrorClaimingWorkItem)
}
claimedItems = append(claimedItems, claimedItem)
return nil
})
}

if err := g.Wait(); err != nil {
return nil, errors.WithMessage(err, ErrorClaimingWorkItem)
// 2. Save the work item states
for _, item := range items {
if err := SaveState(item); err != nil {
return nil, err
}
}

// No work items available
return claimedItems, nil
return items, nil
}

func ClaimWorkItemFromUUID(r *resty.Client, uuid uuid.UUID, force bool) (*WorkItem, error) {
// 1. Get the work item from the remote database
item, err := GetWorkItem(r, uuid)
item, err := claimWorkItem(r, uuid, force)
if err != nil {
return nil, errors.WithMessage(err, ErrorGettingWorkItem)
return nil, errors.WithMessage(err, "error claiming work item")
}

// 2. Check if the work item is in the correct state to be claimed
if !itemCanBeClaimed(item, force) {
return nil, fmt.Errorf("unable to claim work item, invalid state: %s", &item.Status)
if err := SaveState(item); err != nil {
return nil, err
}

// 3. Try to claim the work item
return claimItem(r, item)
return item, nil
}

func claimItem(r *resty.Client, item *WorkItem) (*WorkItem, error) {
// 1. Try to claim the work item
newItem := *item
newItem.Status = CLAIMED
if item.Error != nil {
slog.Info("clearing previous error", "uuid", item.UUID, "error", item.Error)
newItem.Error = nil
func claimWorkItems(r *resty.Client) ([]*WorkItem, error) {
req := r.R().SetResult(&[]*WorkItem{})
response, err := req.Put("neighborhoods/{neighborhood}/migrations/claim/")
if err != nil {
return nil, errors.WithMessage(err, "error claiming work items")
}
if err := UpdateWorkItemAndSaveState(r, newItem); err != nil {
return nil, errors.WithMessage(err, ErrorClaimingWorkItem)

if response == nil {
return nil, fmt.Errorf("no response returned when claiming work items")
}

statusCode := response.StatusCode()
if statusCode != http.StatusOK {
return nil, fmt.Errorf("response status code: %d", statusCode)
}

claimResponse := response.Result().(*[]*WorkItem)
if claimResponse == nil {
return nil, fmt.Errorf("error unmarshalling claim response")
}

return &newItem, nil
return *claimResponse, nil
}

func itemCanBeClaimed(item *WorkItem, force bool) bool {
if item.Status != CREATED {
if force {
slog.Warn("forcing re-claim of work item", "uuid", item.UUID, "status", item.Status)
return true
}
slog.Debug("work item not in the correct state to be claimed", "uuid", item.UUID, "status", item.Status)
return false
func claimWorkItem(r *resty.Client, itemUUID uuid.UUID, force bool) (*WorkItem, error) {
req := r.R().SetResult(&WorkItem{}).
SetPathParam("uuid", itemUUID.String()).
SetQueryParam("force", fmt.Sprintf("%t", force))
response, err := req.Put("neighborhoods/{neighborhood}/migrations/claim/{uuid}")
if err != nil {
return nil, errors.WithMessage(err, "error claiming work item")
}
return true

if response == nil {
return nil, fmt.Errorf("no response returned when claiming work item: %s", itemUUID)
}

statusCode := response.StatusCode()
if statusCode != http.StatusOK {
return nil, fmt.Errorf("response status code: %d", statusCode)
}

item := response.Result().(*WorkItem)
if item == nil {
return nil, fmt.Errorf("error unmarshalling claim response")
}

return item, nil
}
Loading

0 comments on commit 7105829

Please sign in to comment.