#461: experimenting with being able to delete/update only a subset defined earlier when pushing to s3
Charlotte committed Jul 13, 2018
1 parent e0f9751 commit 002830a
Showing 3 changed files with 49 additions and 15 deletions.
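
In outline, the change makes an S3 push update and prune only the keys that fall under a set of configured prefixes; on the deletion side an empty prefix list is treated as "the whole bucket". A minimal sketch of that prefix rule (the helper name keyInSubset is illustrative and not part of the commit; it assumes the standard library "strings" package):

// Sketch only: the subset rule the diff below applies when pushing to S3.
// An empty prefix list puts every key in scope (as removeOldS3Files does).
func keyInSubset(key string, prefixes []string) bool {
    if len(prefixes) == 0 {
        return true
    }
    for _, prefix := range prefixes {
        if strings.HasPrefix(key, prefix) {
            return true
        }
    }
    return false
}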
31 changes: 28 additions & 3 deletions cmd/dotmesh-server/s3.go
@@ -232,9 +232,25 @@ func downloadS3Object(downloader *s3manager.Downloader, key, versionId, bucket,
     return nil
 }
 
-func removeOldS3Files(keyToVersionIds map[string]string, paths map[string]int64, bucket string, svc *s3.S3) (map[string]string, error) {
+func removeOldS3Files(keyToVersionIds map[string]string, paths map[string]int64, bucket string, prefixes []string, svc *s3.S3) (map[string]string, error) {
+    if len(prefixes) == 0 {
+        return removeOldPrefixedS3Files(keyToVersionIds, paths, bucket, "", svc)
+    }
+    var err error
+    for _, prefix := range prefixes {
+        keyToVersionIds, err = removeOldPrefixedS3Files(keyToVersionIds, paths, bucket, prefix, svc)
+        if err != nil {
+            return nil, err
+        }
+    }
+    return keyToVersionIds, nil
+}
+func removeOldPrefixedS3Files(keyToVersionIds map[string]string, paths map[string]int64, bucket, prefix string, svc *s3.S3) (map[string]string, error) {
     // get a list of the objects in s3, if there's anything there that isn't in our list of files in dotmesh, delete it.
     params := &s3.ListObjectsV2Input{Bucket: aws.String(bucket)}
+    if prefix != "" {
+        params.SetPrefix(prefix)
+    }
     var innerError error
     err := svc.ListObjectsV2Pages(params, func(output *s3.ListObjectsV2Output, lastPage bool) bool {
         for _, item := range output.Contents {
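
On the deletion side the prefix is applied server-side: with Prefix set, ListObjectsV2 only returns keys under that prefix, so objects outside the subset are never candidates for removal. A self-contained sketch of that call shape with aws-sdk-go (the session setup, bucket name, and prefix are placeholders, not taken from the commit):

package main

import (
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

func main() {
    // Credentials and region come from the environment; bucket and prefix are placeholders.
    svc := s3.New(session.Must(session.NewSession()))
    params := &s3.ListObjectsV2Input{
        Bucket: aws.String("test.dotmesh"),
        Prefix: aws.String("subset/"),
    }
    err := svc.ListObjectsV2Pages(params, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
        // Only keys under "subset/" are listed, so a pruning pass built on this
        // listing cannot touch anything outside the prefix.
        for _, obj := range page.Contents {
            fmt.Println(aws.StringValue(obj.Key), aws.Int64Value(obj.Size))
        }
        return true // keep paginating
    })
    if err != nil {
        log.Fatal(err)
    }
}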
@@ -260,10 +276,19 @@ func removeOldS3Files(keyToVersionIds map[string]string, paths map[string]int64,
     return keyToVersionIds, nil
 }
 
-func updateS3Files(keyToVersionIds map[string]string, paths map[string]int64, pathToMount, transferRequestId, bucket string, svc *s3.S3, pollResult TransferPollResult) (map[string]string, error) {
+func updateS3Files(keyToVersionIds map[string]string, paths map[string]int64, pathToMount, transferRequestId, bucket string, prefixes []string, svc *s3.S3, pollResult TransferPollResult) (map[string]string, error) {
     // push every key up to s3 and then send back a map of object key -> s3 version id
     uploader := s3manager.NewUploaderWithClient(svc)
-    for key, size := range paths {
+    // filter out any paths we don't care about in an S3 remote
+    filtered := make(map[string]int64)
+    for _, elem := range prefixes {
+        for key, size := range paths {
+            if strings.HasPrefix(key, elem) {
+                filtered[key] = size
+            }
+        }
+    }
+    for key, size := range filtered {
         path := fmt.Sprintf("%s/%s", pathToMount, key)
         file, err := os.Open(path)
         pollResult.Index += 1
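The upload side filters client-side instead, with strings.HasPrefix over the snapshot's path map. As committed, the nested loops above leave filtered empty when no prefixes are configured, so such a push would upload nothing; below is a sketch of the same filtering written to mirror removeOldS3Files (empty prefix list means keep every path), and treating that as the intended behaviour is my assumption rather than something the commit states:

// Sketch, assuming an empty prefix list should mean "push everything",
// matching how removeOldS3Files treats it. Uses the standard "strings" package.
func filterPathsByPrefixes(paths map[string]int64, prefixes []string) map[string]int64 {
    if len(prefixes) == 0 {
        return paths
    }
    filtered := make(map[string]int64, len(paths))
    for key, size := range paths {
        for _, prefix := range prefixes {
            if strings.HasPrefix(key, prefix) {
                filtered[key] = size
                break
            }
        }
    }
    return filtered
}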
4 changes: 2 additions & 2 deletions cmd/dotmesh-server/s3pushinitiatorstate.go
@@ -73,12 +73,12 @@ func s3PushInitiatorState(f *fsMachine) stateFn {
         return backoffState
     }
     keyToVersionIds := make(map[string]string)
-    keyToVersionIds, err = updateS3Files(keyToVersionIds, paths, pathToMount, transferRequestId, transferRequest.RemoteName, svc, pollResult)
+    keyToVersionIds, err = updateS3Files(keyToVersionIds, paths, pathToMount, transferRequestId, transferRequest.RemoteName, transferRequest.Prefixes, svc, pollResult)
     if err != nil {
         f.errorDuringTransfer("error-updating-s3-objects", err)
         return backoffState
     }
-    keyToVersionIds, err = removeOldS3Files(keyToVersionIds, paths, transferRequest.RemoteName, svc)
+    keyToVersionIds, err = removeOldS3Files(keyToVersionIds, paths, transferRequest.RemoteName, transferRequest.Prefixes, svc)
     if err != nil {
         f.errorDuringTransfer("error-during-object-pagination", err)
         return backoffState
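The call sites only thread the prefixes through from the transfer request; the request type itself is not part of this diff. The usage of transferRequest.Prefixes and transferRequest.RemoteName implies a field along these lines (a hypothetical sketch; the real definition lives elsewhere in the dotmesh codebase and may differ):

// Hypothetical sketch of the shape implied by the call sites above;
// not the actual dotmesh definition.
type S3TransferRequest struct {
    RemoteName string
    Prefixes   []string
    // other transfer parameters omitted
}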
29 changes: 19 additions & 10 deletions tests/s3_test.go
@@ -220,22 +220,31 @@ func TestS3Remote(t *testing.T) {
        }
    })
 
-   t.Run("CloneSubsetPull", func(t *testing.T) {
-       // fsname := citools.UniqName()
-       // citools.RunOnNode(t, node1, "dm s3 clone-subset test-real-s3 test.dotmesh subset/ --local-name="+fsname)
-       // citools.RunOnNode(t, node1, "dm push test-real-s3 "+fsname)
-       // resp := citools.OutputFromRunOnNode(t, node1, s3cmd+" ls s3://test.dotmesh")
-       // if !strings.Contains(resp, "hello-world.txt") {
-       //     citools.RunOnNode(t, node1, s3cmd+" put hello-world.txt s3://test.dotmesh")
-       //     t.Error("Deleted a file we aren't tracking")
-       // }
+   t.Run("CloneSubsetPushPull", func(t *testing.T) {
+       fsname := citools.UniqName()
+       citools.RunOnNode(t, node1, "dm s3 clone-subset test-real-s3 test.dotmesh subset/ --local-name="+fsname)
+       citools.RunOnNode(t, node1, citools.DockerRun(fsname)+" touch /foo/file.txt")
+       citools.TryUntilSucceeds(func() error {
+           output := citools.OutputFromRunOnNode(t, node1, "dm dot show "+fsname+" -H | grep dirty")
+           if strings.Contains(output, "\t0") {
+               return fmt.Errorf("not dirty yet")
+           }
+           return nil
+       }, "waiting for dirty data...")
+       citools.RunOnNode(t, node1, "dm commit -m 'add file to s3'")
+       citools.RunOnNode(t, node1, "dm push test-real-s3 "+fsname)
+       resp := citools.OutputFromRunOnNode(t, node1, s3cmd("ls s3://test.dotmesh"))
+       if !strings.Contains(resp, "hello-world.txt") {
+           citools.RunOnNode(t, node1, s3cmd("put hello-world.txt s3://test.dotmesh"))
+           t.Error("Deleted a file we aren't tracking")
+       }
        PutBackS3Files(node1)
        citools.RunOnNode(t, node1, "mkdir -p subset && echo 'directories' > subset/subdir.txt")
        citools.RunOnNode(t, node1, s3cmd("put subset/subdir.txt s3://test.dotmesh/subset/subdir.txt"))
        fsname2 := citools.UniqName()
        citools.RunOnNode(t, node1, "dm s3 clone-subset test-real-s3 test.dotmesh subset/ --local-name="+fsname2)
        citools.RunOnNode(t, node1, "dm pull test-real-s3 "+fsname2)
-       resp := citools.OutputFromRunOnNode(t, node1, citools.DockerRun(fsname2)+" ls /foo/")
+       resp = citools.OutputFromRunOnNode(t, node1, citools.DockerRun(fsname2)+" ls /foo/")
        if strings.Contains(resp, "hello-world.txt") {
            t.Error("Pulled down a file we aren't tracking")
        }
