Skip to content

Commit

Permalink
dynamic host volumes: search endpoint
Browse files Browse the repository at this point in the history
Add support for dynamic host volumes to the search endpoint. Like many other
objects with UUID identifiers, we're not supporting fuzzy search here, just
prefix search on the fuzzy search endpoint.

Because the search endpoint only returns IDs, we need to seperate CSI volumes
and host volumes for it to be useful. The new context is called `"host_volumes"`
to disambiguate it from `"volumes"`. In future versions of Nomad we should
consider deprecating the `"volumes"` context in lieu of a `"csi_volumes"`
context.

Ref: #24479
  • Loading branch information
tgross committed Nov 21, 2024
1 parent 193f913 commit 0ef88f8
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 2 deletions.
1 change: 1 addition & 0 deletions api/contexts/contexts.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
Plugins Context = "plugins"
Variables Context = "vars"
Volumes Context = "volumes"
HostVolumes Context = "host_volumes"

// These Context types are used to associate a search result from a lower
// level Nomad object with one of the higher level Context types above.
Expand Down
19 changes: 17 additions & 2 deletions nomad/search_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
structs.ScalingPolicies,
structs.Variables,
structs.Namespaces,
structs.HostVolumes,
}
)

Expand Down Expand Up @@ -84,6 +85,8 @@ func (s *Search) getPrefixMatches(iter memdb.ResultIterator, prefix string) ([]s
id = t.ID
case *structs.CSIVolume:
id = t.ID
case *structs.HostVolume:
id = t.ID
case *structs.ScalingPolicy:
id = t.ID
case *structs.Namespace:
Expand Down Expand Up @@ -405,6 +408,8 @@ func getResourceIter(context structs.Context, aclObj *acl.ACL, namespace, prefix
return store.ScalingPoliciesByIDPrefix(ws, namespace, prefix)
case structs.Volumes:
return store.CSIVolumesByIDPrefix(ws, namespace, prefix)
case structs.HostVolumes:
return store.HostVolumesByIDPrefix(ws, namespace, prefix, state.SortDefault)
case structs.Namespaces:
iter, err := store.NamespacesByNamePrefix(ws, prefix)
if err != nil {
Expand Down Expand Up @@ -684,6 +689,8 @@ func sufficientSearchPerms(aclObj *acl.ACL, namespace string, context structs.Co
acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityListJobs,
acl.NamespaceCapabilityReadJob)(aclObj, namespace)
case structs.HostVolumes:
return acl.NamespaceValidator(acl.NamespaceCapabilityHostVolumeRead)(aclObj, namespace)
case structs.Variables:
return aclObj.AllowVariableSearch(namespace)
case structs.Plugins:
Expand Down Expand Up @@ -774,7 +781,8 @@ func (s *Search) FuzzySearch(args *structs.FuzzySearchRequest, reply *structs.Fu
for _, ctx := range prefixContexts {
switch ctx {
// only apply on the types that use UUID prefix searching
case structs.Evals, structs.Deployments, structs.ScalingPolicies, structs.Volumes, structs.Quotas, structs.Recommendations:
case structs.Evals, structs.Deployments, structs.ScalingPolicies,
structs.Volumes, structs.HostVolumes, structs.Quotas, structs.Recommendations:
iter, err := getResourceIter(ctx, aclObj, namespace, roundUUIDDownIfOdd(args.Prefix, args.Context), ws, state)
if err != nil {
if !s.silenceError(err) {
Expand All @@ -790,7 +798,9 @@ func (s *Search) FuzzySearch(args *structs.FuzzySearchRequest, reply *structs.Fu
for _, ctx := range fuzzyContexts {
switch ctx {
// skip the types that use UUID prefix searching
case structs.Evals, structs.Deployments, structs.ScalingPolicies, structs.Volumes, structs.Quotas, structs.Recommendations:
case structs.Evals, structs.Deployments, structs.ScalingPolicies,
structs.Volumes, structs.HostVolumes, structs.Quotas,
structs.Recommendations:
continue
default:
iter, err := getFuzzyResourceIterator(ctx, aclObj, namespace, ws, state)
Expand Down Expand Up @@ -927,6 +937,11 @@ func filteredSearchContexts(aclObj *acl.ACL, namespace string, context structs.C
if volRead {
available = append(available, c)
}
case structs.HostVolumes:
if acl.NamespaceValidator(
acl.NamespaceCapabilityHostVolumeRead)(aclObj, namespace) {
available = append(available, c)
}
case structs.Plugins:
if aclObj.AllowPluginList() {
available = append(available, c)
Expand Down
93 changes: 93 additions & 0 deletions nomad/search_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,53 @@ func TestSearch_PrefixSearch_CSIVolume(t *testing.T) {
require.False(t, resp.Truncations[structs.Volumes])
}

func TestSearch_PrefixSearch_HostVolume(t *testing.T) {
ci.Parallel(t)

srv, cleanup := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanup()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)

store := srv.fsm.State()
index, _ := store.LatestIndex()

node := mock.Node()
index++
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))

id := uuid.Generate()
index++
err := store.UpsertHostVolumes(index, []*structs.HostVolume{{
ID: id,
Name: "example",
Namespace: structs.DefaultNamespace,
PluginID: "glade",
NodeID: node.ID,
NodePool: node.NodePool,
}})
must.NoError(t, err)

req := &structs.SearchRequest{
Prefix: id[:6],
Context: structs.HostVolumes,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}

var resp structs.SearchResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp))

must.Len(t, 1, resp.Matches[structs.HostVolumes])
must.Len(t, 0, resp.Matches[structs.Volumes])
must.Eq(t, id, resp.Matches[structs.HostVolumes][0])
must.False(t, resp.Truncations[structs.HostVolumes])
}

func TestSearch_PrefixSearch_Namespace(t *testing.T) {
ci.Parallel(t)

Expand Down Expand Up @@ -1932,6 +1979,52 @@ func TestSearch_FuzzySearch_CSIVolume(t *testing.T) {
require.False(t, resp.Truncations[structs.Volumes])
}

func TestSearch_FuzzySearch_HostVolume(t *testing.T) {
ci.Parallel(t)

srv, cleanup := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanup()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)

store := srv.fsm.State()
index, _ := store.LatestIndex()

node := mock.Node()
index++
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))

id := uuid.Generate()
index++
err := store.UpsertHostVolumes(index, []*structs.HostVolume{{
ID: id,
Name: "example",
Namespace: structs.DefaultNamespace,
PluginID: "glade",
NodeID: node.ID,
NodePool: node.NodePool,
}})
must.NoError(t, err)

req := &structs.FuzzySearchRequest{
Text: id[0:3], // volumes are prefix searched
Context: structs.HostVolumes,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}

var resp structs.FuzzySearchResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Search.FuzzySearch", req, &resp))

must.Len(t, 1, resp.Matches[structs.HostVolumes])
must.Eq(t, id, resp.Matches[structs.HostVolumes][0].ID)
must.False(t, resp.Truncations[structs.HostVolumes])
}

func TestSearch_FuzzySearch_Namespace(t *testing.T) {
ci.Parallel(t)

Expand Down
25 changes: 25 additions & 0 deletions nomad/state/state_store_host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package state

import (
"fmt"
"strings"

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -156,6 +157,30 @@ func (s *StateStore) HostVolumes(ws memdb.WatchSet, sort SortOption) (memdb.Resu
return s.hostVolumesIter(ws, indexID, sort)
}

// HostVolumesByIDPrefix retrieves all host volumes by ID prefix. Because the ID
// index is namespaced, we need to handle the wildcard namespace here as well.
func (s *StateStore) HostVolumesByIDPrefix(ws memdb.WatchSet, ns, prefix string, sort SortOption) (memdb.ResultIterator, error) {

if ns != structs.AllNamespacesSentinel {
return s.hostVolumesIter(ws, "id_prefix", sort, ns, prefix)
}

// for wildcard namespace, wrap the iterator in a filter function that
// filters all volumes by prefix
iter, err := s.hostVolumesIter(ws, indexID, sort)
if err != nil {
return nil, err
}
wrappedIter := memdb.NewFilterIterator(iter, func(raw any) bool {
vol, ok := raw.(*structs.HostVolume)
if !ok {
return true
}
return !strings.HasPrefix(vol.ID, prefix)
})
return wrappedIter, nil
}

// HostVolumesByName retrieves all host volumes of the same name
func (s *StateStore) HostVolumesByName(ws memdb.WatchSet, ns, name string, sort SortOption) (memdb.ResultIterator, error) {
return s.hostVolumesIter(ws, "name_prefix", sort, ns, name)
Expand Down
6 changes: 6 additions & 0 deletions nomad/state/state_store_host_volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ func TestStateStore_HostVolumes_CRUD(t *testing.T) {
must.NoError(t, err)
got = consumeIter(iter)
must.MapLen(t, 3, got, must.Sprint(`expected 3 volumes remain`))

prefix := vol.ID[:30] // sufficiently long prefix to avoid flakes
iter, err = store.HostVolumesByIDPrefix(nil, "*", prefix, SortDefault)
must.NoError(t, err)
got = consumeIter(iter)
must.MapLen(t, 1, got, must.Sprint(`expected only one volume to match prefix`))
}

func TestStateStore_UpdateHostVolumesFromFingerprint(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions nomad/structs/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
Plugins Context = "plugins"
Variables Context = "vars"
Volumes Context = "volumes"
HostVolumes Context = "host_volumes"

// Subtypes used in fuzzy matching.
Groups Context = "groups"
Expand Down

0 comments on commit 0ef88f8

Please sign in to comment.