Skip to content

Commit

Permalink
Rework inflight change tracking and also apply it to overwrites in ad…
Browse files Browse the repository at this point in the history
…dition to deletes

Otherwise a multipart upload could race with listings. An approximate test case is:

1. Mount the FS with `--stat-cache-ttl=1s --debug_s3`.
2. Create a 1 GB file: `dd if=/dev/urandom of=mnt/directory/1GBFILE bs=1M count=1000`.
3. Start `while true; do ls mnt/directory > /dev/null; done` in a parallel shell,
   where `mnt/directory` is the directory that contains the 1 GB file from step 2.
4. Run `for i in {1..100}; do dd if=/dev/urandom of=mnt/directory/1GBFILE bs=1 count=1 conv=notrunc; done`
   several times. Another way is to just run that `dd` command a lot of times with
   random delays between runs.
5. Look for "AbortMultipartUpload" in the log. An aborted upload means that you hit the bug
   and one of the modifications is lost.
  • Loading branch information
vitalif committed Oct 22, 2021
1 parent 2497a25 commit c5f012b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 61 deletions.
35 changes: 11 additions & 24 deletions internal/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ func (parent *Inode) listObjectsSlurp(inode *Inode, startAfter string, lock bool
startWith = PString(key+".\xF4\x8F\xBF\xBF")
}

parent.fs.mu.Lock()
myList := parent.fs.addInflightListing()
parent.fs.mu.Unlock()

