Skip to content

Commit

Permalink
DATA-3338-fix-stability-of-vision-capture-all-from-camera
Browse files Browse the repository at this point in the history
  • Loading branch information
nicksanford committed Nov 14, 2024
1 parent a3262bf commit 2ccf1f4
Show file tree
Hide file tree
Showing 33 changed files with 1,502 additions and 885 deletions.
25 changes: 15 additions & 10 deletions components/arm/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package arm
import (
"context"
"errors"
"time"

v1 "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/arm/v1"
Expand Down Expand Up @@ -39,18 +40,20 @@ func newEndPositionCollector(resource interface{}, params data.CollectorParams)
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
v, err := arm.EndPosition(ctx, data.FromDMExtraMap)
if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, endPosition.String(), err)
return res, data.FailedToReadErr(params.ComponentName, endPosition.String(), err)
}
o := v.Orientation().OrientationVectorDegrees()
return pb.GetEndPositionResponse{
return data.NewTabularCaptureResult(timeRequested, pb.GetEndPositionResponse{
Pose: &v1.Pose{
X: v.Point().X,
Y: v.Point().Y,
Expand All @@ -60,7 +63,7 @@ func newEndPositionCollector(resource interface{}, params data.CollectorParams)
OZ: o.OZ,
Theta: o.Theta,
},
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand All @@ -73,21 +76,23 @@ func newJointPositionsCollector(resource interface{}, params data.CollectorParam
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
v, err := arm.JointPositions(ctx, data.FromDMExtraMap)
if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
return res, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
}
jp, err := referenceframe.JointPositionsFromInputs(arm.ModelFrame(), v)
if err != nil {
return nil, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
return res, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
}
return pb.GetJointPositionsResponse{Positions: jp}, nil
return data.NewTabularCaptureResult(timeRequested, pb.GetJointPositionsResponse{Positions: jp})
})
return data.NewCollector(cFunc, params)
}
Expand Down
71 changes: 44 additions & 27 deletions components/arm/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"testing"
"time"

clk "github.com/benbjohnson/clock"
"github.com/benbjohnson/clock"
"github.com/golang/geo/r3"
v1 "go.viam.com/api/common/v1"
datasyncpb "go.viam.com/api/app/datasync/v1"
pb "go.viam.com/api/component/arm/v1"
"go.viam.com/test"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/components/arm"
"go.viam.com/rdk/data"
Expand All @@ -22,50 +23,71 @@ import (

const (
componentName = "arm"
captureInterval = time.Second
numRetries = 5
captureInterval = time.Millisecond
)

var floatList = &pb.JointPositions{Values: []float64{1.0, 2.0, 3.0}}

func TestCollectors(t *testing.T) {
l, err := structpb.NewList([]any{1.0, 2.0, 3.0})
test.That(t, err, test.ShouldBeNil)

tests := []struct {
name string
collector data.CollectorConstructor
expected map[string]any
expected *datasyncpb.SensorData
}{
{
name: "End position collector should write a pose",
collector: arm.NewEndPositionCollector,
expected: tu.ToProtoMapIgnoreOmitEmpty(pb.GetEndPositionResponse{
Pose: &v1.Pose{
OX: 0,
OY: 0,
OZ: 1,
Theta: 0,
X: 1,
Y: 2,
Z: 3,
},
}),
expected: &datasyncpb.SensorData{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: &structpb.Struct{
Fields: map[string]*structpb.Value{
"pose": structpb.NewStructValue(&structpb.Struct{
Fields: map[string]*structpb.Value{
"o_x": structpb.NewNumberValue(0),
"o_y": structpb.NewNumberValue(0),
"o_z": structpb.NewNumberValue(1),
"theta": structpb.NewNumberValue(0),
"x": structpb.NewNumberValue(1),
"y": structpb.NewNumberValue(2),
"z": structpb.NewNumberValue(3),
},
}),
},
}},
},
},
{
name: "Joint positions collector should write a list of positions",
collector: arm.NewJointPositionsCollector,
expected: tu.ToProtoMapIgnoreOmitEmpty(pb.GetJointPositionsResponse{Positions: floatList}),
expected: &datasyncpb.SensorData{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: &structpb.Struct{
Fields: map[string]*structpb.Value{
"positions": structpb.NewStructValue(&structpb.Struct{
Fields: map[string]*structpb.Value{"values": structpb.NewListValue(l)},
}),
},
}},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockClock := clk.NewMock()
buf := tu.MockBuffer{}
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
buf := tu.NewMockBuffer(ctx)
params := data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Clock: mockClock,
Target: &buf,
Clock: clock.New(),
Target: buf,
}

arm := newArm()
Expand All @@ -74,13 +96,8 @@ func TestCollectors(t *testing.T) {

defer col.Close()
col.Collect()
mockClock.Add(captureInterval)

tu.Retry(func() bool {
return buf.Length() != 0
}, numRetries)
test.That(t, buf.Length(), test.ShouldBeGreaterThan, 0)
test.That(t, buf.Writes[0].GetStruct().AsMap(), test.ShouldResemble, tc.expected)
tu.CheckMockBufferWrites(t, ctx, start, buf.TabularWrites, tc.expected)
})
}
}
Expand Down
30 changes: 18 additions & 12 deletions components/board/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package board

