diff --git a/runtime/v2/runc/task/service.go b/runtime/v2/runc/task/service.go
index 132a5cbb3e32..4436cf7d9d30 100644
--- a/runtime/v2/runc/task/service.go
+++ b/runtime/v2/runc/task/service.go
@@ -73,15 +73,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 	}
 	go ep.Run(ctx)
 	s := &service{
-		context:         ctx,
-		events:          make(chan interface{}, 128),
-		ec:              reaper.Default.Subscribe(),
-		ep:              ep,
-		shutdown:        sd,
-		containers:      make(map[string]*runc.Container),
-		running:         make(map[int][]containerProcess),
-		pendingExecs:    make(map[*runc.Container]int),
-		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
+		context:              ctx,
+		events:               make(chan interface{}, 128),
+		ec:                   reaper.Default.Subscribe(),
+		ep:                   ep,
+		shutdown:             sd,
+		containers:           make(map[string]*runc.Container),
+		running:              make(map[int][]containerProcess),
+		runningExecs:         make(map[*runc.Container]int),
+		execCountSubscribers: make(map[*runc.Container]chan<- int),
+		containerInitExit:    make(map[*runc.Container]runcC.Exit),
+		exitSubscribers:      make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -116,7 +118,19 @@ type service struct {
 
 	lifecycleMu  sync.Mutex
 	running      map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
-	pendingExecs map[*runc.Container]int    // container -> num pending execs, guarded by lifecycleMu
+	runningExecs map[*runc.Container]int    // container -> num running execs, guarded by lifecycleMu
+	// container -> subscription to exec exits/changes to s.runningExecs[container],
+	// guarded by lifecycleMu
+	execCountSubscribers map[*runc.Container]chan<- int
+	// container -> init exits, guarded by lifecycleMu
+	// Used to stash container init process exits, so that we can hold them
+	// until after we've made sure to publish all the container's exec exits.
+	// Also used to prevent new execs from being started if the container's
+	// init process (read: pid, not [process.Init]) has already been reaped
+	// by the shim.
+	// Note that this flag gets updated before the container's [process.Init.Status]
+	// is transitioned to "stopped".
+	containerInitExit map[*runc.Container]runcC.Exit
 	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
 	// dereferencing the subscription pointers must only be done while holding
 	// lifecycleMu.
@@ -137,8 +151,7 @@ type containerProcess struct {
 //
 // The returned handleStarted closure records that the process has started so
 // that its exit can be handled efficiently. If the process has already exited,
-// it handles the exit immediately. In addition, if the process is an exec and
-// its container's init process has already exited, that exit is also processed.
+// it handles the exit immediately.
 // handleStarted should be called after the event announcing the start of the
 // process has been published. Note that s.lifecycleMu must not be held when
 // calling handleStarted.
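Note on the bookkeeping introduced above, before the remaining hunks: containerInitExit is what lets Start() (further down in this patch) refuse to launch a new exec once the container's init pid has been reaped. The following is a minimal standalone sketch of that gate, not the shim's code; shimState, startExec and reapInit are invented names used only for illustration.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// exit is a stand-in for runc's Exit record.
type exit struct{ status int }

// shimState is an illustrative reduction of the service struct: only the two
// maps involved in the "no new execs after init has been reaped" gate.
type shimState struct {
	mu           sync.Mutex
	runningExecs map[string]int  // container ID -> number of running execs
	initExit     map[string]exit // container ID -> stashed init exit, if reaped
}

var errInitNotRunning = errors.New("init process is not running")

// startExec mirrors the check Start() now performs for execs: if an init exit
// has already been stashed, refuse; otherwise count the exec as running.
func (s *shimState) startExec(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exited := s.initExit[id]; exited {
		return fmt.Errorf("container %s: %w", id, errInitNotRunning)
	}
	s.runningExecs[id]++
	return nil
}

// reapInit mirrors processExits stashing the init exit (containerInitExit)
// before it is eventually published.
func (s *shimState) reapInit(id string, e exit) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.initExit[id] = e
}

func main() {
	s := &shimState{
		runningExecs: make(map[string]int),
		initExit:     make(map[string]exit),
	}
	if err := s.startExec("c1"); err == nil {
		fmt.Println("exec started while init is alive")
	}
	s.reapInit("c1", exit{status: 137})
	if err := s.startExec("c1"); err != nil {
		fmt.Println("refused:", err)
	}
}

Stashing the init exit instead of publishing it immediately is what closes the window in which an exec could otherwise be started against an init process that is already gone.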
@@ -173,44 +186,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
 			pid = p.Pid()
 		}
 
-		_, init := p.(*process.Init)
 		s.lifecycleMu.Lock()
 
-		var initExits []runcC.Exit
-		var initCps []containerProcess
-		if !init {
-			s.pendingExecs[c]--
-
-			initPid := c.Pid()
-			iExits, initExited := exits[initPid]
-			if initExited && s.pendingExecs[c] == 0 {
-				// c's init process has exited before handleStarted was called and
-				// this is the last pending exec process start - we need to process
-				// the exit for the init process after processing this exec, so:
-				// - delete c from the s.pendingExecs map
-				// - keep the exits for the init pid to process later (after we process
-				//   this exec's exits)
-				// - get the necessary containerProcesses for the init process (that we
-				//   need to process the exits), and remove them from s.running (which we skipped
-				//   doing in processExits).
-				delete(s.pendingExecs, c)
-				initExits = iExits
-				var skipped []containerProcess
-				for _, initPidCp := range s.running[initPid] {
-					if initPidCp.Container == c {
-						initCps = append(initCps, initPidCp)
-					} else {
-						skipped = append(skipped, initPidCp)
-					}
-				}
-				if len(skipped) == 0 {
-					delete(s.running, initPid)
-				} else {
-					s.running[initPid] = skipped
-				}
-			}
-		}
-
 		ees, exited := exits[pid]
 		delete(s.exitSubscribers, &exits)
 		exits = nil
@@ -219,11 +196,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
 			for _, ee := range ees {
 				s.handleProcessExit(ee, c, p)
 			}
-			for _, eee := range initExits {
-				for _, cp := range initCps {
-					s.handleProcessExit(eee, cp.Container, cp.Process)
-				}
-			}
 		} else {
 			// Process start was successful, add to `s.running`.
 			s.running[pid] = append(s.running[pid], containerProcess{
@@ -304,7 +276,11 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 	if r.ExecID == "" {
 		cinit = container
 	} else {
-		s.pendingExecs[container]++
+		if _, initExited := s.containerInitExit[container]; initExited {
+			s.lifecycleMu.Unlock()
+			return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
+		}
+		s.runningExecs[container]++
 	}
 	handleStarted, cleanup := s.preStart(cinit)
 	s.lifecycleMu.Unlock()
@@ -312,6 +288,17 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 
 	p, err := container.Start(ctx, r)
 	if err != nil {
+		// If we failed to even start the process, s.runningExecs
+		// won't get decremented in s.handleProcessExit. We still need
+		// to update it.
+		if r.ExecID != "" {
+			s.lifecycleMu.Lock()
+			s.runningExecs[container]--
+			if ch, ok := s.execCountSubscribers[container]; ok {
+				ch <- s.runningExecs[container]
+			}
+			s.lifecycleMu.Unlock()
+		}
 		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}
@@ -676,28 +663,23 @@ func (s *service) processExits() {
 			// Handle the exit for a created/started process. If there's more than
 			// one, assume they've all exited. One of them will be the correct
 			// process.
-			var cps, skipped []containerProcess
+			var cps []containerProcess
 			for _, cp := range s.running[e.Pid] {
 				_, init := cp.Process.(*process.Init)
-				if init && s.pendingExecs[cp.Container] != 0 {
-					// This exit relates to a container for which we have pending execs. In
-					// order to ensure order between execs and the init process for a given
-					// container, skip processing this exit here and let the `handleStarted`
-					// closure for the pending exec publish it.
-					skipped = append(skipped, cp)
-				} else {
-					cps = append(cps, cp)
+				if init {
+					s.containerInitExit[cp.Container] = e
 				}
+				cps = append(cps, cp)
 			}
-			if len(skipped) > 0 {
-				s.running[e.Pid] = skipped
-			} else {
-				delete(s.running, e.Pid)
-			}
+			delete(s.running, e.Pid)
 			s.lifecycleMu.Unlock()
 
 			for _, cp := range cps {
-				s.handleProcessExit(e, cp.Container, cp.Process)
+				if ip, ok := cp.Process.(*process.Init); ok {
+					s.handleInitExit(e, cp.Container, ip)
+				} else {
+					s.handleProcessExit(e, cp.Container, cp.Process)
+				}
 			}
 		}
 	}
@@ -706,18 +688,60 @@ func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-// s.mu must be locked when calling handleProcessExit
-func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
-	if ip, ok := p.(*process.Init); ok {
-		// Ensure all children are killed
-		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
-			if err := ip.KillAll(s.context); err != nil {
-				logrus.WithError(err).WithField("id", ip.ID()).
-					Error("failed to kill init's children")
-			}
+// handleInitExit processes container init process exits.
+// This is handled separately from non-init exits, because there
+// are some extra invariants we want to ensure in this case, namely:
+// - for a given container, the init process exit MUST be the last exit published
+// This is achieved by:
+// - killing all running container processes (if the container has a shared pid
+// namespace, otherwise all other processes have been reaped already).
+// - waiting for the container's running exec counter to reach 0.
+// - finally, publishing the init exit.
+func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
+	// kill all running container processes
+	if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+		if err := p.KillAll(s.context); err != nil {
+			logrus.WithError(err).WithField("id", p.ID()).
+				Error("failed to kill init's children")
 		}
 	}
+
+	s.lifecycleMu.Lock()
+	numRunningExecs := s.runningExecs[c]
+	if numRunningExecs == 0 {
+		delete(s.runningExecs, c)
+		s.lifecycleMu.Unlock()
+		s.handleProcessExit(e, c, p)
+		return
+	}
+
+	events := make(chan int, numRunningExecs)
+	s.execCountSubscribers[c] = events
+
+	s.lifecycleMu.Unlock()
+
+	go func() {
+		defer func() {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.execCountSubscribers, c)
+			delete(s.runningExecs, c)
+		}()
+
+		// wait for running processes to exit
+		for {
+			if runningExecs := <-events; runningExecs == 0 {
+				break
+			}
+		}
+
+		// all running processes have exited now, and no new
+		// ones can start, so we can publish the init exit
+		s.handleProcessExit(e, c, p)
+	}()
+}
+
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	p.SetExited(e.Status)
 	s.send(&eventstypes.TaskExit{
 		ContainerID: c.ID,
@@ -726,6 +750,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
 		ExitStatus:  uint32(e.Status),
 		ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
 	})
+	if _, init := p.(*process.Init); !init {
+		s.lifecycleMu.Lock()
+		s.runningExecs[c]--
+		if ch, ok := s.execCountSubscribers[c]; ok {
+			ch <- s.runningExecs[c]
+		}
+		s.lifecycleMu.Unlock()
+	}
 }
 
 func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
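For reference, the exec-counting handshake between handleInitExit and handleProcessExit above reduces to the following standalone sketch; execTracker and its methods are invented names for illustration, not the shim's API. A per-container counter is decremented on every exec exit, and a single waiter registers a buffered channel to learn when the count drains to zero.

package main

import (
	"fmt"
	"sync"
)

// execTracker is a stand-in for the per-container state the shim keeps in
// s.runningExecs and s.execCountSubscribers.
type execTracker struct {
	mu         sync.Mutex
	running    int        // number of live execs (s.runningExecs[c])
	subscriber chan<- int // registered waiter, if any (s.execCountSubscribers[c])
}

// execExited decrements the counter and, if a waiter is registered, sends it
// the new count. This mirrors the bookkeeping at the end of handleProcessExit.
func (t *execTracker) execExited() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.running--
	if t.subscriber != nil {
		t.subscriber <- t.running
	}
}

// waitForDrain blocks until the exec count reaches zero, mirroring the
// goroutine handleInitExit spawns before publishing the init exit. The channel
// is buffered to the current count so execExited never blocks under the lock.
func (t *execTracker) waitForDrain() {
	t.mu.Lock()
	if t.running == 0 {
		t.mu.Unlock()
		return
	}
	events := make(chan int, t.running)
	t.subscriber = events
	t.mu.Unlock()

	for count := range events {
		if count == 0 {
			return
		}
	}
}

func main() {
	t := &execTracker{running: 2}
	go func() {
		t.execExited() // first exec exits
		t.execExited() // second exec exits; count hits zero
	}()
	t.waitForDrain()
	fmt.Println("all execs exited; init exit can be published last")
}

Sizing the channel buffer to the counter's value at registration time mirrors make(chan int, numRunningExecs) in handleInitExit, so the exiting side never blocks on the send while it holds lifecycleMu.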