params := &ListBlobsInput{
Prefix: &prefix,
Expand All @@ -283,10 +281,10 @@ func (parent *Inode) listObjectsSlurp(inode *Inode, startAfter string, lock bool
parent.mu.Lock()
}
parent.fs.mu.Lock()
forceDelete := parent.fs.getDeleteOverrides(myList)
skipListing := parent.fs.completeInflightListingUnlocked(myList)
dirs := make(map[*Inode]bool)
for _, obj := range resp.Items {
if forceDelete != nil && forceDelete[*obj.Key] {
if skipListing != nil && skipListing[*obj.Key] {
continue
}
baseName := (*obj.Key)[len(prefix):]
Expand Down Expand Up @@ -393,12 +391,12 @@ func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutpu
return res, nil
}

func (dh *DirHandle) handleListResult(resp *ListBlobsOutput, prefix string, forceDelete map[string]bool) {
func (dh *DirHandle) handleListResult(resp *ListBlobsOutput, prefix string, skipListing map[string]bool) {
parent := dh.inode
fs := parent.fs

for _, dir := range resp.Prefixes {
if forceDelete != nil && forceDelete[*dir.Prefix] {
if skipListing != nil && skipListing[*dir.Prefix] {
continue
}
// strip trailing /
Expand Down Expand Up @@ -427,7 +425,7 @@ func (dh *DirHandle) handleListResult(resp *ListBlobsOutput, prefix string, forc
}

for _, obj := range resp.Items {
if forceDelete != nil && forceDelete[*obj.Key] {
if skipListing != nil && skipListing[*obj.Key] {
continue
}
baseName := (*obj.Key)[len(prefix):]
Expand Down Expand Up @@ -476,17 +474,15 @@ func (dh *DirHandle) listObjectsFlat() (err error) {
prefix += "/"
}

myList := dh.inode.fs.addInflightListing()

params := &ListBlobsInput{
Delimiter: aws.String("/"),
ContinuationToken: dh.inode.dir.listMarker,
Prefix: &prefix,
}
dh.mu.Unlock()

dh.inode.fs.mu.Lock()
myList := dh.inode.fs.addInflightListing()
dh.inode.fs.mu.Unlock()

resp, err := listBlobsSafe(cloud, params)
dh.mu.Lock()
if err != nil {
Expand All @@ -498,8 +494,7 @@ func (dh *DirHandle) listObjectsFlat() (err error) {
dh.inode.mu.Lock()
dh.inode.fs.mu.Lock()

forceDelete := dh.inode.fs.getDeleteOverrides(myList)
dh.handleListResult(resp, prefix, forceDelete)
dh.handleListResult(resp, prefix, dh.inode.fs.completeInflightListingUnlocked(myList))

dh.inode.fs.mu.Unlock()
dh.inode.mu.Unlock()
Expand Down Expand Up @@ -944,11 +939,14 @@ func (inode *Inode) SendDelete() {
inode.IsFlushing += inode.fs.flags.MaxParallelParts
implicit := inode.ImplicitDir
go func() {
// Delete may race with a parallel listing
var err error
if !implicit {
inode.fs.addInflightChange(key)
_, err = cloud.DeleteBlob(&DeleteBlobInput{
Key: key,
})
inode.fs.completeInflightChange(key)
}
inode.mu.Lock()
atomic.AddInt64(&inode.Parent.fs.activeFlushers, -1)
Expand Down Expand Up @@ -989,17 +987,6 @@ func (inode *Inode) SendDelete() {
inode.DeRef(0)
inode.mu.Unlock()
}
if err == nil {
// Delete may race with a parallel listing :-X
// Monkey-fix the race :-) I suspect that it wouldn't be needed
// if all of the S3-FS sync code was implemented using CRDT or
// maybe something even more clever. But for now we use simple things. :-)
inode.fs.mu.Lock()
if len(inode.fs.inflightListings) > 0 {
inode.fs.inflightListDeletes = append(inode.fs.inflightListDeletes, key)
}
inode.fs.mu.Unlock()
}
inode.fs.WakeupFlusher()
}()
}
Expand Down
11 changes: 11 additions & 0 deletions internal/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,7 @@ func (inode *Inode) SendUpload() bool {
}

if inode.oldParent != nil && inode.IsFlushing == 0 && inode.mpu == nil {
// Send rename
inode.IsFlushing += inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, 1)
_, from := inode.oldParent.cloud()
Expand All @@ -1173,10 +1174,12 @@ func (inode *Inode) SendUpload() bool {
// because if we used it we'd have to do it under the inode lock. Because otherwise
// a parallel read could hit a non-existing name. So, with S3, we do it in 2 passes.
// First we copy the object, change the inode name, and then we delete the old copy.
inode.fs.addInflightChange(key)
_, err = cloud.CopyBlob(&CopyBlobInput{
Source: from,
Destination: key,
})
inode.fs.completeInflightChange(key)
notFoundIgnore := false
if err != nil {
mappedErr := mapAwsError(err)
Expand Down Expand Up @@ -1239,9 +1242,11 @@ func (inode *Inode) SendUpload() bool {
inode.mu.Unlock()
// Now delete the old key
if !notFoundIgnore {
inode.fs.addInflightChange(delKey)
_, err = cloud.DeleteBlob(&DeleteBlobInput{
Key: delKey,
})
inode.fs.completeInflightChange(delKey)
}
if err != nil {
log.Debugf("Failed to delete %v during rename, will retry later", delKey)
Expand Down Expand Up @@ -1294,13 +1299,15 @@ func (inode *Inode) SendUpload() bool {
inode.IsFlushing += inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, 1)
go func() {
inode.fs.addInflightChange(key)
_, err := cloud.CopyBlob(&CopyBlobInput{
Source: key,
Destination: key,
Size: &inode.Attributes.Size,
ETag: PString(inode.knownETag),
Metadata: escapeMetadata(inode.userMetadata),
})
inode.fs.completeInflightChange(key)
inode.mu.Lock()
inode.recordFlushError(err)
if err != nil {
Expand Down Expand Up @@ -1595,7 +1602,9 @@ func (inode *Inode) FlushSmallObject() {
inode.mpu = nil
}
inode.mu.Unlock()
inode.fs.addInflightChange(key)
resp, err := cloud.PutBlob(params)
inode.fs.completeInflightChange(key)
inode.mu.Lock()

inode.recordFlushError(err)
Expand Down Expand Up @@ -1848,7 +1857,9 @@ func (inode *Inode) completeMultipart() {
// Finalize the upload
inode.mpu.NumParts = uint32(numParts)
inode.mu.Unlock()
inode.fs.addInflightChange(key)
resp, err := cloud.MultipartBlobCommit(inode.mpu)
inode.fs.completeInflightChange(key)
inode.mu.Lock()
if inode.CacheState == ST_CREATED || inode.CacheState == ST_MODIFIED {
inode.recordFlushError(err)
Expand Down
81 changes: 44 additions & 37 deletions internal/goofys.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"net/url"
"os"
"runtime/debug"
"sort"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -89,9 +88,11 @@ type Goofys struct {
// GUARDED_BY(mu)
inodes map[fuseops.InodeID]*Inode

// Inflight changes are tracked to skip them in parallel listings
// Required because we don't have guarantees about listing & change ordering
inflightListingId int
inflightListings []int
inflightListDeletes []string
inflightListings map[int]map[string]bool
inflightChanges map[string]int

nextHandleID fuseops.HandleID
dirHandles map[fuseops.HandleID]*DirHandle
Expand Down Expand Up @@ -192,6 +193,8 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage,
umask: 0122,
lfru: NewLFRU(flags.CachePopularThreshold, flags.CacheMaxHits, flags.CacheAgeInterval, flags.CacheAgeDecrement),
zeroBuf: make([]byte, 1048576),
inflightChanges: make(map[string]int),
inflightListings: make(map[int]map[string]bool),
}

var prefix string
Expand Down Expand Up @@ -1173,44 +1176,48 @@ func (fs *Goofys) ReadDir(
return
}

func (fs *Goofys) addInflightListing() int {
fs.inflightListingId++
myList := fs.inflightListingId
fs.inflightListings = append(fs.inflightListings, myList, len(fs.inflightListDeletes))
return myList
// addInflightChange registers the start of a modification of key.
//
// It bumps the per-key counter in fs.inflightChanges and also records the key
// in the skip set of every listing that is currently in flight, so that those
// listings will ignore whatever they report for this key.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *Goofys) addInflightChange(key string) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	fs.inflightChanges[key] += 1
	// Every listing already running overlaps with this change.
	for _, skipSet := range fs.inflightListings {
		skipSet[key] = true
	}
}

func (fs *Goofys) getDeleteOverrides(myList int) map[string]bool {
var forceDelete map[string]bool
listPos := 2 * sort.Search(len(fs.inflightListings)/2, func(i int) bool {
return fs.inflightListings[i*2] >= myList
})
if fs.inflightListings[listPos] != myList {
panic("listing disappeared")
}
for i := fs.inflightListings[listPos+1]; i < len(fs.inflightListDeletes); i++ {
if forceDelete == nil {
forceDelete = make(map[string]bool)
}
forceDelete[fs.inflightListDeletes[i]] = true
}
fs.inflightListings = append(
fs.inflightListings[0 : listPos],
fs.inflightListings[listPos+2 : ]...
)
minPos := len(fs.inflightListDeletes)
for i := 1; i < len(fs.inflightListings); i += 2 {
if fs.inflightListings[i] < minPos {
minPos = fs.inflightListings[i]
}
// LOCKS_EXCLUDED(fs.mu)
func (fs *Goofys) completeInflightChange(key string) {
fs.mu.Lock()
fs.inflightChanges[key]--
if fs.inflightChanges[key] <= 0 {
delete(fs.inflightChanges, key)
}
if minPos > 0 {
fs.inflightListDeletes = fs.inflightListDeletes[minPos : ]
for i := 1; i < len(fs.inflightListings); i += 2 {
fs.inflightListings[i] -= minPos
}
fs.mu.Unlock()
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *Goofys) addInflightListing() int {
fs.mu.Lock()
fs.inflightListingId++
id := fs.inflightListingId
m := make(map[string]bool)
for k, _ := range fs.inflightChanges {
m[k] = true
}
return forceDelete
fs.inflightListings[id] = m
fs.mu.Unlock()
return id
}

// completeInflightListingUnlocked finishes the listing identified by id: it
// removes the listing from fs.inflightListings and returns its skip set.
//
// The skip set holds the keys of all objects whose modification overlapped
// the listing in time, i.e. the modification was started before the listing
// completed and had not finished before the listing began. Callers forcibly
// exclude those keys from the listing result.
//
// Returns nil if no listing with this id is registered.
//
// LOCKS_REQUIRED(fs.mu)
func (fs *Goofys) completeInflightListingUnlocked(id int) map[string]bool {
	skipSet, found := fs.inflightListings[id]
	if !found {
		return nil
	}
	delete(fs.inflightListings, id)
	return skipSet
}

func (fs *Goofys) ReleaseDirHandle(
Expand Down

0 comments on commit c5f012b

Please sign in to comment.