Prevent unnecessary reuploads and eviction during parallel linear write (fix the new test)

This is done by prioritizing uploads of completely filled dirty parts over partly empty dirty parts, and both of those over fully zero dirty parts.
vitalif committed Mar 26, 2024
1 parent 3069e57 commit a7ae339
Showing 2 changed files with 75 additions and 21 deletions.
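
The change boils down to a three-way ordering of dirty parts, as the commit message says. Below is a minimal Go sketch of that classification, written for illustration only: `partState` and `flushPriority` are hypothetical names, not code from this commit, and they assume a part can be summarized by four flags roughly corresponding to `partDirty`, `partEvicted`, `partZero` and `partNonZero` in the diff.

```go
package main

import "fmt"

// partState is a hypothetical summary of the dirty buffers covering one part.
type partState struct {
	dirty   bool // part has at least one dirty buffer
	evicted bool // flushing would need RMW with already-evicted buffers
	zero    bool // part contains zero-filled (or missing) ranges
	nonZero bool // part contains real data
}

// flushPriority mirrors the ordering described in the commit message:
// 1 = completely filled, non-zero parts (always safe to upload),
// 2 = partly zero parts (uploaded only under memory pressure),
// 3 = fully zero parts (uploaded only when RMW-evicted parts block progress),
// 0 = nothing to flush for this part.
func flushPriority(p partState) int {
	switch {
	case !p.dirty || p.evicted:
		return 0 // skip: clean, or would force a read-modify-write of evicted data
	case !p.zero:
		return 1 // full, non-zero part: flush first
	case p.nonZero:
		return 2 // partly zero: flush only when priority-1 parts are exhausted
	default:
		return 3 // fully zero: flush last
	}
}

func main() {
	fmt.Println(flushPriority(partState{dirty: true, nonZero: true}))             // 1
	fmt.Println(flushPriority(partState{dirty: true, zero: true, nonZero: true})) // 2
	fmt.Println(flushPriority(partState{dirty: true, zero: true}))                // 3
}
```
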
57 changes: 42 additions & 15 deletions internal/file.go
@@ -40,6 +40,7 @@ type FileHandle struct {
// On Linux and MacOS, IOV_MAX = 1024
const IOV_MAX = 1024
const READ_BUF_SIZE = 128 * 1024
const MAX_FLUSH_PRIORITY = 3

// NewFileHandle returns a new file handle for the given `inode`
func NewFileHandle(inode *Inode) *FileHandle {
@@ -167,6 +168,7 @@ func (fh *FileHandle) WriteFile(offset int64, data []byte, copyData bool) (err e
}

allocated := fh.inode.buffers.Add(uint64(offset), data, BUF_DIRTY, copyData)
atomic.StoreUint64(&fh.inode.fs.hasNewWrites, 1)

fh.inode.lastWriteEnd = end
if fh.inode.CacheState == ST_CACHED {
@@ -621,7 +623,7 @@ func (inode *Inode) recordFlushError(err error) {
inode.fs.ScheduleRetryFlush()
}

func (inode *Inode) TryFlush() bool {
func (inode *Inode) TryFlush(priority int) bool {
overDeleted := false
parent := inode.Parent
if parent != nil {
@@ -654,12 +656,12 @@ func (inode *Inode) TryFlush() bool {
if overDeleted {
return false
}
return inode.sendUpload()
return inode.sendUpload(priority)
}
return false
}

func (inode *Inode) sendUpload() bool {
func (inode *Inode) sendUpload(priority int) bool {
if inode.oldParent != nil && inode.IsFlushing == 0 && inode.mpu == nil {
// Rename file
inode.sendRename()
@@ -719,7 +721,7 @@ func (inode *Inode) sendUpload() bool {
}

// Pick part(s) to flush
initiated, canComplete := inode.sendUploadParts()
initiated, canComplete := inode.sendUploadParts(priority)
if initiated {
return true
}
@@ -978,12 +980,18 @@ func (inode *Inode) sendStartMultipart() {
}()
}

func (inode *Inode) sendUploadParts() (bool, bool) {
func (inode *Inode) sendUploadParts(priority int) (bool, bool) {
initiated := false
shouldComplete := true
flushInode := inode.fileHandles == 0 || inode.forceFlush
wantFree := atomic.LoadInt32(&inode.fs.wantFree) > 0
var partlyZero []uint64
var fullyZero []uint64
anyEvicted := false
// Dirty parts should be flushed in the following order:
// 1) completely filled non-zero non-RMW-evicted parts
// 2) only when there are no more (priority 1) parts: partly filled non-RMW parts
// 3) only when there are no (1) and (2) parts, but there exist RMW-evicted parts: zero parts
inode.buffers.IterateDirtyParts(func(partNum uint64) bool {
partOffset, partSize := inode.fs.partRange(partNum)
if inode.IsRangeLocked(partOffset, partSize, true) {
@@ -998,6 +1006,7 @@ func (inode *Inode) sendUploadParts() (bool, bool) {
}
partEnd := partOffset+partSize
var partDirty, partEvicted, partZero, partNonZero bool
var lastBufferEnd uint64
inode.buffers.Ascend(partOffset+1, func(end uint64, buf *FileBuffer) (cont bool, changed bool) {
if buf.offset >= partEnd {
return false, false
@@ -1006,34 +1015,48 @@ func (inode *Inode) sendUploadParts() (bool, bool) {
partEvicted = partEvicted || buf.state == BUF_FL_CLEARED
partZero = partZero || buf.zero
partNonZero = partNonZero || !buf.zero
lastBufferEnd = end
return true, false
})
partZero = partZero || lastBufferEnd < partEnd
if !partDirty || partEvicted {
// Don't flush parts which require RMW with evicted buffers
anyEvicted = anyEvicted || partEvicted
} else if partZero && !flushInode {
// Don't flush parts with empty ranges UNLESS we're under memory pressure
// and tried all parts without empty ranges, because flushing them may be
// the only way to free memory
if wantFree && partNonZero {
if !partNonZero && priority >= 3 {
fullyZero = append(fullyZero, partNum)
}
if wantFree && partNonZero && priority >= 2 {
partlyZero = append(partlyZero, partNum)
}
shouldComplete = false
} else {
// Guard part against eviction
initiated = true
shouldComplete = false
inode.goFlushPart(partNum, partOffset, partSize)
if inode.flushLimitsExceeded() {
return false
if inode.goFlushPart(partNum, partOffset, partSize, 1) {
initiated = true
if inode.flushLimitsExceeded() {
return false
}
}
}
return true
})
if !initiated && len(partlyZero) > 0 {
for _, partNum := range partlyZero {
partOffset, partSize := inode.fs.partRange(partNum)
initiated = true
inode.goFlushPart(partNum, partOffset, partSize)
initiated = initiated || inode.goFlushPart(partNum, partOffset, partSize, 2)
if inode.flushLimitsExceeded() {
break
}
}
}
if !initiated && anyEvicted && len(fullyZero) > 0 {
for _, partNum := range fullyZero {
partOffset, partSize := inode.fs.partRange(partNum)
initiated = initiated || inode.goFlushPart(partNum, partOffset, partSize, 3)
if inode.flushLimitsExceeded() {
break
}
@@ -1043,20 +1066,24 @@ func (inode *Inode) sendUploadParts() (bool, bool) {
}

// LOCKS_REQUIRED(inode.mu)
func (inode *Inode) goFlushPart(partNum, partOffset, partSize uint64) {
func (inode *Inode) goFlushPart(partNum, partOffset, partSize uint64, priority uint64) bool {
// Guard part against eviction
inode.LockRange(partOffset, partSize, true)
inode.IsFlushing++
atomic.AddInt64(&inode.fs.stats.flushes, 1)
atomic.AddInt64(&inode.fs.activeFlushers, 1)
atomic.AddInt64(&inode.fs.flushPriorities[priority], 1)
go func() {
inode.mu.Lock()
inode.flushPart(partNum)
inode.UnlockRange(partOffset, partSize, true)
inode.IsFlushing--
inode.mu.Unlock()
atomic.AddInt64(&inode.fs.flushPriorities[priority], -1)
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
}()
return true
}

func (inode *Inode) uploadedAsMultipart() bool {
@@ -1717,7 +1744,7 @@ func (inode *Inode) SyncFile() (err error) {
}
inode.forceFlush = true
inode.mu.Unlock()
inode.TryFlush()
inode.TryFlush(MAX_FLUSH_PRIORITY)
inode.fs.flusherMu.Lock()
if inode.fs.flushPending == 0 {
inode.fs.flusherCond.Wait()
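
In the new `goFlushPart`, each started flush bumps an atomic per-priority counter (`fs.flushPriorities[priority]`) before the upload goroutine starts and drops it when the goroutine exits; that is what lets the flusher ask whether anything at a given priority is still in flight. A minimal sketch of that accounting pattern, using hypothetical `tracker`, `startFlush` and `busyAtOrBelow` names rather than the commit's actual types:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const maxFlushPriority = 3

// tracker is a hypothetical stand-in for the per-priority counters that
// goFlushPart maintains in fs.flushPriorities.
type tracker struct {
	inFlight [maxFlushPriority + 1]int64 // index 0 unused, 1..3 like the commit
}

// startFlush records an in-flight flush at the given priority, runs the
// upload in a goroutine, and decrements the counter when the upload finishes.
func (t *tracker) startFlush(priority int, wg *sync.WaitGroup, upload func()) {
	atomic.AddInt64(&t.inFlight[priority], 1)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer atomic.AddInt64(&t.inFlight[priority], -1)
		upload()
	}()
}

// busyAtOrBelow reports whether any flush at priority 1..p is still running,
// which is roughly the check the flusher loop performs before escalating.
func (t *tracker) busyAtOrBelow(p int) bool {
	for i := 1; i <= p; i++ {
		if atomic.LoadInt64(&t.inFlight[i]) > 0 {
			return true
		}
	}
	return false
}

func main() {
	var t tracker
	var wg sync.WaitGroup
	release := make(chan struct{})
	t.startFlush(1, &wg, func() { <-release })             // pretend the upload waits on the server
	fmt.Println("busy at priority 1?", t.busyAtOrBelow(1)) // true: flush still in flight
	close(release)
	wg.Wait()
	fmt.Println("busy after completion?", t.busyAtOrBelow(1)) // false
}
```
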
39 changes: 33 additions & 6 deletions internal/goofys.go
@@ -104,6 +104,8 @@ type Goofys struct {

activeFlushers int64
flushRetrySet int32
hasNewWrites uint64
flushPriorities []int64

forgotCnt uint32

@@ -296,6 +298,7 @@ func newGoofys(ctx context.Context, bucket string, flags *cfg.FlagStorage,
stats: OpStats{
ts: time.Now(),
},
flushPriorities: make([]int64, MAX_FLUSH_PRIORITY+1),
}

var prefix string
@@ -619,26 +622,50 @@ func (fs *Goofys) ScheduleRetryFlush() {
// With on-disk cache we can unload some dirty buffers to disk.
func (fs *Goofys) Flusher() {
var inodeID, nextQueueID uint64
priority := 1
for atomic.LoadInt32(&fs.shutdown) == 0 {
fs.flusherMu.Lock()
if fs.flushPending == 0 {
fs.flusherCond.Wait()
}
fs.flushPending = 0
fs.flusherMu.Unlock()
attempt := 1
if nextQueueID != 0 {
attempt = 2
attempts := 1
if priority > 1 || priority == 1 && nextQueueID != 0 {
attempts = 2
}
for attempt > 0 && atomic.LoadInt64(&fs.activeFlushers) < fs.flags.MaxFlushers {
curPriorityOk := false
for i := 1; i <= priority; i++ {
curPriorityOk = curPriorityOk || atomic.LoadInt64(&fs.flushPriorities[i]) > 0
}
for attempts > 0 && atomic.LoadInt64(&fs.activeFlushers) < fs.flags.MaxFlushers {
inodeID, nextQueueID = fs.inodeQueue.Next(nextQueueID)
if inodeID == 0 {
attempt--
if curPriorityOk {
break
}
priority++
if priority > MAX_FLUSH_PRIORITY {
attempts--
priority = 1
}
if curPriorityOk {
break
}
} else {
if atomic.CompareAndSwapUint64(&fs.hasNewWrites, 1, 0) {
// restart from the beginning
inodeID, nextQueueID = 0, 0
priority = 1
attempts = 1
curPriorityOk = atomic.LoadInt64(&fs.flushPriorities[1]) > 0
continue
}
fs.mu.RLock()
inode := fs.inodes[fuseops.InodeID(inodeID)]
fs.mu.RUnlock()
inode.TryFlush()
started := inode.TryFlush(priority)
curPriorityOk = curPriorityOk || started
}
}
}
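
The reworked `Flusher` loop no longer gives up after a single idle pass: it stays at priority 1 while priority-1 work is being started or is still in flight, escalates to 2 and then 3 only when a whole pass finds nothing flushable, and drops back to priority 1 as soon as `hasNewWrites` signals fresh dirty data. A rough, hypothetical distillation of that control flow — `flushPass`, `tryFlush` and `busy` are stand-ins, not the commit's API:

```go
package main

import "fmt"

const maxFlushPriority = 3

// flushPass is a hypothetical distillation of the escalation logic in the
// reworked Flusher: walk the queue at the current priority, and only raise the
// priority when a whole pass started nothing and nothing is still in flight.
func flushPass(queue []uint64,
	tryFlush func(inode uint64, priority int) bool,
	busy func(priority int) bool,
	hasNewWrites func() bool) int {
	priority := 1
	for priority <= maxFlushPriority {
		startedAny := false
		for _, ino := range queue {
			if hasNewWrites() {
				priority = 1 // fresh dirty data: favour full parts again
			}
			if tryFlush(ino, priority) {
				startedAny = true
			}
		}
		if startedAny || busy(priority) {
			return priority // let this priority finish before escalating
		}
		priority++ // nothing flushable here: allow the next class of parts
	}
	return priority
}

func main() {
	// Pretend inode 7 only has a partly-zero part, so it flushes at priority 2.
	got := flushPass(
		[]uint64{7},
		func(ino uint64, p int) bool { return p >= 2 },
		func(p int) bool { return false },
		func() bool { return false },
	)
	fmt.Println("stopped at priority", got)
}
```
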
