Skip to content

Commit

Permalink
dynamic host volumes: monitor readiness from CLI
Browse files Browse the repository at this point in the history
When creating a dynamic host volumes, set up an optional monitor that waits for
the node to fingerprint the volume as healthy.

Ref: #24479
  • Loading branch information
tgross committed Nov 21, 2024
1 parent 193f913 commit 65829df
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 34 deletions.
22 changes: 20 additions & 2 deletions command/volume_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,22 @@ Usage: nomad volume create [options] <input>
General Options:
` + generalOptionsUsage(usageOptsDefault)
` + generalOptionsUsage(usageOptsDefault) + `
Create Options:
-detach
Return immediately instead of entering monitor mode for dynamic host
volumes. After creating a volume, the volume ID will be printed to the
screen, which can be used to examine the volume using the volume status
command. If -detach is omitted or false, the command will monitor the state
of the volume until it is ready to be scheduled.
-verbose
Display full information when monitoring volume state. Used for dynamic host
volumes only.
`

return strings.TrimSpace(helpText)
}
Expand All @@ -51,7 +66,10 @@ func (c *VolumeCreateCommand) Synopsis() string {
func (c *VolumeCreateCommand) Name() string { return "volume create" }

func (c *VolumeCreateCommand) Run(args []string) int {
var detach, verbose bool
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.BoolVar(&detach, "detach", false, "detach from monitor")
flags.BoolVar(&verbose, "verbose", false, "display full volume IDs")
flags.Usage = func() { c.Ui.Output(c.Help()) }

if err := flags.Parse(args); err != nil {
Expand Down Expand Up @@ -102,7 +120,7 @@ func (c *VolumeCreateCommand) Run(args []string) int {
case "csi":
return c.csiCreate(client, ast)
case "host":
return c.hostVolumeCreate(client, ast)
return c.hostVolumeCreate(client, ast, detach, verbose)
default:
c.Ui.Error(fmt.Sprintf("Error unknown volume type: %s", volType))
return 1
Expand Down
151 changes: 145 additions & 6 deletions command/volume_create_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@
package command

import (
"context"
"fmt"
"strconv"
"time"

"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper"
"github.com/mitchellh/go-glint"
"github.com/mitchellh/go-glint/components"
"github.com/mitchellh/mapstructure"
)

func (c *VolumeCreateCommand) hostVolumeCreate(client *api.Client, ast *ast.File) int {
func (c *VolumeCreateCommand) hostVolumeCreate(
client *api.Client, ast *ast.File, detach, verbose bool) int {

vol, err := decodeHostVolume(ast)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error decoding the volume definition: %s", err))
Expand All @@ -29,17 +35,150 @@ func (c *VolumeCreateCommand) hostVolumeCreate(client *api.Client, ast *ast.File
c.Ui.Error(fmt.Sprintf("Error creating volume: %s", err))
return 1
}

var volID string
var lastIndex uint64

// note: the command only ever returns 1 volume from the API
for _, vol := range vols {
// note: the command only ever returns 1 volume from the API
c.Ui.Output(fmt.Sprintf(
"Created host volume %s with ID %s", vol.Name, vol.ID))
if detach || vol.State == api.HostVolumeStateReady {
c.Ui.Output(fmt.Sprintf(
"Created host volume %s with ID %s", vol.Name, vol.ID))
return 0
} else {
c.Ui.Output(fmt.Sprintf(
"==> Created host volume %s with ID %s", vol.Name, vol.ID))
volID = vol.ID
lastIndex = vol.ModifyIndex
break
}
}

// TODO(1.10.0): monitor so we can report when the node has fingerprinted

err = c.monitorHostVolume(client, volID, lastIndex, verbose)
if err != nil {
c.Ui.Error(fmt.Sprintf("==> %s: %v", formatTime(time.Now()), err.Error()))
return 1
}
return 0
}

func (c *VolumeCreateCommand) monitorHostVolume(client *api.Client, id string, lastIndex uint64, verbose bool) error {
length := shortId
if verbose {
length = fullId
}

opts := formatOpts{
verbose: verbose,
short: !verbose,
length: length,
}

if isStdoutTerminal() {
return c.ttyMonitor(client, id, lastIndex, opts)
} else {
return c.nottyMonitor(client, id, lastIndex, opts)
}
}

func (c *VolumeCreateCommand) ttyMonitor(client *api.Client, id string, lastIndex uint64, opts formatOpts) error {

gUi := glint.New()
spinner := glint.Layout(
components.Spinner(),
glint.Text(fmt.Sprintf(" Monitoring volume %q in progress...", limit(id, opts.length))),
).Row().MarginLeft(2)
refreshRate := 100 * time.Millisecond

gUi.SetRefreshRate(refreshRate)
gUi.Set(spinner)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go gUi.Render(ctx)

qOpts := &api.QueryOptions{
AllowStale: true,
WaitIndex: lastIndex,
WaitTime: time.Second * 5,
}

var statusComponent *glint.LayoutComponent
var endSpinner *glint.LayoutComponent

DONE:
for {
vol, meta, err := client.HostVolumes().Get(id, qOpts)
if err != nil {
return err
}
str, err := formatHostVolume(vol, opts)
if err != nil {
// should never happen b/c we don't pass json/template via opts here
return err
}
statusComponent = glint.Layout(
glint.Text(""),
glint.Text(formatTime(time.Now())),
glint.Text(c.Colorize().Color(str)),
).MarginLeft(4)

statusComponent = glint.Layout(statusComponent)
gUi.Set(spinner, statusComponent)

endSpinner = glint.Layout(
components.Spinner(),
glint.Text(fmt.Sprintf(" Host volume %q %s", limit(id, opts.length), vol.State)),
).Row().MarginLeft(2)

switch vol.State {
case api.HostVolumeStateReady:
endSpinner = glint.Layout(
glint.Text(fmt.Sprintf("✓ Host volume %q %s", limit(id, opts.length), vol.State)),
).Row().MarginLeft(2)
break DONE

case api.HostVolumeStateDeleted:
endSpinner = glint.Layout(
glint.Text(fmt.Sprintf("! Host volume %q %s", limit(id, opts.length), vol.State)),
).Row().MarginLeft(2)
break DONE

default:
qOpts.WaitIndex = meta.LastIndex
continue
}

}

// Render one final time with completion message
gUi.Set(endSpinner, statusComponent, glint.Text(""))
gUi.RenderFrame()
return nil
}

func (c *VolumeCreateCommand) nottyMonitor(client *api.Client, id string, lastIndex uint64, opts formatOpts) error {

c.Ui.Info(fmt.Sprintf("==> %s: Monitoring volume %q...",
formatTime(time.Now()), limit(id, opts.length)))

for {
vol, _, err := client.HostVolumes().Get(id, &api.QueryOptions{
WaitIndex: lastIndex,
WaitTime: time.Second * 5,
})
if err != nil {
return err
}
if vol.State == api.HostVolumeStateReady {
c.Ui.Info(fmt.Sprintf("==> %s: Volume %q ready",
formatTime(time.Now()), limit(vol.Name, opts.length)))
return nil
}
}
}

func decodeHostVolume(input *ast.File) (*api.HostVolume, error) {
var err error
vol := &api.HostVolume{}
Expand Down
2 changes: 1 addition & 1 deletion command/volume_create_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ parameters {
_, err = file.WriteString(hclTestFile)
must.NoError(t, err)

args := []string{"-address", url, file.Name()}
args := []string{"-address", url, "-detach", file.Name()}

code := cmd.Run(args)
must.Eq(t, 0, code, must.Sprintf("got error: %s", ui.ErrorWriter.String()))
Expand Down
64 changes: 41 additions & 23 deletions command/volume_status_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ func (c *VolumeStatusCommand) hostVolumeStatus(client *api.Client, id, nodeID, n
return 1
}

opts := formatOpts{
verbose: c.verbose,
short: c.short,
length: c.length,
json: c.json,
template: c.template,
}

// get a host volume that matches the given prefix or a list of all matches
// if an exact match is not found. note we can't use the shared getByPrefix
// helper here because the List API doesn't match the required signature
Expand All @@ -32,7 +40,7 @@ func (c *VolumeStatusCommand) hostVolumeStatus(client *api.Client, id, nodeID, n
return 1
}
if len(possible) > 0 {
out, err := c.formatHostVolumes(possible)
out, err := formatHostVolumes(possible, opts)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
Expand All @@ -47,12 +55,12 @@ func (c *VolumeStatusCommand) hostVolumeStatus(client *api.Client, id, nodeID, n
return 1
}

str, err := c.formatHostVolume(vol)
str, err := formatHostVolume(vol, opts)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting volume: %s", err))
return 1
}
c.Ui.Output(str)
c.Ui.Output(c.Colorize().Color(str))
return 0
}

Expand All @@ -66,13 +74,20 @@ func (c *VolumeStatusCommand) listHostVolumes(client *api.Client, nodeID, nodePo
return 1
}

str, err := c.formatHostVolumes(vols)
opts := formatOpts{
verbose: c.verbose,
short: c.short,
length: c.length,
json: c.json,
template: c.template,
}

str, err := formatHostVolumes(vols, opts)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting volumes: %s", err))
return 1
}
c.Ui.Output(str)

c.Ui.Output(c.Colorize().Color(str))
return 0
}

Expand Down Expand Up @@ -108,9 +123,9 @@ func (c *VolumeStatusCommand) getByPrefix(client *api.Client, prefix string) (*a
}
}

func (c *VolumeStatusCommand) formatHostVolume(vol *api.HostVolume) (string, error) {
if c.json || len(c.template) > 0 {
out, err := Format(c.json, c.template, vol)
func formatHostVolume(vol *api.HostVolume, opts formatOpts) (string, error) {
if opts.json || len(opts.template) > 0 {
out, err := Format(opts.json, opts.template, vol)
if err != nil {
return "", fmt.Errorf("format error: %v", err)
}
Expand All @@ -130,48 +145,51 @@ func (c *VolumeStatusCommand) formatHostVolume(vol *api.HostVolume) (string, err
}

// Exit early
if c.short {
if opts.short {
return formatKV(output), nil
}

full := []string{formatKV(output)}

// Format the allocs
banner := c.Colorize().Color("\n[bold]Allocations[reset]")
allocs := formatAllocListStubs(vol.Allocations, c.verbose, c.length)
banner := "\n[bold]Allocations[reset]"
allocs := formatAllocListStubs(vol.Allocations, opts.verbose, opts.length)
full = append(full, banner)
full = append(full, allocs)

return strings.Join(full, "\n"), nil
}

func (c *VolumeStatusCommand) formatHostVolumes(vols []*api.HostVolumeStub) (string, error) {
// TODO: we could make a bunch more formatters into shared functions using this
type formatOpts struct {
verbose bool
short bool
length int
json bool
template string
}

func formatHostVolumes(vols []*api.HostVolumeStub, opts formatOpts) (string, error) {
// Sort the output by volume ID
sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID })

if c.json || len(c.template) > 0 {
out, err := Format(c.json, c.template, vols)
if opts.json || len(opts.template) > 0 {
out, err := Format(opts.json, opts.template, vols)
if err != nil {
return "", fmt.Errorf("format error: %v", err)
}
return out, nil
}

// Truncate the id unless full length is requested
length := shortId
if c.verbose {
length = fullId
}

rows := make([]string, len(vols)+1)
rows[0] = "ID|Name|Namespace|Plugin ID|Node ID|Node Pool|State"
for i, v := range vols {
rows[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s|%s",
limit(v.ID, length),
limit(v.ID, opts.length),
v.Name,
v.Namespace,
v.PluginID,
limit(v.NodeID, length),
limit(v.NodeID, opts.length),
v.NodePool,
v.State,
)
Expand Down
2 changes: 1 addition & 1 deletion command/volume_status_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ capability {
_, err = file.WriteString(hclTestFile)
must.NoError(t, err)

args := []string{"-address", url, file.Name()}
args := []string{"-address", url, "-detach", file.Name()}
cmd := &VolumeCreateCommand{Meta: Meta{Ui: ui}}
code := cmd.Run(args)
must.Eq(t, 0, code, must.Sprintf("got error: %s", ui.ErrorWriter.String()))
Expand Down
2 changes: 1 addition & 1 deletion demo/hostvolume/example-host-volume
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ case "$1" in
create_volume "$host_path" "$CAPACITY_MIN_BYTES"
# output what Nomad expects
bytes="$(stat --format='%s' "$host_path.ext4")"
printf '{"path": "%s", "bytes": %s}' "$host_path", "$bytes"
printf '{"path": "%s", "bytes": %s}' "$host_path" "$bytes"
;;
"delete")
delete_volume "$host_path" ;;
Expand Down

0 comments on commit 65829df

Please sign in to comment.