Skip to content

Commit

Permalink
[DATA-1649]/[DATA-1647] Add GetPointCloudMap collector and use stream…
Browse files Browse the repository at this point in the history
…ing sync rpc for large files. (viamrobotics#2703)
  • Loading branch information
AaronCasas authored Jul 28, 2023
1 parent 324b09d commit 8b3f5c4
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 50 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
go.viam.com/api v0.1.159
go.viam.com/api v0.1.164
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2
go.viam.com/utils v0.1.38
goji.io v2.0.2+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1567,8 +1567,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.viam.com/api v0.1.159 h1:yjqEV9rT4FFqWAH7KsRPjLVdCAuNy2bmaGRn2j/Sb4E=
go.viam.com/api v0.1.159/go.mod h1:CwLz82Ie4+Z2lSH2F0oQGViP4/TV9uxjJs+rqHvFWE8=
go.viam.com/api v0.1.164 h1:CYR35bAQAueU0DCXRCJMj/DxGZsHMjOxSIJ+4eMWc/Q=
go.viam.com/api v0.1.164/go.mod h1:CwLz82Ie4+Z2lSH2F0oQGViP4/TV9uxjJs+rqHvFWE8=
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2 h1:oBiK580EnEIzgFLU4lHOXmGAE3MxnVbeR7s1wp/F3Ps=
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2/go.mod h1:XM0tej6riszsiNLT16uoyq1YjuYPWlRBweTPRDanIts=
go.viam.com/utils v0.1.38 h1:Xc5UsEOYjX4WTcnku4vPD9JFKlu6NjdDmA3AY8qnySA=
Expand Down
18 changes: 9 additions & 9 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type builtIn struct {
logger golog.Logger
captureDir string
captureDisabled bool
collectors map[componentMethodMetadata]*collectorAndConfig
collectors map[resourceMethodMetadata]*collectorAndConfig
lock sync.Mutex
backgroundWorkers sync.WaitGroup
waitAfterLastModifiedMillis int
Expand Down Expand Up @@ -118,7 +118,7 @@ func NewBuiltIn(
Named: conf.ResourceName().AsNamed(),
logger: logger,
captureDir: viamCaptureDotDir,
collectors: make(map[componentMethodMetadata]*collectorAndConfig),
collectors: make(map[resourceMethodMetadata]*collectorAndConfig),
syncIntervalMins: 0,
additionalSyncPaths: []string{},
tags: []string{},
Expand Down Expand Up @@ -179,8 +179,8 @@ type collectorAndConfig struct {

// Identifier for a particular collector: component name, component model, component type,
// method parameters, and method name.
type componentMethodMetadata struct {
ComponentName string
type resourceMethodMetadata struct {
ResourceName string
MethodParams string
MethodMetadata data.MethodMetadata
}
Expand All @@ -196,7 +196,7 @@ func getDurationFromHz(captureFrequencyHz float32) time.Duration {
// Initialize a collector for the component/method or update it if it has previously been created.
// Return the component/method metadata which is used as a key in the collectors map.
func (svc *builtIn) initializeOrUpdateCollector(
md componentMethodMetadata,
md resourceMethodMetadata,
config *datamanager.DataCaptureConfig,
) (
*collectorAndConfig, error,
Expand Down Expand Up @@ -366,11 +366,11 @@ func (svc *builtIn) Reconfigure(
// Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service.
if svc.captureDisabled {
svc.closeCollectors()
svc.collectors = make(map[componentMethodMetadata]*collectorAndConfig)
svc.collectors = make(map[resourceMethodMetadata]*collectorAndConfig)
}

// Initialize or add collectors based on changes to the component configurations.
newCollectors := make(map[componentMethodMetadata]*collectorAndConfig)
newCollectors := make(map[resourceMethodMetadata]*collectorAndConfig)
if !svc.captureDisabled {
for _, resConf := range svcConfig.ResourceConfigs {
if resConf.Resource == nil {
Expand All @@ -384,8 +384,8 @@ func (svc *builtIn) Reconfigure(
MethodName: resConf.Method,
}

componentMethodMetadata := componentMethodMetadata{
ComponentName: resConf.Name.ShortName(),
componentMethodMetadata := resourceMethodMetadata{
ResourceName: resConf.Name.ShortName(),
MethodMetadata: methodMetadata,
MethodParams: fmt.Sprintf("%v", resConf.AdditionalParams),
}
Expand Down
205 changes: 190 additions & 15 deletions services/datamanager/builtin/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,25 +338,22 @@ func TestArbitraryFileUpload(t *testing.T) {
name: "scheduled sync of arbitrary files should work",
manualSync: false,
scheduleSyncDisabled: false,
serviceFail: false,
},
{
name: "manual sync of arbitrary files should work",
manualSync: true,
scheduleSyncDisabled: true,
serviceFail: false,
},
{
name: "running manual and scheduled sync concurrently should work and not lead to duplicate uploads",
manualSync: true,
scheduleSyncDisabled: false,
serviceFail: false,
},
{
name: "if an error response is received from the backend, local files should not be deleted",
manualSync: false,
scheduleSyncDisabled: false,
serviceFail: false,
serviceFail: true,
},
}

Expand All @@ -372,11 +369,13 @@ func TestArbitraryFileUpload(t *testing.T) {
dmsvc, r := newTestDataManager(t)
dmsvc.SetWaitAfterLastModifiedMillis(0)
defer dmsvc.Close(context.Background())
f := atomic.Bool{}
f.Store(tc.serviceFail)
mockClient := mockDataSyncServiceClient{
succesfulDCRequests: make(chan *v1.DataCaptureUploadRequest, 100),
failedDCRequests: make(chan *v1.DataCaptureUploadRequest, 100),
fileUploads: make(chan *mockFileUploadClient, 100),
fail: &atomic.Bool{},
fail: &f,
}
dmsvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
cfg, deps := setupConfig(t, disabledTabularCollectorConfigPath)
Expand Down Expand Up @@ -416,10 +415,12 @@ func TestArbitraryFileUpload(t *testing.T) {
var fileUploads []*mockFileUploadClient
var urs []*v1.FileUploadRequest
// Get the successful requests
wait := time.After(time.Second * 5)
wait := time.After(time.Second * 3)
select {
case <-wait:
t.Fatalf("timed out waiting for sync request")
if !tc.serviceFail {
t.Fatalf("timed out waiting for sync request")
}
case r := <-mockClient.fileUploads:
fileUploads = append(fileUploads, r)
select {
Expand All @@ -430,12 +431,8 @@ func TestArbitraryFileUpload(t *testing.T) {
}
}

// Validate error and URs.
remainingFiles := getAllFilePaths(additionalPathsDir)
if tc.serviceFail {
// Error case, file should not be deleted.
test.That(t, len(remainingFiles), test.ShouldEqual, 1)
} else {
waitUntilNoFiles(additionalPathsDir)
if !tc.serviceFail {
// Validate first metadata message.
test.That(t, len(fileUploads), test.ShouldEqual, 1)
test.That(t, len(urs), test.ShouldBeGreaterThan, 0)
Expand All @@ -455,8 +452,144 @@ func TestArbitraryFileUpload(t *testing.T) {
test.That(t, actData, test.ShouldResemble, fileContents)

// Validate file no longer exists.
waitUntilNoFiles(additionalPathsDir)
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0)
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
} else {
// Validate no files were successfully uploaded.
test.That(t, len(fileUploads), test.ShouldEqual, 0)
// Validate file still exists.
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 1)
}
})
}
}

func TestStreamingDCUpload(t *testing.T) {
tests := []struct {
name string
serviceFail bool
}{
{
name: "A data capture file greater than MaxUnaryFileSize should be successfully uploaded" +
"via the streaming rpc.",
serviceFail: false,
},
{
name: "if an error response is received from the backend, local files should not be deleted",
serviceFail: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Set up server.
mockClock := clk.NewMock()
clock = mockClock
tmpDir := t.TempDir()

// Set up data manager.
dmsvc, r := newTestDataManager(t)
defer dmsvc.Close(context.Background())
var cfg *Config
var deps []string
captureInterval := time.Millisecond * 10
cfg, deps = setupConfig(t, enabledBinaryCollectorConfigPath)

// Set up service config with just capture enabled.
cfg.CaptureDisabled = false
cfg.ScheduledSyncDisabled = true
cfg.SyncIntervalMins = syncIntervalMins
cfg.CaptureDir = tmpDir

resources := resourcesFromDeps(t, r, deps)
err := dmsvc.Reconfigure(context.Background(), resources, resource.Config{
ConvertedAttributes: cfg,
})
test.That(t, err, test.ShouldBeNil)

// Capture an image, then close.
mockClock.Add(captureInterval)
waitForCaptureFilesToExceedNFiles(tmpDir, 0)
err = dmsvc.Close(context.Background())
test.That(t, err, test.ShouldBeNil)

// Get all captured data.
_, capturedData, err := getCapturedData(tmpDir)
test.That(t, err, test.ShouldBeNil)

// Turn dmsvc back on with capture disabled.
newDMSvc, r := newTestDataManager(t)
defer newDMSvc.Close(context.Background())
f := atomic.Bool{}
f.Store(tc.serviceFail)
mockClient := mockDataSyncServiceClient{
streamingDCUploads: make(chan *mockStreamingDCClient, 10),
fail: &f,
}
// Set max unary file size to 1 byte, so it uses the streaming rpc.
datasync.MaxUnaryFileSize = 1
newDMSvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
cfg.CaptureDisabled = true
cfg.ScheduledSyncDisabled = true
resources = resourcesFromDeps(t, r, deps)
err = newDMSvc.Reconfigure(context.Background(), resources, resource.Config{
ConvertedAttributes: cfg,
})
test.That(t, err, test.ShouldBeNil)

// Call sync.
err = newDMSvc.Sync(context.Background(), nil)
test.That(t, err, test.ShouldBeNil)

// Wait for upload requests.
var uploads []*mockStreamingDCClient
var urs []*v1.StreamingDataCaptureUploadRequest
// Get the successful requests
wait := time.After(time.Second * 3)
select {
case <-wait:
if !tc.serviceFail {
t.Fatalf("timed out waiting for sync request")
}
case r := <-mockClient.streamingDCUploads:
uploads = append(uploads, r)
select {
case <-wait:
t.Fatalf("timed out waiting for sync request")
case <-r.closed:
urs = append(urs, r.reqs...)
}
}
waitUntilNoFiles(tmpDir)

// Validate error and URs.
remainingFiles := getAllFilePaths(tmpDir)
if tc.serviceFail {
// Validate no files were successfully uploaded.
test.That(t, len(uploads), test.ShouldEqual, 0)
// Error case, file should not be deleted.
test.That(t, len(remainingFiles), test.ShouldEqual, 1)
} else {
// Validate first metadata message.
test.That(t, len(uploads), test.ShouldEqual, 1)
test.That(t, len(urs), test.ShouldBeGreaterThan, 0)
actMD := urs[0].GetMetadata()
test.That(t, actMD, test.ShouldNotBeNil)
test.That(t, actMD.GetUploadMetadata(), test.ShouldNotBeNil)
test.That(t, actMD.GetSensorMetadata(), test.ShouldNotBeNil)
test.That(t, actMD.GetUploadMetadata().Type, test.ShouldEqual, v1.DataType_DATA_TYPE_BINARY_SENSOR)
test.That(t, actMD.GetUploadMetadata().PartId, test.ShouldNotBeBlank)

// Validate ensuing data messages.
dataRequests := urs[1:]
var actData []byte
for _, d := range dataRequests {
actData = append(actData, d.GetData()...)
}
test.That(t, actData, test.ShouldResemble, capturedData[0].GetBinary())

// Validate file no longer exists.
test.That(t, len(getAllFileInfos(tmpDir)), test.ShouldEqual, 0)
}
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
})
Expand Down Expand Up @@ -661,6 +794,7 @@ type mockDataSyncServiceClient struct {
succesfulDCRequests chan *v1.DataCaptureUploadRequest
failedDCRequests chan *v1.DataCaptureUploadRequest
fileUploads chan *mockFileUploadClient
streamingDCUploads chan *mockStreamingDCClient
fail *atomic.Bool
}

Expand Down Expand Up @@ -690,7 +824,26 @@ func (c mockDataSyncServiceClient) FileUpload(ctx context.Context, opts ...grpc.
return nil, errors.New("oh no error")
}
ret := &mockFileUploadClient{closed: make(chan struct{})}
c.fileUploads <- ret
select {
case <-ctx.Done():
return nil, ctx.Err()
case c.fileUploads <- ret:
}
return ret, nil
}

func (c mockDataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context,
opts ...grpc.CallOption,
) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) {
if c.fail.Load() {
return nil, errors.New("oh no error")
}
ret := &mockStreamingDCClient{closed: make(chan struct{})}
select {
case <-ctx.Done():
return nil, ctx.Err()
case c.streamingDCUploads <- ret:
}
return ret, nil
}

Expand All @@ -711,6 +864,28 @@ func (m *mockFileUploadClient) CloseAndRecv() (*v1.FileUploadResponse, error) {
}

func (m *mockFileUploadClient) CloseSend() error {
m.closed <- struct{}{}
return nil
}

type mockStreamingDCClient struct {
reqs []*v1.StreamingDataCaptureUploadRequest
closed chan struct{}
grpc.ClientStream
}

func (m *mockStreamingDCClient) Send(req *v1.StreamingDataCaptureUploadRequest) error {
m.reqs = append(m.reqs, req)
return nil
}

func (m *mockStreamingDCClient) CloseAndRecv() (*v1.StreamingDataCaptureUploadResponse, error) {
m.closed <- struct{}{}
return &v1.StreamingDataCaptureUploadResponse{}, nil
}

func (m *mockStreamingDCClient) CloseSend() error {
m.closed <- struct{}{}
return nil
}

Expand Down
5 changes: 2 additions & 3 deletions services/datamanager/datacapture/data_capture_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (

// TODO Data-343: Reorganize this into a more standard interface/package, and add tests.

// TODO: this is all way too complicated i think. Just keep track of read/write offsets

// FileExt defines the file extension for Viam data capture files.
const (
InProgressFileExt = ".prog"
FileExt = ".capture"
readImage = "ReadImage"
nextPointCloud = "NextPointCloud"
getPointCloudMap = "GetPointCloudMap"
)

// File is the data structure containing data captured by collectors. It is backed by a file on disk containing
Expand Down Expand Up @@ -240,7 +239,7 @@ func getFileTimestampName() string {
// TODO DATA-246: Implement this in some more robust, programmatic way.
func getDataType(methodName string) v1.DataType {
switch methodName {
case nextPointCloud, readImage:
case nextPointCloud, readImage, getPointCloudMap:
return v1.DataType_DATA_TYPE_BINARY_SENSOR
default:
return v1.DataType_DATA_TYPE_TABULAR_SENSOR
Expand Down
Loading

0 comments on commit 8b3f5c4

Please sign in to comment.