Skip to content

Commit

Permalink
add documentation and comment
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelMure committed Apr 27, 2022
1 parent ff6502b commit 6c3252c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
29 changes: 24 additions & 5 deletions flatfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
// calculating the DiskUsage upon a start when no
// DiskUsageFile is present.
// If this period did not suffice to read the size of the datastore,
// the remaining sizes will be stimated.
// the remaining sizes will be estimated.
DiskUsageCalcTimeout = 5 * time.Minute
// RetryDelay is a timeout for a backoff on retrying operations
// that fail due to transient errors like too many file descriptors open.
Expand Down Expand Up @@ -118,7 +118,7 @@ func init() {
// write operations to the same key. See the explanation in
// Put().
type Datastore struct {
// atmoic operations should always be used with diskUsage.
// atomic operations should always be used with diskUsage.
// Must be first in struct to ensure correct alignment
// (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
diskUsage int64
Expand All @@ -129,7 +129,7 @@ type Datastore struct {
shardStr string
getDir ShardFunc

// sychronize all writes and directory changes for added safety
// synchronize all writes and directory changes for added safety
sync bool

// these values should only be used during internalization or
Expand Down Expand Up @@ -167,6 +167,8 @@ type op struct {
v []byte // value
}

// opMap is a synchronisation structure where a single op can be stored
// for each key. It is backed by a sync.Map, so concurrent registration
// and lookup of in-flight ops is safe without additional locking.
type opMap struct {
	ops sync.Map
}
Expand All @@ -179,7 +181,11 @@ type opResult struct {
name string
}

// Returns nil if there's nothing to do.
// Begin starts the processing of an op:
// - if no other op exists for the same key, register it and return immediately
// - if another op exists for the same key, wait until it's done:
//   - if that previous op succeeded, consider that ours shouldn't execute and return nil
//   - if that previous op failed, start ours
func (m *opMap) Begin(name string) *opResult {
for {
myOp := &opResult{opMap: m, name: name}
Expand Down Expand Up @@ -305,13 +311,17 @@ func (fs *Datastore) ShardStr() string {
return fs.shardStr
}

// encode returns the directory and file names for a given key according to
// the sharding function.
func (fs *Datastore) encode(key datastore.Key) (dir, file string) {
	// Drop the leading "/" from the key's string form before sharding.
	name := key.String()[1:]
	dir = filepath.Join(fs.path, fs.getDir(name))
	file = filepath.Join(dir, name+extension)
	return dir, file
}

// decode returns the datastore.Key corresponding to a file name, according
// to the sharding function.
func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
if !strings.HasSuffix(file, extension) {
// We expect random files like "put-". Log when we encounter
Expand All @@ -325,6 +335,8 @@ func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
return datastore.NewKey(name), true
}

// makeDir is identical to makeDirNoSync but also enforces a sync
// if required by the config.
func (fs *Datastore) makeDir(dir string) error {
if err := fs.makeDirNoSync(dir); err != nil {
return err
Expand All @@ -342,6 +354,7 @@ func (fs *Datastore) makeDir(dir string) error {
return nil
}

// makeDirNoSync idempotently creates a directory on disk.
func (fs *Datastore) makeDirNoSync(dir string) error {
if err := os.Mkdir(dir, 0755); err != nil {
// EEXIST is safe to ignore here, that just means the prefix
Expand Down Expand Up @@ -371,7 +384,7 @@ func (fs *Datastore) renameAndUpdateDiskUsage(tmpPath, path string) error {

// Rename and add new file's diskUsage. If the rename fails,
// it will either a) Re-add the size of an existing file, which
// was sustracted before b) Add 0 if there is no existing file.
// was subtracted before b) Add 0 if there is no existing file.
for i := 0; i < RetryAttempts; i++ {
err = rename(tmpPath, path)
// if there's no error, or the source file doesn't exist, abort.
Expand Down Expand Up @@ -573,6 +586,8 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
return nil
}

// Start by writing all the data in temp files so that we can be sure that
// all the data is on disk before renaming to the final places.
for key, value := range data {
dir, path := fs.encode(key)
if err := fs.makeDirNoSync(dir); err != nil {
Expand Down Expand Up @@ -971,6 +986,7 @@ func (fs *Datastore) updateDiskUsage(path string, add bool) {
}
}

// checkpointDiskUsage triggers a disk usage checkpoint write.
func (fs *Datastore) checkpointDiskUsage() {
select {
case fs.checkpointCh <- struct{}{}:
Expand All @@ -980,6 +996,8 @@ func (fs *Datastore) checkpointDiskUsage() {
}
}

// checkpointLoop periodically, or following a checkpoint event, writes the
// current disk usage to disk.
func (fs *Datastore) checkpointLoop() {
defer close(fs.done)

Expand Down Expand Up @@ -1023,6 +1041,7 @@ func (fs *Datastore) checkpointLoop() {
}
}

// writeDiskUsageFile writes the given checkpoint disk usage to a file.
func (fs *Datastore) writeDiskUsageFile(du int64, doSync bool) {
tmp, err := fs.tempFile()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func (f *ShardIdV1) Func() ShardFunc {
return f.fun
}

// Prefix returns a sharding function taking the first prefixLen characters of the key.
// If too short, the key is padded with "_".
func Prefix(prefixLen int) *ShardIdV1 {
padding := strings.Repeat("_", prefixLen)
return &ShardIdV1{
Expand All @@ -42,6 +44,8 @@ func Prefix(prefixLen int) *ShardIdV1 {
}
}

// Suffix returns a sharding function taking the last suffixLen characters of the key.
// If too short, the key is padded with "_".
func Suffix(suffixLen int) *ShardIdV1 {
padding := strings.Repeat("_", suffixLen)
return &ShardIdV1{
Expand All @@ -54,6 +58,9 @@ func Suffix(suffixLen int) *ShardIdV1 {
}
}

// NextToLast returns a sharding function taking the suffixLen characters of the key
// before the very last character.
// If too short, the key is padded with "_".
func NextToLast(suffixLen int) *ShardIdV1 {
padding := strings.Repeat("_", suffixLen+1)
return &ShardIdV1{
Expand Down

0 comments on commit 6c3252c

Please sign in to comment.