import (
"context"
"time"

"github.com/pkg/errors"
pb "go.viam.com/api/component/board/v1"
Expand Down Expand Up @@ -39,10 +40,12 @@ func newAnalogCollector(resource interface{}, params data.CollectorParams) (data
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
var analogValue AnalogValue
if _, ok := arg[analogReaderNameKey]; !ok {
return nil, data.FailedToReadErr(params.ComponentName, analogs.String(),
return res, data.FailedToReadErr(params.ComponentName, analogs.String(),
errors.New("Must supply reader_name in additional_params for analog collector"))
}
if reader, err := board.AnalogByName(arg[analogReaderNameKey].String()); err == nil {
Expand All @@ -51,17 +54,18 @@ func newAnalogCollector(resource interface{}, params data.CollectorParams) (data
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, analogs.String(), err)
return res, data.FailedToReadErr(params.ComponentName, analogs.String(), err)
}
}
return pb.ReadAnalogReaderResponse{

return data.NewTabularCaptureResult(timeRequested, pb.ReadAnalogReaderResponse{
Value: int32(analogValue.Value),
MinRange: analogValue.Min,
MaxRange: analogValue.Max,
StepSize: analogValue.StepSize,
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand All @@ -74,10 +78,12 @@ func newGPIOCollector(resource interface{}, params data.CollectorParams) (data.C
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
var value bool
if _, ok := arg[gpioPinNameKey]; !ok {
return nil, data.FailedToReadErr(params.ComponentName, gpios.String(),
return res, data.FailedToReadErr(params.ComponentName, gpios.String(),
errors.New("Must supply pin_name in additional params for gpio collector"))
}
if gpio, err := board.GPIOPinByName(arg[gpioPinNameKey].String()); err == nil {
Expand All @@ -86,14 +92,14 @@ func newGPIOCollector(resource interface{}, params data.CollectorParams) (data.C
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, gpios.String(), err)
return res, data.FailedToReadErr(params.ComponentName, gpios.String(), err)
}
}
return pb.GetGPIOResponse{
return data.NewTabularCaptureResult(timeRequested, pb.GetGPIOResponse{
High: value,
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand Down
67 changes: 36 additions & 31 deletions components/board/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"testing"
"time"

clk "github.com/benbjohnson/clock"
"github.com/benbjohnson/clock"
"github.com/golang/protobuf/ptypes/wrappers"
pb "go.viam.com/api/component/board/v1"
datasyncpb "go.viam.com/api/app/datasync/v1"
"go.viam.com/test"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/components/board"
"go.viam.com/rdk/data"
Expand All @@ -22,22 +23,20 @@ import (

const (
componentName = "board"
captureInterval = time.Second
numRetries = 5
captureInterval = time.Millisecond
)

func TestCollectors(t *testing.T) {
tests := []struct {
name string
params data.CollectorParams
collector data.CollectorConstructor
expected map[string]any
shouldError bool
expectedError error
name string
params data.CollectorParams
collector data.CollectorConstructor
expected *datasyncpb.SensorData
}{
{
name: "Board analog collector should write an analog response",
params: data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand All @@ -46,17 +45,22 @@ func TestCollectors(t *testing.T) {
},
},
collector: board.NewAnalogCollector,
expected: tu.ToProtoMapIgnoreOmitEmpty(pb.ReadAnalogReaderResponse{
Value: 1,
MinRange: 0,
MaxRange: 10,
StepSize: 0.1,
}),
shouldError: false,
expected: &datasyncpb.SensorData{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: &structpb.Struct{
Fields: map[string]*structpb.Value{
"value": structpb.NewNumberValue(1),
"min_range": structpb.NewNumberValue(0),
"max_range": structpb.NewNumberValue(10),
"step_size": structpb.NewNumberValue(float64(float32(0.1))),
},
}},
},
},
{
name: "Board gpio collector should write a gpio response",
params: data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand All @@ -65,33 +69,34 @@ func TestCollectors(t *testing.T) {
},
},
collector: board.NewGPIOCollector,
expected: tu.ToProtoMapIgnoreOmitEmpty(pb.GetGPIOResponse{
High: true,
}),
shouldError: false,
expected: &datasyncpb.SensorData{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: &structpb.Struct{
Fields: map[string]*structpb.Value{
"high": structpb.NewBoolValue(true),
},
}},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockClock := clk.NewMock()
buf := tu.MockBuffer{}
tc.params.Clock = mockClock
tc.params.Target = &buf
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
buf := tu.NewMockBuffer(ctx)
tc.params.Clock = clock.New()
tc.params.Target = buf

board := newBoard()
col, err := tc.collector(board, tc.params)
test.That(t, err, test.ShouldBeNil)

defer col.Close()
col.Collect()
mockClock.Add(captureInterval)

tu.Retry(func() bool {
return buf.Length() != 0
}, numRetries)
test.That(t, buf.Length(), test.ShouldBeGreaterThan, 0)
test.That(t, buf.Writes[0].GetStruct().AsMap(), test.ShouldResemble, tc.expected)
tu.CheckMockBufferWrites(t, ctx, start, buf.TabularWrites, tc.expected)
})
}
}
Expand Down
Loading

0 comments on commit 2ccf1f4

Please sign in to comment.