diff --git a/cmd/dotmesh-server/s3.go b/cmd/dotmesh-server/s3.go
index 4e0eafe35..9544e590b 100644
--- a/cmd/dotmesh-server/s3.go
+++ b/cmd/dotmesh-server/s3.go
@@ -232,9 +232,25 @@ func downloadS3Object(downloader *s3manager.Downloader, key, versionId, bucket,
 	return nil
 }
 
-func removeOldS3Files(keyToVersionIds map[string]string, paths map[string]int64, bucket string, svc *s3.S3) (map[string]string, error) {
+func removeOldS3Files(keyToVersionIds map[string]string, paths map[string]int64, bucket string, prefixes []string, svc *s3.S3) (map[string]string, error) {
+	if len(prefixes) == 0 {
+		return removeOldPrefixedS3Files(keyToVersionIds, paths, bucket, "", svc)
+	}
+	var err error
+	for _, prefix := range prefixes {
+		keyToVersionIds, err = removeOldPrefixedS3Files(keyToVersionIds, paths, bucket, prefix, svc)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return keyToVersionIds, nil
+}
+func removeOldPrefixedS3Files(keyToVersionIds map[string]string, paths map[string]int64, bucket, prefix string, svc *s3.S3) (map[string]string, error) {
 	// get a list of the objects in s3, if there's anything there that isn't in our list of files in dotmesh, delete it.
 	params := &s3.ListObjectsV2Input{Bucket: aws.String(bucket)}
+	if prefix != "" {
+		params.SetPrefix(prefix)
+	}
 	var innerError error
 	err := svc.ListObjectsV2Pages(params, func(output *s3.ListObjectsV2Output, lastPage bool) bool {
 		for _, item := range output.Contents {
@@ -260,10 +276,19 @@ func removeOldS3Files(keyToVersionIds map[string]string, paths map[string]int64,
 	return keyToVersionIds, nil
 }
 
-func updateS3Files(keyToVersionIds map[string]string, paths map[string]int64, pathToMount, transferRequestId, bucket string, svc *s3.S3, pollResult TransferPollResult) (map[string]string, error) {
+func updateS3Files(keyToVersionIds map[string]string, paths map[string]int64, pathToMount, transferRequestId, bucket string, prefixes []string, svc *s3.S3, pollResult TransferPollResult) (map[string]string, error) {
 	// push every key up to s3 and then send back a map of object key -> s3 version id
 	uploader := s3manager.NewUploaderWithClient(svc)
-	for key, size := range paths {
+	// filter out any paths we don't care about in an S3 remote
+	filtered := make(map[string]int64)
+	for _, elem := range prefixes {
+		for key, size := range paths {
+			if strings.HasPrefix(key, elem) {
+				filtered[key] = size
+			}
+		}
+	}
+	for key, size := range filtered {
 		path := fmt.Sprintf("%s/%s", pathToMount, key)
 		file, err := os.Open(path)
 		pollResult.Index += 1
diff --git a/cmd/dotmesh-server/s3pushinitiatorstate.go b/cmd/dotmesh-server/s3pushinitiatorstate.go
index 025133ad6..fd1e77d3a 100644
--- a/cmd/dotmesh-server/s3pushinitiatorstate.go
+++ b/cmd/dotmesh-server/s3pushinitiatorstate.go
@@ -73,12 +73,12 @@ func s3PushInitiatorState(f *fsMachine) stateFn {
 		return backoffState
 	}
 	keyToVersionIds := make(map[string]string)
-	keyToVersionIds, err = updateS3Files(keyToVersionIds, paths, pathToMount, transferRequestId, transferRequest.RemoteName, svc, pollResult)
+	keyToVersionIds, err = updateS3Files(keyToVersionIds, paths, pathToMount, transferRequestId, transferRequest.RemoteName, transferRequest.Prefixes, svc, pollResult)
 	if err != nil {
 		f.errorDuringTransfer("error-updating-s3-objects", err)
 		return backoffState
 	}
-	keyToVersionIds, err = removeOldS3Files(keyToVersionIds, paths, transferRequest.RemoteName, svc)
+	keyToVersionIds, err = removeOldS3Files(keyToVersionIds, paths, transferRequest.RemoteName, transferRequest.Prefixes, svc)
 	if err != nil {
 		f.errorDuringTransfer("error-during-object-pagination", err)
 		return backoffState
diff --git a/tests/s3_test.go b/tests/s3_test.go
index 74be949e7..b48b8a5eb 100644
--- a/tests/s3_test.go
+++ b/tests/s3_test.go
@@ -220,22 +220,31 @@ func TestS3Remote(t *testing.T) {
 		}
 	})
 
-	t.Run("CloneSubsetPull", func(t *testing.T) {
-		// fsname := citools.UniqName()
-		// citools.RunOnNode(t, node1, "dm s3 clone-subset test-real-s3 test.dotmesh subset/ --local-name="+fsname)
-		// citools.RunOnNode(t, node1, "dm push test-real-s3 "+fsname)
-		// resp := citools.OutputFromRunOnNode(t, node1, s3cmd+" ls s3://test.dotmesh")
-		// if !strings.Contains(resp, "hello-world.txt") {
-		// 	citools.RunOnNode(t, node1, s3cmd+" put hello-world.txt s3://test.dotmesh")
-		// 	t.Error("Deleted a file we aren't tracking")
-		// }
+	t.Run("CloneSubsetPushPull", func(t *testing.T) {
+		fsname := citools.UniqName()
+		citools.RunOnNode(t, node1, "dm s3 clone-subset test-real-s3 test.dotmesh subset/ --local-name="+fsname)
+		citools.RunOnNode(t, node1, citools.DockerRun(fsname)+" touch /foo/file.txt")
+		citools.TryUntilSucceeds(func() error {
+			output := citools.OutputFromRunOnNode(t, node1, "dm dot show "+fsname+" -H | grep dirty")
+			if strings.Contains(output, "\t0") {
+				return fmt.Errorf("not dirty yet")
+			}
+			return nil
+		}, "waiting for dirty data...")
+		citools.RunOnNode(t, node1, "dm commit -m 'add file to s3'")
+		citools.RunOnNode(t, node1, "dm push test-real-s3 "+fsname)
+		resp := citools.OutputFromRunOnNode(t, node1, s3cmd("ls s3://test.dotmesh"))
+		if !strings.Contains(resp, "hello-world.txt") {
+			citools.RunOnNode(t, node1, s3cmd("put hello-world.txt s3://test.dotmesh"))
+			t.Error("Deleted a file we aren't tracking")
+		}
 		PutBackS3Files(node1)
 		citools.RunOnNode(t, node1, "mkdir -p subset && echo 'directories' > subset/subdir.txt")
 		citools.RunOnNode(t, node1, s3cmd("put subset/subdir.txt s3://test.dotmesh/subset/subdir.txt"))
 		fsname2 := citools.UniqName()
 		citools.RunOnNode(t, node1, "dm s3 clone-subset test-real-s3 test.dotmesh subset/ --local-name="+fsname2)
 		citools.RunOnNode(t, node1, "dm pull test-real-s3 "+fsname2)
-		resp := citools.OutputFromRunOnNode(t, node1, citools.DockerRun(fsname2)+" ls /foo/")
+		resp = citools.OutputFromRunOnNode(t, node1, citools.DockerRun(fsname2)+" ls /foo/")
 		if strings.Contains(resp, "hello-world.txt") {
 			t.Error("Pulled down a file we aren't tracking")
 		}