Skip to content

Commit

Permalink
Cancel goal when WatchGoal context is canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
lassilaiho committed Jan 31, 2024
1 parent b0c3e91 commit 56b24e1
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 32 deletions.
7 changes: 7 additions & 0 deletions internal/msgs/action_msgs/srv/CancelGoal_Request.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions internal/msgs/example_interfaces/action/Fibonacci.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions internal/msgs/test_msgs/action/Fibonacci.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions internal/msgs/test_msgs/action/NestedMessage.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 15 additions & 6 deletions pkg/gogen/templates.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 21 additions & 7 deletions pkg/rclgo/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,25 +913,39 @@ func (c *ActionClient) Node() *Node {
//
// The type support of the message passed to onFeedback is
// types.ActionTypeSupport.FeedbackMessage().
func (c *ActionClient) WatchGoal(ctx context.Context, goal types.Message, onFeedback FeedbackHandler) (types.Message, error) {
func (c *ActionClient) WatchGoal(ctx context.Context, goal types.Message, onFeedback FeedbackHandler) (result types.Message, goalID *types.GoalID, retErr error) {
req, err := c.newSendGoalRequest(goal)
if err != nil {
return nil, err
return nil, nil, err
}
if onFeedback != nil {
ctx, cancel := context.WithCancel(ctx)
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
unsub := c.subscribe(ctx, &c.feedbackSubs, req.GetGoalID(), onFeedback)
defer unsub()
}
resp, err := c.SendGoalRequest(ctx, req)
if err != nil {
return nil, err
return nil, req.GetGoalID(), err
}
if !resp.(goalResponseMessage).GetGoalAccepted() {
return nil, errors.New("goal was rejected")
}
return c.GetResult(ctx, req.GetGoalID())
return nil, req.GetGoalID(), errors.New("goal was rejected")
}
defer func() {
if ctx.Err() != nil {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
cancelReq := c.typeSupport.CancelGoal().Request().New()
cancelReq.(goalIDMessage).SetGoalID(req.GetGoalID())
_, err := c.CancelGoal(ctx, cancelReq) //nolint:contextcheck
if err != nil {
retErr = errors.Join(err, retErr)
}
}
}()
result, err = c.GetResult(ctx, req.GetGoalID())
return result, req.GetGoalID(), err
}

// SendGoal sends a new goal to the server and returns the status message of the
Expand Down
64 changes: 62 additions & 2 deletions pkg/rclgo/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/bradleyjkemp/cupaloy/v2"
. "github.com/smartystreets/goconvey/convey" //nolint:revive
"github.com/stretchr/testify/require"
action_msgs_msg "github.com/tiiuae/rclgo/internal/msgs/action_msgs/msg"
action_msgs_srv "github.com/tiiuae/rclgo/internal/msgs/action_msgs/srv"
test_msgs_action "github.com/tiiuae/rclgo/internal/msgs/test_msgs/action"
Expand Down Expand Up @@ -154,7 +155,7 @@ func TestActionExecution(t *testing.T) {
goal.Order = 10
feedbacks := make(fibonacciFeedbacks, 0)
var feedbacksMu sync.Mutex
result, err := client.WatchGoal(ctx, goal, func(c context.Context, m types.Message) {
result, _, err := client.WatchGoal(ctx, goal, func(c context.Context, m types.Message) {
fb := m.(*test_msgs_action.Fibonacci_FeedbackMessage)
feedbacksMu.Lock()
feedbacks = append(feedbacks, &fb.Feedback)
Expand All @@ -179,7 +180,7 @@ func TestActionExecution(t *testing.T) {
close(action.continueChan)
goal := test_msgs_action.NewFibonacci_Goal()
goal.Order = -1
resp, err := client.WatchGoal(ctx, goal, func(c context.Context, m types.Message) {
resp, _, err := client.WatchGoal(ctx, goal, func(c context.Context, m types.Message) {
panic("no feedback should be sent")
})
So(err, ShouldNotBeNil)
Expand Down Expand Up @@ -292,6 +293,65 @@ func TestActionCanceling(t *testing.T) {
})
}

func TestWatchGoalCanceling(t *testing.T) {
_, cancelingAction := newWaitAction()
var (
clientCtx, clientCancel = context.WithCancel(context.Background())
serverCtx, serverCancel = context.WithCancel(context.Background())

rclctx *rclgo.Context
err error
)
defer func() {
clientCancel()
serverCancel()
if rclctx != nil {
rclctx.Close()
}
}()
rclctx, err = newDefaultRCLContext()
require.NoError(t, err)
serverNode, err := rclctx.NewNode("server", "actions_test")
require.NoError(t, err)
_, err = serverNode.NewActionServer("canceling", cancelingAction, actionServerOpts)
require.NoError(t, err)

clientNode, err := rclctx.NewNode("client", "actions_test")
require.NoError(t, err)
client, err := clientNode.NewActionClient("canceling", test_msgs_action.FibonacciTypeSupport, actionClientOpts)
require.NoError(t, err)

spinErrC := make(chan error, 1)
go func() { spinErrC <- rclctx.Spin(serverCtx) }()

goalErrC := make(chan error, 1)
var goalID *types.GoalID
go func() {
var goalErr error
_, goalID, goalErr = client.WatchGoal(clientCtx, test_msgs_action.NewFibonacci_Goal(), nil)
goalErrC <- goalErr
}()

time.Sleep(time.Second)
clientCancel()

err = waitChan(t, time.Second, goalErrC, "Waiting for goal watching to stop")
require.ErrorIs(t, err, context.Canceled)

resultCtx, resultCancel := context.WithTimeout(context.Background(), time.Second)
defer resultCancel()
resp, err := client.GetResult(resultCtx, goalID)
require.NoError(t, err)
require.NotNil(t, resp)
result := resp.(*test_msgs_action.Fibonacci_GetResult_Response)
require.Equal(t, rclgo.GoalCanceled, rclgo.GoalStatus(result.Status))

serverCancel()
err = waitChan(t, time.Second, spinErrC, "Waiting for spinning to stop")
require.Error(t, err)
require.NoError(t, rclctx.Close())
}

type goalStatus struct {
ID types.GoalID
Status rclgo.GoalStatus
Expand Down
4 changes: 2 additions & 2 deletions pkg/rclgo/example_action_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ExampleActionClient() {
ctx := context.Background()
goal := example_interfaces_action.NewFibonacci_Goal()
goal.Order = 10
result, err := client.WatchGoal(ctx, goal, func(ctx context.Context, feedback types.Message) {
result, _, err := client.WatchGoal(ctx, goal, func(ctx context.Context, feedback types.Message) {
fmt.Println("Got feedback:", feedback)
})
if err != nil {
Expand Down Expand Up @@ -61,7 +61,7 @@ func ExampleActionClient_type_safe_wrapper() {
ctx := context.Background()
goal := example_interfaces_action.NewFibonacci_Goal()
goal.Order = 10
result, err := client.WatchGoal(ctx, goal, func(ctx context.Context, feedback *example_interfaces_action.Fibonacci_FeedbackMessage) {
result, _, err := client.WatchGoal(ctx, goal, func(ctx context.Context, feedback *example_interfaces_action.Fibonacci_FeedbackMessage) {
fmt.Println("Got feedback:", feedback)
})
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/rclgo/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,18 @@ func timeOut(timeoutMs int, f func(), testDescription string) {
}
}

func waitChan[T any](t *testing.T, timeout time.Duration, ch <-chan T, testDescription string) (recv T) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case <-ctx.Done():
t.Fatalf("%s: timeout", testDescription)
case recv = <-ch:
}
return recv
}

func publishString(pub *rclgo.Publisher, s string) {
msg := std_msgs.NewString()
msg.Data = s
Expand Down

0 comments on commit 56b24e1

Please sign in to comment.