Skip to content

Commit

Permalink
use .objinsync as working dir to implement atomic update
Browse files Browse the repository at this point in the history
tmp will not work when it's mounted as a different partition
  • Loading branch information
Qingping Hou committed Mar 24, 2020
1 parent 91a0137 commit 34db797
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 57 deletions.
10 changes: 4 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,22 @@ func main() {
localDir := args[1]
interval := time.Second * 5

_, err := os.Stat(localDir)
puller, err := sync.NewPuller(remoteUri, localDir)
if err != nil {
log.Fatal(localDir, " is not a valid dir: ", err)
log.Fatal(err)
}

puller := sync.NewPuller()
if FlagExclude != nil {
puller.AddExcludePatterns(FlagExclude)
}
if !FlagScratch {
puller.PopulateChecksum(localDir)
puller.PopulateChecksum()
}

pull := func() {
start := time.Now()
l.Info("Pull started.")

errMsg := puller.Pull(remoteUri, localDir)
errMsg := puller.Pull()
if errMsg != "" {
sentry.CaptureMessage(errMsg)
sentry.Flush(time.Second * 5)
Expand Down
111 changes: 70 additions & 41 deletions pkg/sync/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,32 @@ func uidFromLocalPath(localPath string) (string, error) {
return fmt.Sprintf("\"%s\"", uid), nil
}

// Puller synchronizes a remote object store prefix (RemoteUri) into a
// local directory (LocalDir): it downloads new/changed objects and
// deletes local files that no longer exist remotely.
type Puller struct {
	RemoteUri string // remote source, e.g. s3://bucket/prefix
	LocalDir  string // local target directory to sync into

	// Scratch directory holding in-flight downloads before they are
	// atomically renamed into place. It lives inside LocalDir (not
	// os.TempDir) so the rename stays on the same partition and does not
	// fail with a cross-device link error — see SetupWorkingDir.
	workingDir string
	// patterns of paths to skip during sync (see AddExcludePatterns)
	exclude []string
	// number of concurrent download worker goroutines
	workerCnt int
	// per-file quoted uid/checksum cache used to skip unchanged remote
	// objects; guarded by uidLock
	uidCache map[string]string
	uidLock  *sync.Mutex
	// pending downloads produced by the lister, consumed by workers
	taskQueue chan DownloadTask
	// error messages reported by workers for aggregation/alerting
	errMsgQueue chan string
	// Here is how filesToDelete is being used:
	//
	// 1. before each pull action, we populate filesToDelete with all files
	// (without dirs) from local target directory. During this process, we also
	// delete local empty directories.
	//
	// 2. we list S3 bucket, for any file in the bucket, we remove related
	// entry from the delete list
	//
	// 3. at the end of the pull, we delete files from the list
	filesToDelete map[string]bool
	// counters exported as metrics at the end of each pull
	fileListedCnt int
	filePulledCnt int
}

func (self *Puller) downloadHandler(task DownloadTask, downloader GenericDownloader) {
l := zap.S()

Expand All @@ -125,9 +151,9 @@ func (self *Puller) downloadHandler(task DownloadTask, downloader GenericDownloa
}

// create file
tmpfile, err := ioutil.TempFile(os.TempDir(), "objinsync-download-")
tmpfile, err := ioutil.TempFile(self.workingDir, filepath.Base(task.LocalPath))
if err != nil {
self.errMsgQueue <- fmt.Sprintf("Failed to create file %s for download: %v", tmpfile.Name(), err)
self.errMsgQueue <- fmt.Sprintf("Failed to create file for download: %v", err)
return
}
defer tmpfile.Close()
Expand Down Expand Up @@ -228,50 +254,41 @@ func (self *Puller) handlePageList(
return true
}

// Puller holds the state for syncing a remote object store prefix into a
// local directory (this is the pre-refactor version from the removed side
// of the diff; remote uri and local dir were passed per-call instead).
type Puller struct {
	// patterns of paths to skip during sync (see AddExcludePatterns)
	exclude []string
	// number of concurrent download worker goroutines
	workerCnt int
	// per-file quoted uid/checksum cache used to skip unchanged remote
	// objects; guarded by uidLock
	uidCache map[string]string
	uidLock  *sync.Mutex
	// pending downloads produced by the lister, consumed by workers
	taskQueue chan DownloadTask
	// error messages reported by workers for aggregation/alerting
	errMsgQueue chan string
	// Here is how filesToDelete is being used:
	//
	// 1. before each pull action, we populate filesToDelete with all files
	// (without dirs) from local target directory. During this process, we also
	// delete local empty directories.
	//
	// 2. we list S3 bucket, for any file in the bucket, we remove related
	// entry from the delete list
	//
	// 3. at the end of the pull, we delete files from the list
	filesToDelete map[string]bool
	// counters exported as metrics at the end of each pull
	fileListedCnt int
	filePulledCnt int
}

// AddExcludePatterns registers patterns; paths matching any of them are
// skipped during sync.
func (self *Puller) AddExcludePatterns(patterns []string) {
	// variadic append replaces the element-by-element loop
	self.exclude = append(self.exclude, patterns...)
}

func (self *Puller) Pull(remoteUri string, localDir string) string {
// SetupWorkingDir ensures the temporary working directory used to hold
// in-flight downloads exists, so each completed download can be moved
// into place with an atomic rename.
//
// The working directory lives inside the local target dir rather than
// os.TempDir, because TmpDir could be mounted on a different partition,
// which would make the rename fail with an invalid cross-device link
// error.
func (self *Puller) SetupWorkingDir() error {
	// MkdirAll is a no-op when the directory already exists, so the
	// previous os.Stat pre-check was redundant — and its os.IsNotExist
	// guard silently swallowed any other stat error (e.g. permission
	// denied), returning nil without guaranteeing the dir is usable.
	return os.MkdirAll(self.workingDir, os.ModePerm)
}

func (self *Puller) Pull() string {
l := zap.S()

filesToDelete, err := listAndPruneDir(localDir, self.exclude)
filesToDelete, err := listAndPruneDir(self.LocalDir, self.exclude)
if err != nil {
return fmt.Sprintf("Failed to list and prune local dir %s: %v", localDir, err)
return fmt.Sprintf("Failed to list and prune local dir %s: %v", self.LocalDir, err)
}
// handlePageList method will remove files existed in remote source from this list
self.filesToDelete = filesToDelete
defer func() {
self.filesToDelete = nil
}()

bucket, remoteDirPath, err := parseObjectUri(remoteUri)
bucket, remoteDirPath, err := parseObjectUri(self.RemoteUri)
if err != nil {
return fmt.Sprintf("Invalid remote uri %s: %v", remoteUri, err)
return fmt.Sprintf("Invalid remote uri %s: %v", self.RemoteUri, err)
}

self.taskQueue = make(chan DownloadTask, 30)
Expand All @@ -292,6 +309,11 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {
svc := s3.New(sess, aws.NewConfig().WithRegion(region))
downloader := s3manager.NewDownloaderWithClient(svc)

if err := self.SetupWorkingDir(); err != nil {
return fmt.Sprintf("Failed to create working directory %s: %v", self.workingDir, err)
}
defer os.RemoveAll(self.workingDir) // purge working dir when downloads are done

// spawn worker goroutines
var wg sync.WaitGroup
for i := 0; i < self.workerCnt; i++ {
Expand Down Expand Up @@ -329,7 +351,7 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {

err = svc.ListObjectsV2Pages(listParams,
func(page *s3.ListObjectsV2Output, lastPage bool) bool {
return self.handlePageList(page, lastPage, bucket, remoteDirPath, localDir)
return self.handlePageList(page, lastPage, bucket, remoteDirPath, self.LocalDir)
})
close(self.taskQueue)
wg.Wait()
Expand All @@ -339,7 +361,7 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {
metricsFilePulled.Set(float64(self.filePulledCnt))

if err != nil {
return fmt.Sprintf("Failed to list remote uri %s: %v", remoteUri, err)
return fmt.Sprintf("Failed to list remote uri %s: %v", self.RemoteUri, err)
} else {
errMsgWg.Wait()

Expand All @@ -354,7 +376,7 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {
}
}

func (self *Puller) PopulateChecksum(localDir string) {
func (self *Puller) PopulateChecksum() {
l := zap.S()

setFileChecksum := func(path string) {
Expand All @@ -369,9 +391,9 @@ func (self *Puller) PopulateChecksum(localDir string) {
l.Errorf("Failed to calculate checksum for file: %s, err: %s", path, err)
}

uidKey, err := uidKeyFromLocalPath(localDir, path)
uidKey, err := uidKeyFromLocalPath(self.LocalDir, path)
if err != nil {
l.Errorf("Failed to calculate uidKey for file: %s under dir: %s, err: %s", path, localDir, err)
l.Errorf("Failed to calculate uidKey for file: %s under dir: %s, err: %s", path, self.LocalDir, err)
return
}

Expand All @@ -386,14 +408,14 @@ func (self *Puller) PopulateChecksum(localDir string) {
self.uidLock.Unlock()
}

err := filepath.Walk(localDir, func(path string, info os.FileInfo, err error) error {
err := filepath.Walk(self.LocalDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

// ignore file that matches exclude rules
shouldSkip := false
relPath, err := filepath.Rel(localDir, path)
relPath, err := filepath.Rel(self.LocalDir, path)
if err != nil {
l.Errorf("Got invalid path from filepath.Walk: %s, err: %s", path, err)
shouldSkip = true
Expand Down Expand Up @@ -424,10 +446,17 @@ func (self *Puller) PopulateChecksum(localDir string) {
}
}

func NewPuller() *Puller {
return &Puller{
workerCnt: 5,
uidCache: map[string]string{},
uidLock: &sync.Mutex{},
// NewPuller creates a Puller that syncs remoteUri into localDir. It
// validates that localDir is an accessible directory; the working dir
// used for atomic downloads is placed inside localDir so renames stay on
// the same partition.
func NewPuller(remoteUri string, localDir string) (*Puller, error) {
	info, err := os.Stat(localDir)
	if err != nil {
		// Report any stat failure, not just os.IsNotExist — the previous
		// check silently ignored e.g. permission errors and handed back a
		// Puller pointed at an unusable path.
		return nil, fmt.Errorf("cannot access local directory `%s`: %v", localDir, err)
	}
	if !info.IsDir() {
		return nil, fmt.Errorf("local path `%s` is not a directory", localDir)
	}

	return &Puller{
		RemoteUri:  remoteUri,
		LocalDir:   localDir,
		workingDir: filepath.Join(localDir, ".objinsync"),
		workerCnt:  5,
		uidCache:   map[string]string{},
		uidLock:    &sync.Mutex{},
	}, nil
}
46 changes: 36 additions & 10 deletions pkg/sync/pull_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ import (
)

func TestSkipParentDir(t *testing.T) {
p := NewPuller()
dir, err := ioutil.TempDir("", "")
assert.Equal(t, nil, err)
defer os.RemoveAll(dir)
p, err := NewPuller("s3://foo/home", dir)
assert.Equal(t, nil, err)
p.SetupWorkingDir()

p.taskQueue = make(chan DownloadTask, 10)
p.handlePageList(
&s3.ListObjectsV2Output{
Expand All @@ -33,7 +39,7 @@ func TestSkipParentDir(t *testing.T) {
false,
"foo",
"home",
"abc",
dir,
)
close(p.taskQueue)

Expand All @@ -46,6 +52,7 @@ func TestSkipParentDir(t *testing.T) {

func TestDeleteStaleFile(t *testing.T) {
dir, err := ioutil.TempDir("", "")
defer os.RemoveAll(dir)
nonEmptyDir := filepath.Join(dir, "bar")
os.MkdirAll(nonEmptyDir, os.ModePerm)
fileA := filepath.Join(nonEmptyDir, "a.go")
Expand All @@ -62,7 +69,9 @@ func TestDeleteStaleFile(t *testing.T) {
err = ioutil.WriteFile(deletedFileB, []byte("test2"), 0644)
assert.Equal(t, nil, err)

p := NewPuller()
p, err := NewPuller("s3://foo/home/dags", dir)
assert.Equal(t, nil, err)
assert.Equal(t, nil, p.SetupWorkingDir())
p.taskQueue = make(chan DownloadTask, 10)
p.filesToDelete, err = listAndPruneDir(dir, nil)
assert.Equal(t, nil, err)
Expand Down Expand Up @@ -117,7 +126,12 @@ func TestDeleteStaleFile(t *testing.T) {
}

func TestSkipObjectsWithoutChange(t *testing.T) {
p := NewPuller()
dir, err := ioutil.TempDir("", "")
assert.Equal(t, nil, err)
defer os.RemoveAll(dir)
p, err := NewPuller("s3://foo/home/dags", dir)
assert.Equal(t, nil, err)
assert.Equal(t, nil, p.SetupWorkingDir())
p.taskQueue = make(chan DownloadTask, 10)
p.uidCache["b.file"] = "\"1\""

Expand Down Expand Up @@ -146,7 +160,7 @@ func TestSkipObjectsWithoutChange(t *testing.T) {
false,
"foo",
"home/dags",
"bar",
dir,
)
close(p.taskQueue)
wg.Wait()
Expand All @@ -156,7 +170,12 @@ func TestSkipObjectsWithoutChange(t *testing.T) {
}

func TestSkipExcludedObjects(t *testing.T) {
p := NewPuller()
dir, err := ioutil.TempDir("", "")
assert.Equal(t, nil, err)
defer os.RemoveAll(dir)
p, err := NewPuller("s3://foo/home", dir)
assert.Equal(t, nil, err)
assert.Equal(t, nil, p.SetupWorkingDir())
p.taskQueue = make(chan DownloadTask, 10)

var wg sync.WaitGroup
Expand Down Expand Up @@ -197,7 +216,7 @@ func TestSkipExcludedObjects(t *testing.T) {
false,
"foo",
"home",
"bar",
dir,
)
close(p.taskQueue)
wg.Wait()
Expand All @@ -207,7 +226,12 @@ func TestSkipExcludedObjects(t *testing.T) {
}

func TestSkipDirectories(t *testing.T) {
p := NewPuller()
dir, err := ioutil.TempDir("", "")
assert.Equal(t, nil, err)
defer os.RemoveAll(dir)
p, err := NewPuller("s3://foo/home/dags", dir)
assert.Equal(t, nil, err)
assert.Equal(t, nil, p.SetupWorkingDir())
p.taskQueue = make(chan DownloadTask, 10)

var wg sync.WaitGroup
Expand Down Expand Up @@ -235,7 +259,7 @@ func TestSkipDirectories(t *testing.T) {
false,
"foo",
"home/dags",
"bar",
dir,
)
close(p.taskQueue)
wg.Wait()
Expand All @@ -257,8 +281,10 @@ func TestNestedPathDownload(t *testing.T) {

mockDownloader := MockDownloader{}

p := NewPuller()
p, err := NewPuller("s3://abc/efg", dir)
assert.Equal(t, nil, err)
p.errMsgQueue = make(chan string, 30)
assert.Equal(t, nil, p.SetupWorkingDir())

p.downloadHandler(
DownloadTask{
Expand Down

0 comments on commit 34db797

Please sign in to comment.