Skip to content

Commit

Permalink
#461: implement subset selection on the server end
Browse files Browse the repository at this point in the history
  • Loading branch information
Charlotte committed Jul 13, 2018
1 parent de6aa5e commit d29db1a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
20 changes: 19 additions & 1 deletion cmd/dotmesh-server/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,22 @@ func getS3Client(transferRequest S3TransferRequest) (*s3.S3, error) {
return svc, nil
}

func downloadS3Bucket(svc *s3.S3, bucketName, destPath, transferRequestId string, pollResult *TransferPollResult, currentKeyVersions map[string]string) (bool, map[string]string, error) {
func downloadS3Bucket(svc *s3.S3, bucketName, destPath, transferRequestId string, prefixes []string, pollResult *TransferPollResult, currentKeyVersions map[string]string) (bool, map[string]string, error) {
if len(prefixes) == 0 {
return downloadPartialS3Bucket(svc, bucketName, destPath, transferRequestId, "", pollResult, currentKeyVersions)
}
var changed bool
var err error
for _, prefix := range prefixes {
changed, currentKeyVersions, err = downloadPartialS3Bucket(svc, bucketName, destPath, transferRequestId, prefix, pollResult, currentKeyVersions)
if err != nil {
return false, nil, err
}
}
return changed, currentKeyVersions, nil
}

func downloadPartialS3Bucket(svc *s3.S3, bucketName, destPath, transferRequestId, prefix string, pollResult *TransferPollResult, currentKeyVersions map[string]string) (bool, map[string]string, error) {
// for every version in the bucket
// 1. Delete anything locally that's been deleted in S3.
// 2. Download new versions of things that have changed
Expand All @@ -125,6 +140,9 @@ func downloadS3Bucket(svc *s3.S3, bucketName, destPath, transferRequestId string
params := &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
}
if prefix != "" {
params.Prefix = prefix
}
downloader := s3manager.NewDownloaderWithClient(svc)
var innerError error
err := svc.ListObjectVersionsPages(params,
Expand Down
2 changes: 1 addition & 1 deletion cmd/dotmesh-server/s3pullinitiatorstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func s3PullInitiatorState(f *fsMachine) stateFn {
return backoffState
}
}
bucketChanged, keyVersions, err := downloadS3Bucket(svc, transferRequest.RemoteName, destPath, transferRequestId, &pollResult, latestMeta)
bucketChanged, keyVersions, err := downloadS3Bucket(svc, transferRequest.RemoteName, destPath, transferRequestId, transferRequest.Prefixes, &pollResult, latestMeta)
if err != nil {
f.errorDuringTransfer("cant-pull-from-s3", err)
return backoffState
Expand Down

0 comments on commit d29db1a

Please sign in to comment.