Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add --ephemeral mode for persistent worker #813

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions PERSISTENT-WORKERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ Note that persistent worker's name should be unique within a pool.

Note that by default a persistent worker has the privileges of the user that invoked it. Read more [about isolation](#isolation) below to learn how to limit or extend persistent worker privileges.

### Ephemeral Mode

The worker can be started in ephemeral mode with:

```
cirrus worker run --token <poll registration token> --ephemeral
```

After having completed a single task, the worker process will exit. This can be used in ephemeral setups where the enviroment should be cleaned after each run.

Note that users of `--ephemeral` need to take care of cleaning the enviroment and caching build inputs themselves.

## Configuration

Path to the YAML configuration can be specified via the `--file` (or `-f` for short version) command-line flag.
Expand All @@ -38,6 +50,7 @@ Example configuration:
token: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

name: "MacMini-Rack-1-Slot-2"
ephemeral: false

labels:
connected-device: iPhone12ProMax
Expand Down
13 changes: 11 additions & 2 deletions internal/commands/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
var ErrConfiguration = errors.New("configuration error")

type Config struct {
Name string `yaml:"name"`
Token string `yaml:"token"`
Name string `yaml:"name"`
Token string `yaml:"token"`
Ephemeral bool `yaml:"ephemeral"`

Labels map[string]string `yaml:"labels"`
Resources map[string]float64 `yaml:"resources"`
Expand Down Expand Up @@ -67,6 +68,7 @@ var (
labels map[string]string
resources map[string]string
rpcEndpoint string
ephemeral bool
)

func attachFlags(cmd *cobra.Command) {
Expand All @@ -88,13 +90,16 @@ func attachFlags(cmd *cobra.Command) {
"additional resources to use (e.g. --resources devices=2)")
cmd.PersistentFlags().StringVar(&rpcEndpoint, "rpc-endpoint", upstream.DefaultRPCEndpoint,
"RPC endpoint address")
cmd.PersistentFlags().BoolVar(&ephemeral, "ephemeral", false,
"run a single task and then exit the process")
}

func parseConfig(path string) (*Config, error) {
// Instantiate a default configuration
config := Config{
Name: name,
Token: token,
Ephemeral: ephemeral,
Labels: labels,
Resources: map[string]float64{},
RPC: ConfigRPC{
Expand Down Expand Up @@ -231,6 +236,10 @@ func buildWorker(output io.Writer) (*worker.Worker, error) {
opts = append(opts, worker.WithTartPrePull(config.TartPrePull))
}

if ephemeral {
opts = append(opts, worker.WithEphemeral(ephemeral))
}

// Instantiate worker
return worker.New(opts...)
}
9 changes: 9 additions & 0 deletions internal/commands/worker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,13 @@ func TestRestrictForceSoftnet(t *testing.T) {
require.NoError(t, err)

require.True(t, config.Security.AllowedIsolations.Tart.ForceSoftnet)
// ephemeral is not set, it should default to false
require.False(t, config.Ephemeral)
}

func TestEphemeral(t *testing.T) {
config, err := parseConfig(filepath.Join("testdata", "ephemeral.yml"))
require.NoError(t, err)

require.True(t, config.Ephemeral)
}
4 changes: 4 additions & 0 deletions internal/commands/worker/testdata/ephemeral.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
token: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

name: "ephemeral-runner"
ephemeral: true
6 changes: 6 additions & 0 deletions internal/worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ func WithTartPrePull(tartPrePull []string) Option {
e.tartPrePull = tartPrePull
}
}

func WithEphemeral(ephemeral bool) Option {
return func(e *Worker) {
e.ephemeral = ephemeral
}
}
8 changes: 5 additions & 3 deletions internal/worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func (worker *Worker) startTask(
ctx context.Context,
upstream *upstreampkg.Upstream,
agentAwareTask *api.PollResponse_AgentAwareTask,
) {
) (string, error) {
taskID := agentAwareTask.TaskId
if taskID == "" {
taskID = fmt.Sprintf("%d", agentAwareTask.OldTaskId)
}
if _, ok := worker.tasks.Load(taskID); ok {
worker.logger.Warnf("attempted to run task %s which is already running", taskID)
return
return "", fmt.Errorf("task %s already running", taskID)
}

ctx = metadata.AppendToOutgoingContext(ctx,
Expand All @@ -67,14 +67,16 @@ func (worker *Worker) startTask(
Message: err.Error(),
})

return
return "", fmt.Errorf("failed to create an instance for task: %v", err)
}

worker.imagesCounter.Add(ctx, 1, metric.WithAttributes(inst.Attributes()...))
go worker.runTask(taskCtx, upstream, inst, agentAwareTask.CliVersion,
taskID, agentAwareTask.ClientSecret, agentAwareTask.ServerSecret)

worker.logger.Infof("started task %s", taskID)

return taskID, nil
}

func (worker *Worker) getInstance(
Expand Down
29 changes: 28 additions & 1 deletion internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Worker struct {
standbyInstance abstract.Instance

tartPrePull []string

// In ephemeral mode, only run a single task and then exit
ephemeral bool
ephemeralTaskID string
}

func New(opts ...Option) (*Worker, error) {
Expand All @@ -73,6 +77,8 @@ func New(opts ...Option) (*Worker, error) {

logger: logrus.New(),
echelonLogger: echelon.NewLogger(echelon.TraceLevel, renderers.NewSimpleRenderer(os.Stdout, nil)),

ephemeralTaskID: "",
}

// Apply options
Expand Down Expand Up @@ -309,7 +315,21 @@ func (worker *Worker) pollSingleUpstream(ctx context.Context, upstream *upstream
}

for _, taskToStart := range response.TasksToStart {
worker.startTask(ctx, upstream, taskToStart)
// don't start a new task in ephemeral mode if one is running
if worker.ephemeral && worker.ephemeralTaskID != "" {
break
}

taskID, err := worker.startTask(ctx, upstream, taskToStart)
if err != nil {
worker.logger.Warnf("failed to start task: %v", err)
continue
}

if worker.ephemeral {
worker.ephemeralTaskID = taskID
break
}
}

if response.Shutdown {
Expand All @@ -319,6 +339,13 @@ func (worker *Worker) pollSingleUpstream(ctx context.Context, upstream *upstream
return ErrShutdown
}

if worker.ephemeral && worker.ephemeralTaskID != "" {
if _, ok := worker.tasks.Load(worker.ephemeralTaskID); !ok {
worker.logger.Infof("In ephemeral mode: task %s completed, terminating...", worker.ephemeralTaskID)
return ErrShutdown
}
}

return nil
}

Expand Down