From 4284ac7d8c910358040f9d6145c5c2a3ce4d2f2c Mon Sep 17 00:00:00 2001 From: Nick Sanford Date: Thu, 31 Oct 2024 17:19:49 -0400 Subject: [PATCH] DATA-3338-fix-stability-of-vision-capture-all-from-camera --- data/capture_buffer.go | 54 +-- data/capture_file.go | 48 ++- data/collector.go | 330 +++++++++++++++--- data/registry.go | 1 + go.mod | 2 + go.sum | 2 - .../datamanager/builtin/capture/capture.go | 3 +- 7 files changed, 346 insertions(+), 94 deletions(-) diff --git a/data/capture_buffer.go b/data/capture_buffer.go index 0989efcae57..bc1fae5ac6f 100644 --- a/data/capture_buffer.go +++ b/data/capture_buffer.go @@ -8,7 +8,8 @@ import ( // CaptureBufferedWriter is a buffered, persistent queue of SensorData. type CaptureBufferedWriter interface { - Write(item *v1.SensorData) error + WriteBinary(items []*v1.SensorData) error + WriteTabular(items []*v1.SensorData) error Flush() error Path() string } @@ -32,27 +33,37 @@ func NewCaptureBuffer(dir string, md *v1.DataCaptureMetadata, maxCaptureFileSize } // Write writes item onto b. Binary sensor data is written to its own file. -// Tabular data is written to disk in maxCaptureFileSize sized files. Files that -// are still being written to are indicated with the extension -// InProgressFileExt. Files that have finished being written to are indicated by -// FileExt. -func (b *CaptureBuffer) Write(item *v1.SensorData) error { +// Files that are still being written to are indicated with the extension +// '.prog'. +// Files that have finished being written to are indicated by +// '.capture'. +func (b *CaptureBuffer) WriteBinary(items []*v1.SensorData) error { b.lock.Lock() defer b.lock.Unlock() - if item.GetBinary() != nil { - binFile, err := NewCaptureFile(b.Directory, b.MetaData) - if err != nil { - return err - } + binFile, err := NewCaptureFile(b.Directory, b.MetaData) + if err != nil { + return err + } + for _, item := range items { if err := binFile.WriteNext(item); err != nil { return err } - if err := binFile.Close(); err != nil { - return err - } - return nil } + if err := binFile.Close(); err != nil { + return err + } + return nil +} + +// Tabular data is written to disk in maxCaptureFileSize sized files. +// Files that are still being written to are indicated with the extension +// '.prog'. +// Files that have finished being written to are indicated by +// '.capture'. +func (b *CaptureBuffer) WriteTabular(items []*v1.SensorData) error { + b.lock.Lock() + defer b.lock.Unlock() if b.nextFile == nil { nextFile, err := NewCaptureFile(b.Directory, b.MetaData) @@ -60,10 +71,7 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { return err } b.nextFile = nextFile - // We want to special case on "CaptureAllFromCamera" because it is sensor data that contains images - // and their corresponding annotations. We want each image and its annotations to be stored in a - // separate file. - } else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == "CaptureAllFromCamera" { + } else if b.nextFile.Size() > b.maxCaptureFileSize { if err := b.nextFile.Close(); err != nil { return err } @@ -74,7 +82,13 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { b.nextFile = nextFile } - return b.nextFile.WriteNext(item) + for _, item := range items { + if err := b.nextFile.WriteNext(item); err != nil { + return err + } + } + + return nil } // Flush flushes all buffered data to disk and marks any in progress file as complete. diff --git a/data/capture_file.go b/data/capture_file.go index 47b118e9926..2207de0a474 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -216,17 +216,17 @@ func BuildCaptureMetadata( additionalParams map[string]string, methodParams map[string]*anypb.Any, tags []string, -) *v1.DataCaptureMetadata { - dataType := getDataType(method) +) (*v1.DataCaptureMetadata, DataType) { + dataType := GetDataType(method) return &v1.DataCaptureMetadata{ ComponentType: compAPI.String(), ComponentName: compName, MethodName: method, - Type: dataType, + Type: dataType.ToProto(), MethodParameters: methodParams, - FileExtension: GetFileExt(dataType, method, additionalParams), + FileExtension: getFileExt(dataType, method, additionalParams), Tags: tags, - } + }, dataType } // IsDataCaptureFile returns whether or not f is a data capture file. @@ -240,25 +240,41 @@ func getFileTimestampName() string { return time.Now().Format(time.RFC3339Nano) } -// TODO DATA-246: Implement this in some more robust, programmatic way. -func getDataType(methodName string) v1.DataType { +type DataType int + +const ( + DataTypeUnspecified DataType = iota + DataTypeTabular + DataTypeBinary +) + +func (dt DataType) ToProto() v1.DataType { + switch dt { + case DataTypeTabular: + return v1.DataType_DATA_TYPE_TABULAR_SENSOR + case DataTypeBinary: + return v1.DataType_DATA_TYPE_BINARY_SENSOR + default: + return v1.DataType_DATA_TYPE_UNSPECIFIED + } +} + +func GetDataType(methodName string) DataType { switch methodName { case nextPointCloud, readImage, pointCloudMap, GetImages: - return v1.DataType_DATA_TYPE_BINARY_SENSOR + return DataTypeBinary default: - return v1.DataType_DATA_TYPE_TABULAR_SENSOR + return DataTypeTabular } } -// GetFileExt gets the file extension for a capture file. -func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]string) string { +// getFileExt gets the file extension for a capture file. +func getFileExt(dataType DataType, methodName string, parameters map[string]string) string { defaultFileExt := "" switch dataType { - case v1.DataType_DATA_TYPE_TABULAR_SENSOR: + case DataTypeTabular: return ".dat" - case v1.DataType_DATA_TYPE_FILE: - return defaultFileExt - case v1.DataType_DATA_TYPE_BINARY_SENSOR: + case DataTypeBinary: if methodName == nextPointCloud { return ".pcd" } @@ -275,8 +291,6 @@ func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]s return defaultFileExt } } - case v1.DataType_DATA_TYPE_UNSPECIFIED: - return defaultFileExt default: return defaultFileExt } diff --git a/data/collector.go b/data/collector.go index 948d74768b1..a9f31efceb2 100644 --- a/data/collector.go +++ b/data/collector.go @@ -5,15 +5,14 @@ package data import ( "context" "fmt" - "reflect" "sync" "time" "github.com/benbjohnson/clock" "github.com/pkg/errors" "go.opencensus.io/trace" - v1 "go.viam.com/api/app/datasync/v1" - pb "go.viam.com/api/common/v1" + dataPB "go.viam.com/api/app/data/v1" + datasyncPB "go.viam.com/api/app/datasync/v1" "go.viam.com/utils" "go.viam.com/utils/protoutils" "google.golang.org/grpc/codes" @@ -30,7 +29,185 @@ import ( var sleepCaptureCutoff = 2 * time.Millisecond // CaptureFunc allows the creation of simple Capturers with anonymous functions. -type CaptureFunc func(ctx context.Context, params map[string]*anypb.Any) (interface{}, error) +type CaptureFunc func(ctx context.Context, params map[string]*anypb.Any) (CaptureResult, error) + +type Timestamps struct { + TimeRequested time.Time + TimeReceived time.Time +} + +type MimeType int + +// This follows the mime types supported in https://github.com/viamrobotics/api/pull/571/files#diff-b77927298d8d5d5228beeea47bd0860d9b322b4f3ef45e129bc238ec17704826R75 +const ( + MimeTypeUnspecified MimeType = iota + MimeTypeImageJpeg + MimeTypeImagePng + MimeTypePcd +) + +type CaptureResultWithTimestamps struct { + CaptureResult CaptureResult + Timestamps Timestamps +} + +type CaptureResultType int + +const ( + CaptureResultTypeUnspecified CaptureResultType = iota + CaptureResultTypeTabular + CaptureResultTypeBinary +) + +type CaptureResult struct { + Type CaptureResultType + TabularData *TabularData + Binaries []Binary +} + +type Binary struct { + OverRideTimestamps *Timestamps + Payload []byte + MimeType MimeType + Annotations Annotations +} + +func (mt MimeType) ToProto() datasyncPB.MimeType { + switch mt { + case MimeTypeUnspecified: + return datasyncPB.MimeType_MIME_TYPE_UNSPECIFIED + case MimeTypeImageJpeg: + return datasyncPB.MimeType_MIME_TYPE_IMAGE_JPEG + case MimeTypeImagePng: + return datasyncPB.MimeType_MIME_TYPE_IMAGE_PNG + case MimeTypePcd: + return datasyncPB.MimeType_MIME_TYPE_PCD + default: + return datasyncPB.MimeType_MIME_TYPE_UNSPECIFIED + } +} + +type TabularData struct { + OverRideTimestamps *Timestamps + Payload *structpb.Struct + Annotations Annotations +} + +type BoundingBox struct { + Id string + Label string + Confidence *float64 + XMinNormalized float64 + YMinNormalized float64 + XMaxNormalized float64 + YMaxNormalized float64 +} + +type Classification struct { + Id string + Label string + Confidence *float64 +} + +type Annotations struct { + BoundingBoxes []BoundingBox + Tags []string + Classifications []Classification +} + +func (mt Annotations) ToProto() *dataPB.Annotations { + var bboxes []*dataPB.BoundingBox + for _, bb := range mt.BoundingBoxes { + bboxes = append(bboxes, &dataPB.BoundingBox{ + Label: bb.Label, + Confidence: bb.Confidence, + XMinNormalized: bb.XMinNormalized, + XMaxNormalized: bb.XMaxNormalized, + YMinNormalized: bb.YMinNormalized, + YMaxNormalized: bb.YMaxNormalized, + }) + } + + var classifications []*dataPB.Classification + for _, c := range mt.Classifications { + classifications = append(classifications, &dataPB.Classification{ + Label: c.Label, + Confidence: c.Confidence, + }) + } + + return &dataPB.Annotations{ + Bboxes: bboxes, + Tags: mt.Tags, + Classifications: classifications, + } +} + +func (cr *CaptureResult) Validate() error { + switch cr.Type { + case CaptureResultTypeTabular: + if len(cr.Binaries) > 0 { + return errors.New("tabular result can't contain binary data") + } + if cr.TabularData == nil { + return errors.New("tabular result must have non empty tabular data") + } + if len(cr.TabularData.Annotations.BoundingBoxes) > 0 { + return errors.New("TabularData may not have BoundingBox annotations") + } + return nil + case CaptureResultTypeBinary: + if cr.TabularData != nil { + return errors.New("binary result can't contain tabular data") + } + if len(cr.Binaries) == 0 { + return errors.New("binary result must have non empty binary data") + } + return nil + default: + return fmt.Errorf("unknown CaptureResultType: %d", cr.Type) + } +} + +func (crwt CaptureResultWithTimestamps) ToProto() []*datasyncPB.SensorData { + if td := crwt.CaptureResult.TabularData; td != nil { + ts := crwt.Timestamps + if tso := td.OverRideTimestamps; tso != nil { + ts = *tso + } + return []*datasyncPB.SensorData{{ + Metadata: &datasyncPB.SensorMetadata{ + TimeRequested: timestamppb.New(ts.TimeRequested.UTC()), + TimeReceived: timestamppb.New(ts.TimeReceived.UTC()), + Annotations: td.Annotations.ToProto(), + }, + Data: &datasyncPB.SensorData_Struct{ + Struct: td.Payload, + }, + }} + } + + var sd []*datasyncPB.SensorData + for _, b := range crwt.CaptureResult.Binaries { + ts := crwt.Timestamps + if tso := b.OverRideTimestamps; tso != nil { + ts = *tso + } + + sd = append(sd, &datasyncPB.SensorData{ + Metadata: &datasyncPB.SensorMetadata{ + TimeRequested: timestamppb.New(ts.TimeRequested.UTC()), + TimeReceived: timestamppb.New(ts.TimeReceived.UTC()), + MimeType: b.MimeType.ToProto(), + Annotations: b.Annotations.ToProto(), + }, + Data: &datasyncPB.SensorData_Binary{ + Binary: b.Payload, + }, + }) + } + return sd +} // FromDMContextKey is used to check whether the context is from data management. // Deprecated: use a camera.Extra with camera.NewContext instead. @@ -57,7 +234,7 @@ type Collector interface { type collector struct { clock clock.Clock - captureResults chan *v1.SensorData + captureResults chan CaptureResultWithTimestamps captureErrors chan error interval time.Duration params map[string]*anypb.Any @@ -71,6 +248,7 @@ type collector struct { captureFunc CaptureFunc target CaptureBufferedWriter lastLoggedErrors map[string]int64 + dataType DataType } // Close closes the channels backing the Collector. It should always be called before disposing of a Collector to avoid @@ -171,10 +349,27 @@ func (c *collector) tickerBasedCapture(started chan struct{}) { } } +func (c *collector) validateReadingType(t CaptureResultType) error { + switch c.dataType { + case DataTypeTabular: + if t != CaptureResultTypeTabular { + return fmt.Errorf("expected result of type CaptureResultTypeTabular as collector is of type DataTypeTabular, instead got CaptureResultType: %d", t) + } + return nil + case DataTypeBinary: + if t != CaptureResultTypeBinary { + return fmt.Errorf("expected result of type CaptureResultTypeBinary as collector is of type DataTypeBinary, instead got CaptureResultType: %d", t) + } + return nil + default: + return fmt.Errorf("unknown collector data type: %d", c.dataType) + } +} + func (c *collector) getAndPushNextReading() { - timeRequested := timestamppb.New(c.clock.Now().UTC()) + timeRequested := c.clock.Now() reading, err := c.captureFunc(c.cancelCtx, c.params) - timeReceived := timestamppb.New(c.clock.Now().UTC()) + timeReceived := c.clock.Now() if c.cancelCtx.Err() != nil { return @@ -189,56 +384,72 @@ func (c *collector) getAndPushNextReading() { return } - var msg v1.SensorData - switch v := reading.(type) { - case []byte: - msg = v1.SensorData{ - Metadata: &v1.SensorMetadata{ - TimeRequested: timeRequested, - TimeReceived: timeReceived, - }, - Data: &v1.SensorData_Binary{ - Binary: v, - }, - } - default: - // If it's not bytes, it's a struct. - var pbReading *structpb.Struct - var err error - - if reflect.TypeOf(reading) == reflect.TypeOf(pb.GetReadingsResponse{}) { - // We special-case the GetReadingsResponse because it already contains - // structpb.Values in it, and the StructToStructPb logic does not handle - // that cleanly. - topLevelMap := make(map[string]*structpb.Value) - topLevelMap["readings"] = structpb.NewStructValue( - &structpb.Struct{Fields: reading.(pb.GetReadingsResponse).Readings}, - ) - pbReading = &structpb.Struct{Fields: topLevelMap} - } else { - pbReading, err = protoutils.StructToStructPbIgnoreOmitEmpty(reading) - if err != nil { - c.captureErrors <- errors.Wrap(err, "error while converting reading to structpb.Struct") - return - } - } + if err := c.validateReadingType(reading.Type); err != nil { + c.captureErrors <- errors.Wrap(err, "capture result invalid type") + return + } - msg = v1.SensorData{ - Metadata: &v1.SensorMetadata{ - TimeRequested: timeRequested, - TimeReceived: timeReceived, - }, - Data: &v1.SensorData_Struct{ - Struct: pbReading, - }, - } + if err := reading.Validate(); err != nil { + c.captureErrors <- errors.Wrap(err, "capture result failed validation") + return } + // var msg v1.SensorData + // switch v := reading.(type) { + // case []byte: + // msg = v1.SensorData{ + // Metadata: &v1.SensorMetadata{ + // TimeRequested: timeRequested, + // TimeReceived: timeReceived, + // }, + // Data: &v1.SensorData_Binary{ + // Binary: v, + // }, + // } + // default: + // // If it's not bytes, it's a struct. + // var pbReading *structpb.Struct + // var err error + + // if reflect.TypeOf(reading) == reflect.TypeOf(pb.GetReadingsResponse{}) { + // // We special-case the GetReadingsResponse because it already contains + // // structpb.Values in it, and the StructToStructPb logic does not handle + // // that cleanly. + // topLevelMap := make(map[string]*structpb.Value) + // topLevelMap["readings"] = structpb.NewStructValue( + // &structpb.Struct{Fields: reading.(pb.GetReadingsResponse).Readings}, + // ) + // pbReading = &structpb.Struct{Fields: topLevelMap} + // } else { + // pbReading, err = protoutils.StructToStructPbIgnoreOmitEmpty(reading) + // if err != nil { + // c.captureErrors <- errors.Wrap(err, "error while converting reading to structpb.Struct") + // return + // } + // } + + // msg = v1.SensorData{ + // Metadata: &v1.SensorMetadata{ + // TimeRequested: timeRequested, + // TimeReceived: timeReceived, + // }, + // Data: &v1.SensorData_Struct{ + // Struct: pbReading, + // }, + // } + // } + select { // If c.captureResults is full, c.captureResults <- a can block indefinitely. This additional select block allows cancel to // still work when this happens. case <-c.cancelCtx.Done(): - case c.captureResults <- &msg: + case c.captureResults <- CaptureResultWithTimestamps{ + CaptureResult: reading, + Timestamps: Timestamps{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + }: } } @@ -257,8 +468,9 @@ func NewCollector(captureFunc CaptureFunc, params CollectorParams) (Collector, e c = params.Clock } return &collector{ - captureResults: make(chan *v1.SensorData, params.QueueSize), + captureResults: make(chan CaptureResultWithTimestamps, params.QueueSize), captureErrors: make(chan error, params.QueueSize), + dataType: params.DataType, interval: params.Interval, params: params.MethodParams, logger: params.Logger, @@ -281,9 +493,19 @@ func (c *collector) writeCaptureResults() { case <-c.cancelCtx.Done(): return case msg := <-c.captureResults: - if err := c.target.Write(msg); err != nil { - c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write to collector %s", c.target.Path())).Error()) - return + proto := msg.ToProto() + + switch msg.CaptureResult.Type { + case CaptureResultTypeTabular: + if err := c.target.WriteTabular(proto); err != nil { + c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write tabular data to prog file %s", c.target.Path())).Error()) + return + } + case CaptureResultTypeBinary: + if err := c.target.WriteBinary(proto); err != nil { + c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write binary data to prog file %s", c.target.Path())).Error()) + return + } } } } diff --git a/data/registry.go b/data/registry.go index d534e1b09c3..9366c520199 100644 --- a/data/registry.go +++ b/data/registry.go @@ -18,6 +18,7 @@ type CollectorConstructor func(resource interface{}, params CollectorParams) (Co // CollectorParams contain the parameters needed to construct a Collector. type CollectorParams struct { + DataType DataType ComponentName string Interval time.Duration MethodParams map[string]*anypb.Any diff --git a/go.mod b/go.mod index 756426c4bbb..ef90af75e0d 100644 --- a/go.mod +++ b/go.mod @@ -444,3 +444,5 @@ require ( github.com/ziutek/mymysql v1.5.4 // indirect golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e ) + +replace go.viam.com/api => /Users/nicksanford/code/api diff --git a/go.sum b/go.sum index 2c528c234a2..286164edc19 100644 --- a/go.sum +++ b/go.sum @@ -1637,8 +1637,6 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.351 h1:jNIpa7K1RzQzWrwnXJBwFZ+oQ/OiZBWLqTp9+7nznPI= -go.viam.com/api v0.1.351/go.mod h1:5lpVRxMsKFCaahqsnJfPGwJ9baoQ6PIKQu3lxvy6Wtw= go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2 h1:oBiK580EnEIzgFLU4lHOXmGAE3MxnVbeR7s1wp/F3Ps= go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2/go.mod h1:XM0tej6riszsiNLT16uoyq1YjuYPWlRBweTPRDanIts= go.viam.com/utils v0.1.110 h1:xV6rcJiNq4iKy1f7y/5JeoasEZM29314X7Mg8xSUKLk= diff --git a/services/datamanager/builtin/capture/capture.go b/services/datamanager/builtin/capture/capture.go index ebb5038a54e..321bf963346 100644 --- a/services/datamanager/builtin/capture/capture.go +++ b/services/datamanager/builtin/capture/capture.go @@ -225,7 +225,7 @@ func (c *Capture) initializeOrUpdateCollector( return nil, errors.Wrapf(err, "failed to create target directory %s with 700 file permissions", targetDir) } // Build metadata. - captureMetadata := data.BuildCaptureMetadata( + captureMetadata, dataType := data.BuildCaptureMetadata( collectorConfig.Name.API, collectorConfig.Name.ShortName(), collectorConfig.Method, @@ -237,6 +237,7 @@ func (c *Capture) initializeOrUpdateCollector( queueSize := defaultIfZeroVal(collectorConfig.CaptureQueueSize, defaultCaptureQueueSize) bufferSize := defaultIfZeroVal(collectorConfig.CaptureBufferSize, defaultCaptureBufferSize) collector, err := collectorConstructor(res, data.CollectorParams{ + DataType: dataType, ComponentName: collectorConfig.Name.ShortName(), Interval: data.GetDurationFromHz(collectorConfig.CaptureFrequencyHz), MethodParams: methodParams,