From 22729156d1e508ec16b1bc98f59d1ffc6249927e Mon Sep 17 00:00:00 2001 From: 0xb10c Date: Sat, 16 Nov 2024 15:21:27 +0100 Subject: [PATCH] add --ephemeral mode for persistent worker When passing `--ephemeral` to `cirrus worker run`, the worker will accept one task and then exit the process once the task completed. This can be used inside ephemeral VMs which should be shutdown after each task. A user has to take care of cleaning up after the worker has finished. --- PERSISTENT-WORKERS.md | 13 +++++++++ internal/commands/worker/config.go | 13 +++++++-- internal/commands/worker/config_test.go | 9 ++++++ .../commands/worker/testdata/ephemeral.yml | 4 +++ internal/worker/options.go | 6 ++++ internal/worker/task.go | 8 +++-- internal/worker/worker.go | 29 ++++++++++++++++++- 7 files changed, 76 insertions(+), 6 deletions(-) create mode 100644 internal/commands/worker/testdata/ephemeral.yml diff --git a/PERSISTENT-WORKERS.md b/PERSISTENT-WORKERS.md index 889fd14c..ae9527d6 100644 --- a/PERSISTENT-WORKERS.md +++ b/PERSISTENT-WORKERS.md @@ -28,6 +28,18 @@ Note that persistent worker's name should be unique within a pool. Note that by default a persistent worker has the privileges of the user that invoked it. Read more [about isolation](#isolation) below to learn how to limit or extend persistent worker privileges. +### Ephemeral Mode + +The worker can be started in ephemeral mode with: + +``` +cirrus worker run --token --ephemeral +``` + +After having completed a single task, the worker process will exit. This can be used in ephemeral setups where the enviroment should be cleaned after each run. + +Note that users of `--ephemeral` need to take care of cleaning the enviroment and caching build inputs themselves. + ## Configuration Path to the YAML configuration can be specified via the `--file` (or `-f` for short version) command-line flag. @@ -38,6 +50,7 @@ Example configuration: token: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 name: "MacMini-Rack-1-Slot-2" +ephemeral: false labels: connected-device: iPhone12ProMax diff --git a/internal/commands/worker/config.go b/internal/commands/worker/config.go index d21c77af..2e377b50 100644 --- a/internal/commands/worker/config.go +++ b/internal/commands/worker/config.go @@ -23,8 +23,9 @@ import ( var ErrConfiguration = errors.New("configuration error") type Config struct { - Name string `yaml:"name"` - Token string `yaml:"token"` + Name string `yaml:"name"` + Token string `yaml:"token"` + Ephemeral bool `yaml:"ephemeral"` Labels map[string]string `yaml:"labels"` Resources map[string]float64 `yaml:"resources"` @@ -67,6 +68,7 @@ var ( labels map[string]string resources map[string]string rpcEndpoint string + ephemeral bool ) func attachFlags(cmd *cobra.Command) { @@ -88,6 +90,8 @@ func attachFlags(cmd *cobra.Command) { "additional resources to use (e.g. --resources devices=2)") cmd.PersistentFlags().StringVar(&rpcEndpoint, "rpc-endpoint", upstream.DefaultRPCEndpoint, "RPC endpoint address") + cmd.PersistentFlags().BoolVar(&ephemeral, "ephemeral", false, + "run a single task and then exit the process") } func parseConfig(path string) (*Config, error) { @@ -95,6 +99,7 @@ func parseConfig(path string) (*Config, error) { config := Config{ Name: name, Token: token, + Ephemeral: ephemeral, Labels: labels, Resources: map[string]float64{}, RPC: ConfigRPC{ @@ -231,6 +236,10 @@ func buildWorker(output io.Writer) (*worker.Worker, error) { opts = append(opts, worker.WithTartPrePull(config.TartPrePull)) } + if ephemeral { + opts = append(opts, worker.WithEphemeral(ephemeral)) + } + // Instantiate worker return worker.New(opts...) } diff --git a/internal/commands/worker/config_test.go b/internal/commands/worker/config_test.go index 0ea3790a..b9722a83 100644 --- a/internal/commands/worker/config_test.go +++ b/internal/commands/worker/config_test.go @@ -70,4 +70,13 @@ func TestRestrictForceSoftnet(t *testing.T) { require.NoError(t, err) require.True(t, config.Security.AllowedIsolations.Tart.ForceSoftnet) + // ephemeral is not set, it should default to false + require.False(t, config.Ephemeral) +} + +func TestEphemeral(t *testing.T) { + config, err := parseConfig(filepath.Join("testdata", "ephemeral.yml")) + require.NoError(t, err) + + require.True(t, config.Ephemeral) } diff --git a/internal/commands/worker/testdata/ephemeral.yml b/internal/commands/worker/testdata/ephemeral.yml new file mode 100644 index 00000000..b43d8d4c --- /dev/null +++ b/internal/commands/worker/testdata/ephemeral.yml @@ -0,0 +1,4 @@ +token: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + +name: "ephemeral-runner" +ephemeral: true diff --git a/internal/worker/options.go b/internal/worker/options.go index 054c2b1b..6ff38e12 100644 --- a/internal/worker/options.go +++ b/internal/worker/options.go @@ -56,3 +56,9 @@ func WithTartPrePull(tartPrePull []string) Option { e.tartPrePull = tartPrePull } } + +func WithEphemeral(ephemeral bool) Option { + return func(e *Worker) { + e.ephemeral = ephemeral + } +} diff --git a/internal/worker/task.go b/internal/worker/task.go index b2588509..b0649ee5 100644 --- a/internal/worker/task.go +++ b/internal/worker/task.go @@ -35,14 +35,14 @@ func (worker *Worker) startTask( ctx context.Context, upstream *upstreampkg.Upstream, agentAwareTask *api.PollResponse_AgentAwareTask, -) { +) (string, error) { taskID := agentAwareTask.TaskId if taskID == "" { taskID = fmt.Sprintf("%d", agentAwareTask.OldTaskId) } if _, ok := worker.tasks.Load(taskID); ok { worker.logger.Warnf("attempted to run task %s which is already running", taskID) - return + return "", fmt.Errorf("task %s already running", taskID) } ctx = metadata.AppendToOutgoingContext(ctx, @@ -67,7 +67,7 @@ func (worker *Worker) startTask( Message: err.Error(), }) - return + return "", fmt.Errorf("failed to create an instance for task: %v", err) } worker.imagesCounter.Add(ctx, 1, metric.WithAttributes(inst.Attributes()...)) @@ -75,6 +75,8 @@ func (worker *Worker) startTask( taskID, agentAwareTask.ClientSecret, agentAwareTask.ServerSecret) worker.logger.Infof("started task %s", taskID) + + return taskID, nil } func (worker *Worker) getInstance( diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 90b64bc3..4922b886 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -57,6 +57,10 @@ type Worker struct { standbyInstance abstract.Instance tartPrePull []string + + // In ephemeral mode, only run a single task and then exit + ephemeral bool + ephemeralTaskID string } func New(opts ...Option) (*Worker, error) { @@ -73,6 +77,8 @@ func New(opts ...Option) (*Worker, error) { logger: logrus.New(), echelonLogger: echelon.NewLogger(echelon.TraceLevel, renderers.NewSimpleRenderer(os.Stdout, nil)), + + ephemeralTaskID: "", } // Apply options @@ -309,7 +315,21 @@ func (worker *Worker) pollSingleUpstream(ctx context.Context, upstream *upstream } for _, taskToStart := range response.TasksToStart { - worker.startTask(ctx, upstream, taskToStart) + // don't start a new task in ephemeral mode if one is running + if worker.ephemeral && worker.ephemeralTaskID != "" { + break + } + + taskID, err := worker.startTask(ctx, upstream, taskToStart) + if err != nil { + worker.logger.Warnf("failed to start task: %v", err) + continue + } + + if worker.ephemeral { + worker.ephemeralTaskID = taskID + break + } } if response.Shutdown { @@ -319,6 +339,13 @@ func (worker *Worker) pollSingleUpstream(ctx context.Context, upstream *upstream return ErrShutdown } + if worker.ephemeral && worker.ephemeralTaskID != "" { + if _, ok := worker.tasks.Load(worker.ephemeralTaskID); !ok { + worker.logger.Infof("In ephemeral mode: task %s completed, terminating...", worker.ephemeralTaskID) + return ErrShutdown + } + } + return nil }