# fix: Some flakiness in windows-agent and wsl-pro-service (#873)
## Addressing windows-agent failures

The tests that fail most often are in
`windows-agent/internal/distros/worker/worker_test.go`. They all show
the same pattern:
1. `worker.SubmitTasks` submits an immediate task.
2. The already-running goroutine processing tasks is expected to be
notified and to dequeue that task.
3. `require.Eventually` asserts the side effects of that task (see the sketch after this list).
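
A minimal sketch of that pattern, heavily condensed; `w`, `d`, and `testTask` stand in for the tests' actual fixtures:

```go
// Hypothetical condensed shape of the flaky tests.
err := w.SubmitTasks(testTask) // 1. synchronous submission of an immediate task
require.NoError(t, err, "SubmitTasks should return no error")

// 2. the long-running goroutine should be notified and dequeue the task;
// 3. assert the task's side effects within a deadline.
require.Eventuallyf(t, func() bool { return d.state() == "Running" },
        distroWakeUpTime, 200*time.Millisecond,
        "distro should have been %q after SubmitTask()", "Running")
```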

I could only find a partial explanation for the observed failure mode:

```
time="2024-07-26T15:14:24Z" level=debug msg="Distro \"testDistro_UP4W_TestTaskProcessing_Success_executing_a_task_7763261449570405205\": starting task processing"
time="2024-07-26T15:14:24Z" level=info msg="Distro \"testDistro_UP4W_TestTaskProcessing_Success_executing_a_task_7763261449570405205\": Submitting tasks [\"Test task\"] to queue"
time="2024-07-26T15:14:25Z" level=debug msg="Distro \"testDistro_UP4W_TestTaskProcessing_Success_executing_a_task_7763261449570405205\": stopping task processing"
```

The second log line confirms a synchronous call was made to
`worker.SubmitTasks`, and the failed assertion proves it returned. Since
the call is synchronous, the task was delivered.

Processing is asynchronous. The goroutine is running (maybe paused, but
that's up to the runtime). If the task had been dequeued, we would see
this line in the log:

```
time="2024-07-26T15:14:23Z" level=debug msg="Distro \"testDistro_UP4W_TestTaskProcessing_Success_executing_a_task_7763261449570405205\": starting task \"Test task\""
```

If we augment the task queue's Push and Pull methods with extra logs to
observe how submission and dequeuing behave, we find an interesting
pattern: when that failure mode happens, the writer arrives at the
channel before the reader.

```diff
diff --git a/windows-agent/internal/distros/worker/task_queue.go b/windows-agent/internal/distros/worker/task_queue.go
index 74f2b9bf..cf9f08b9 100644
--- a/windows-agent/internal/distros/worker/task_queue.go
+++ b/windows-agent/internal/distros/worker/task_queue.go
@@ -4,6 +4,7 @@ import (
        "context"
        "sync"

+       log "github.com/canonical/ubuntu-pro-for-wsl/common/grpc/logstreamer"
        "github.com/canonical/ubuntu-pro-for-wsl/windows-agent/internal/distros/task"
 )

@@ -87,8 +88,10 @@ func (q *taskQueue) Push(t task.Task) {
        q.data = append(q.data, t)

        // Notify waiters if there are any
+       log.Warningf(context.TODO(), "+++++++++ NOTIFYING AWAITERS OF TASKS %v on channel", t)
        select {
        case q.wait <- struct{}{}:
+               log.Warningf(context.TODO(), "+++++++++ NOTIFY COMPLETE %v", t)
        default:
        }
 }
@@ -155,14 +158,16 @@ func (q *taskQueue) Pull(ctx context.Context) task.Task {
                if task, ok := q.tryPopFront(); ok {
                        return task
                }

                q.mu.RLock()
                // This is mostly to appease the race detector
                wait := q.wait
+               log.Warning(ctx, "+++++++++ Pull: Waiting on the channel")
                q.mu.RUnlock()

                select {
                case <-ctx.Done():
+                       log.Warning(ctx, "+++++++++ Pull: inside the loop CTX DONE")
                        return nil
                case <-wait:
                        // ↑
@@ -170,6 +175,7 @@ func (q *taskQueue) Pull(ctx context.Context) task.Task {
                        // | only entry in the queue. Or an empty Load could
                        // | leave an empty "data" behind.
                        // ↓
+                       log.Warning(ctx, "+++++++++ TRY POPPING after the lock")
                        if task, ok := q.tryPopFront(); ok {
                                return task
                        }
```

I couldn't understand why the reader would arrive later, since the
goroutine that is supposed to process incoming tasks starts earlier in
the test cases; yet somehow it sometimes only blocks on the channel
after the writer has attempted to write into it and failed. I'd be
surprised if the attempt to acquire a read lock inside the Pull method
delayed it that much. The channel is unbuffered, and the select-default
statement prevents the writer from blocking when there are no readers.
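
In other words, the non-blocking notification is simply dropped whenever no reader is parked on the channel yet. A boiled-down sketch of the two sides (locking elided; `push` and `pull` are simplifications of the real `Push` and `Pull`):

```go
// Writer side: notify waiters without ever blocking.
func push(wait chan struct{}) {
	select {
	case wait <- struct{}{}: // succeeds only if a reader is already blocked receiving
	default: // no reader parked yet: the notification is silently dropped
	}
}

// Reader side: if it reaches the receive only after the send above was
// dropped, it blocks until the next push or until the context is done.
func pull(ctx context.Context, wait chan struct{}) {
	select {
	case <-ctx.Done():
	case <-wait:
	}
}
```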

```
=== RUN   TestTaskProcessing/Success_executing_a_task
=== PAUSE TestTaskProcessing/Success_executing_a_task
=== CONT  TestTaskProcessing/Success_executing_a_task
time="2024-08-28T15:26:15-03:00" level=debug msg="Distro \"testDistro_UP4W_TestTaskProcessing_Success_executing_a_task_2931794278530498729\": starting task processing"
time="2024-08-28T15:26:15-03:00" level=info msg="Distro \"testDistro_UP4W_TestTaskProcessing_Success_executing_a_task_2931794278530498729\": Submitting tasks [\"Test task\"] to queue"
time="2024-08-28T15:26:15-03:00" level=warning msg="+++++++++ NOTIFYING AWAITERS OF TASKS Test task on channel"
time="2024-08-28T15:26:15-03:00" level=warning msg="+++++++++ Pull: Waiting on the channel"
    worker_test.go:206:
                Error Trace:    D:/UP4W/cloudinit/windows-agent/internal/distros/worker/worker_test.go:206
                Error:          Condition never satisfied
                Test:           TestTaskProcessing/Success_executing_a_task
                Messages:       distro should have been "Running" after SubmitTask(). Current state is "Stopped"
time="2024-08-28T15:26:20-03:00" level=debug msg="Distro \"testDistro_UP4W_TestTaskProcessing_Success_executing_a_task_2931794278530498729\": stopping task processing"
time="2024-08-28T15:26:20-03:00" level=warning msg="+++++++++ Pull: inside the loop CTX DONE"
--- FAIL: TestTaskProcessing (0.00s)
    --- FAIL: TestTaskProcessing/Success_executing_a_task (5.02s)
```

Removing the select-default is not an option: test cases would remain
blocked until timeout. In production that's less likely to happen, but
it is still undesirable.
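
Without the default branch, the notification becomes a bare send, coupling the producer to the consumer:

```go
q.wait <- struct{}{} // blocks the submitter until a Pull is actively receiving
```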

I opted to make the `q.wait` channel buffered, so some number of
writers will succeed without blocking, giving the reader more chances
to reach the channel; subsequent writers should then have better
chances of success as well.
I don't have a good explanation for the exact number of slots the
channel should have.
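
The essence of the fix, extracted from the diff below:

```go
// newWaitChannel creates a channel to notify waiters of new tasks.
// The buffer lets a few writers deposit their notification before any
// reader is parked on the channel, so the wakeup is no longer dropped.
func newWaitChannel() chan struct{} {
	return make(chan struct{}, 4) // the capacity of 4 is a best guess
}
```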

I ran the worker tests 1367 times on my machine without a single
failure, so this seems to be a suitable fix:

`go test -v .\windows-agent\internal\distros\worker\ -tags=gowslmock -count=1367 -failfast -timeout=120m` :)

Before that, I was experiencing failures quite often when running with
`-count=200` on either the `TestTaskProcessing` or
`TestTaskDeduplication` test cases.

---

## Addressing the wsl-pro-service failure

One test still fails sometimes:
`wsl-pro-service/internal/streams/server_test.go`. The test assumes
that when `server.Stop()` is called, the goroutine serving the
multistream service will exit with an error. The current implementation
of the internal `handlingLoop[Command].run()` method connects the
context objects used in the RPCs in a way that lets its loop interpret
a `server.Stop()` as a graceful cancellation.

This method derives two context objects mirroring the structure found
in the Server type: `gCtx` for graceful stop and `ctx` for
"brute-forcing". Those local context objects added complexity, and it
seems we can get rid of them.
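
Concretely, the removed wiring looked like this (taken from the diff below). Because `gCtx` descends from `ctx`, which `Stop()` cancels through `s.ctx`, the loop takes the graceful branch and returns `nil` instead of an error:

```go
// Old wiring in handlingLoop.run, removed by this change:
ctx, cancel := cancelWith(h.stream.Context(), s.ctx) // cancelled by server.Stop()
defer cancel()
gCtx, cancel := cancelWith(ctx, s.gracefulCtx) // child of ctx: Stop() cancels it too
defer cancel()

select {
case <-gCtx.Done(): // taken for BOTH Stop() and GracefulStop()
	return nil // ...so a forced stop looks like a graceful one
default:
}
```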

Additionally, there was an error in the server constructor `NewServer`
causing the `gracefulCtx` to be a child of `ctx` due to a shadowed
declaration.
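
A minimal illustration of the bug and the fix (the `fCtx` name comes from the diff below):

```go
// Before: the ctx parameter is shadowed, so gCtx descends from the
// force-stop context and is cancelled whenever that one is.
ctx, cancel := context.WithCancel(ctx)
gCtx, gCancel := context.WithCancel(ctx) // child of the NEW ctx

// After: a distinct name keeps both contexts siblings under the caller's ctx.
fCtx, cancel := context.WithCancel(ctx)
gCtx, gCancel := context.WithCancel(ctx) // child of the caller's ctx
```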

Per my current understanding, this change in wsl-pro-service does not
affect its runtime behaviour apart from how it responds to internal
force-exit requests.

---
UDENG-3311
CarlosNihelton authored Aug 29, 2024
2 parents 7b95165 + 4ed6408 commit 1b8dea7
Showing 4 changed files with 32 additions and 30 deletions.
`windows-agent/internal/distros/worker/task_queue.go` (11 additions, 4 deletions):

```diff
--- a/windows-agent/internal/distros/worker/task_queue.go
+++ b/windows-agent/internal/distros/worker/task_queue.go
@@ -21,10 +21,17 @@ type taskQueue struct {
        data []task.Task
 }
 
+// newWaitChannel creates a channel to notify waiters of new tasks.
+func newWaitChannel() chan struct{} {
+       // Tests have shown it's possible for writers to reach the channel before any reader, so the notification may never arrive.
+       // The number of buffered writers is a best guess at this moment and may be adjusted.
+       return make(chan struct{}, 4)
+}
+
 func newTaskQueue() *taskQueue {
        return &taskQueue{
                mu:   sync.RWMutex{},
-               wait: make(chan struct{}),
+               wait: newWaitChannel(),
                data: make([]task.Task, 0),
        }
 }
@@ -35,7 +42,7 @@ func (q *taskQueue) Load(newData []task.Task) {
        defer q.mu.Unlock()
 
        close(q.wait)
-       q.wait = make(chan struct{})
+       q.wait = newWaitChannel()
 
        q.data = newData
 }
@@ -51,11 +58,11 @@ func (q *taskQueue) Absorb(other *taskQueue) {
        transferedData := other.data
 
        close(other.wait)
-       other.wait = make(chan struct{})
+       other.wait = newWaitChannel()
        other.data = make([]task.Task, 0)
 
        close(q.wait)
-       q.wait = make(chan struct{})
+       q.wait = newWaitChannel()
        q.data = append(q.data, transferedData...)
 }
```
`windows-agent/internal/distros/worker/worker_test.go` (3 additions, 3 deletions):

```diff
--- a/windows-agent/internal/distros/worker/worker_test.go
+++ b/windows-agent/internal/distros/worker/worker_test.go
@@ -203,7 +203,7 @@ func TestTaskProcessing(t *testing.T) {
                wantState = "Unregistered"
        }
 
-       require.Eventuallyf(t, func() bool { return d.state() == wantState }, distroWakeUpTime, 200*time.Millisecond,
+       require.Eventuallyf(t, func() bool { return d.state() == wantState }, 5*distroWakeUpTime, 500*time.Millisecond,
                "distro should have been %q after SubmitTask(). Current state is %q", wantState, d.state())
 
        // Testing task before an active connection is established
@@ -424,7 +424,7 @@ func TestTaskDeferral(t *testing.T) {
        // delete+write: testutils.ReplaceFileWithDir
        require.Eventually(t, func() bool {
                return w.CheckTotalTaskCount(1) == nil
-       }, time.Second, 100*time.Millisecond, "Setup: Blocking task was never popped from queue")
+       }, 5*time.Second, 500*time.Millisecond, "Setup: Blocking task was never popped from queue")
 
        testutils.ReplaceFileWithDir(t, taskFile, "Setup: could not replace task file with dir to interfere with SubmitDeferredTasks")
 }
@@ -514,7 +514,7 @@ func TestTaskDeduplication(t *testing.T) {
 
        err = w.SubmitTasks(blocker)
        require.NoError(t, err, "SubmitTasks should return no error")
-       require.Eventually(t, blocker.executing.Load, 5*time.Second, 100*time.Millisecond, "Blocker task was never dequeued")
+       require.Eventually(t, blocker.executing.Load, 5*time.Second, 500*time.Millisecond, "Blocker task was never dequeued")
 
        // Unique task: normal submission
        err = w.SubmitTasks(task1)
```
The remaining changed file (4 additions, 2 deletions):

```diff
@@ -880,7 +880,8 @@ func TestNotifyConfigUpdate(t *testing.T) {
                distroName, _ := wsltestutils.RegisterDistro(t, ctx, true)
                d, err := db.GetDistroAndUpdateProperties(ctx, distroName, distro.Properties{})
                require.NoError(t, err, "Setup: distro %s GetDistroAndUpdateProperties should return no errors", distroName)
-               defer d.Cleanup(ctx)
+               // Prevents the distro's worker from dequeuing tasks; otherwise we race as we attempt to read pending tasks.
+               d.Cleanup(ctx)
        }
 
        var cloudInit mockCloudInit
@@ -944,7 +945,8 @@ func TestNotifyConfigUpdateWithAgentYaml(t *testing.T) {
                distroName, _ := wsltestutils.RegisterDistro(t, ctx, true)
                d, err := db.GetDistroAndUpdateProperties(ctx, distroName, distro.Properties{})
                require.NoError(t, err, "Setup: distro %s GetDistroAndUpdateProperties should return no errors", distroName)
-               defer d.Cleanup(ctx)
+               // Stop the distro right away so its worker doesn't race with us for the tasks file.
+               d.Cleanup(ctx)
 
        homedir := t.TempDir()
        c := config.New(ctx, storageDir)
```
`wsl-pro-service/internal/streams/server.go` (14 additions, 21 deletions):

```diff
--- a/wsl-pro-service/internal/streams/server.go
+++ b/wsl-pro-service/internal/streams/server.go
@@ -30,9 +30,11 @@ type Server struct {
 
        done chan struct{}
 
+       // This context will be the parent of the streams' contexts.
        ctx    context.Context
        cancel context.CancelFunc
 
+       // This context will be used for gracefully stopping the server, i.e. waiting for the streams to finish their current activities.
        gracefulCtx    context.Context
        gracefulCancel context.CancelFunc
 }
@@ -60,15 +62,16 @@ func (err SystemError) Is(e error) bool {
 
 // NewServer creates a new Server.
 func NewServer(ctx context.Context, sys *system.System, conn *grpc.ClientConn) *Server {
-       ctx, cancel := context.WithCancel(ctx)
+       fCtx, cancel := context.WithCancel(ctx)
        gCtx, gCancel := context.WithCancel(ctx)
 
        s := &Server{
                conn:   conn,
                system: sys,
                done:   make(chan struct{}),
 
-               ctx:    ctx,
+               // the stream context will be a child of the force-quit context and will thus be cancelled with it.
+               ctx:    fCtx,
                cancel: cancel,
 
                gracefulCtx: gCtx,
@@ -81,13 +84,15 @@ func NewServer(ctx context.Context, sys *system.System, conn *grpc.ClientConn) *
 // Stop stops the server and the underlying connection immediately.
 // It blocks until the server finishes its teardown.
 func (s *Server) Stop() {
+       // Since this cancellation also cancels the streams' context, this should be an immediate stop.
        s.cancel()
        <-s.done
 }
 
 // GracefulStop stops the server as soon as all active unary calls finish.
 // It blocks until the server finishes its teardown.
 func (s *Server) GracefulStop() {
+       // Since this cancellation won't affect the streams directly, it allows them to finish their current activities before stopping the server loop.
        s.gracefulCancel()
        <-s.done
 }
@@ -177,26 +182,21 @@ type handlingLoop[Command any] struct {
 }
 
 func (h *handlingLoop[Command]) run(s *Server, client *multiClient) error {
-       // Use this context to log onto the stream, and to cancel with server.Stop
-       ctx, cancel := cancelWith(h.stream.Context(), s.ctx)
-       defer cancel()
-
-       // Use this context to log onto the stream, but cancel with server.GracefulStop
-       gCtx, cancel := cancelWith(ctx, s.gracefulCtx)
-       defer cancel()
-
+       // We deliberately use the stream's context for logging, running the handler callback and acquiring system info.
+       ctx := h.stream.Context()
        for {
                // Graceful stop
                select {
-               case <-gCtx.Done():
+               case <-s.gracefulCtx.Done():
                        log.Debugf(ctx, "Stopping serving %s requests", reflect.TypeFor[Command]())
                        return nil
                default:
                }
 
                log.Debugf(ctx, "Started serving %s requests", reflect.TypeFor[Command]())
 
-               // Handle a single command
-               msg, ok, err := receiveWithContext(gCtx, h.stream.Recv)
+               // Handle a single command, responsive to the cancellation of s.gracefulCtx.
+               msg, ok, err := receiveWithContext(s.gracefulCtx, h.stream.Recv)
                if err != nil {
                        return fmt.Errorf("could not receive ProAttachCmd: %w", err)
                } else if !ok {
@@ -217,18 +217,11 @@
                }
 
                if err = client.SendInfo(info); err != nil {
-                       log.Warningf(ctx, "Streamserver: could not stream back info after command completion")
+                       log.Warningf(ctx, "Streamserver: could not stream back info after command completion: %v", err)
                }
        }
 }
 
-// cancelWith creates a child context that is cancelled when with is done.
-func cancelWith(ctx, with context.Context) (context.Context, context.CancelFunc) {
-       ctx, cancel := context.WithCancel(ctx)
-       context.AfterFunc(with, cancel)
-       return ctx, cancel
-}
-
 // Receive with context calls the recv receiver asynchronously.
 // Returns (message, message error) if recv returned.
 // Returns (nil, context error) if the context was cancelled.
```
