diff --git a/CHANGELOG.md b/CHANGELOG.md index ceb5705..4b83bc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ This project adheres to [Semantic Versioning](http://semver.org/). - Add `Flags` to txfile.Options. PR #5 - Add support to increase a file's maxSize on open. PR #5 - Add support to pre-allocate the meta area. PR #7 +- Begin returns an error if transaction is not compatible to file open mode. PR #17 +- Introduce Error type to txfile package. PR #17 ### Changed - Refine platform dependent file syncing. PR #10 diff --git a/alloc.go b/alloc.go index dd14a2e..0fa74a2 100644 --- a/alloc.go +++ b/alloc.go @@ -18,6 +18,7 @@ package txfile import ( + "fmt" "math" "github.com/elastic/go-txfile/internal/invariant" @@ -156,7 +157,11 @@ func (a *allocator) makeTxAllocState(withOverflow bool, growPercentage int) txAl } } -func (a *allocator) fileCommitPrepare(st *allocCommitState, tx *txAllocState, forceUpdate bool) { +func (a *allocator) fileCommitPrepare( + st *allocCommitState, + tx *txAllocState, + forceUpdate bool, +) { st.tx = tx st.updated = forceUpdate || tx.Updated() if st.updated { @@ -164,7 +169,9 @@ func (a *allocator) fileCommitPrepare(st *allocCommitState, tx *txAllocState, fo } } -func (a *allocator) fileCommitAlloc(st *allocCommitState) error { +func (a *allocator) fileCommitAlloc(st *allocCommitState) reason { + const op = "txfile/commit-alloc-meta" + if !st.updated { return nil } @@ -197,7 +204,8 @@ func (a *allocator) fileCommitAlloc(st *allocCommitState) error { if n := prediction.count; n > 0 { allocRegions = a.MetaAllocator().AllocRegions(st.tx, n) if allocRegions == nil { - return errOutOfMemory + return a.err(op).of(OutOfMemory). 
+ report("not enough space to allocate freelist meta pages") } } @@ -278,12 +286,19 @@ func releaseOverflowPages( func (a *allocator) fileCommitSerialize( st *allocCommitState, - onPage func(id PageID, buf []byte) error, -) error { + onPage func(id PageID, buf []byte) reason, +) reason { + const op = "txfile/commit-serialize-alloc" + if !st.updated || len(st.allocRegions) == 0 { return nil } - return writeFreeLists(st.allocRegions, a.pageSize, st.metaList, st.dataList, onPage) + + err := writeFreeLists(st.allocRegions, a.pageSize, st.metaList, st.dataList, onPage) + if err != nil { + return a.errWrap(op, err).report("failed to serialize allocator state") + } + return nil } func (a *allocator) fileCommitMeta(meta *metaPage, st *allocCommitState) { @@ -331,6 +346,14 @@ func (a *allocator) Rollback(st *txAllocState) { a.data.rollback(&st.data) } +func (a *allocator) err(op string) *Error { + return &Error{op: op} +} + +func (a *allocator) errWrap(op string, err error) *Error { + return a.err(op).causedBy(err) +} + func (a *allocArea) commit(endMarker PageID, regions regionList) { a.endMarker = endMarker a.freelist.regions = regions @@ -400,12 +423,9 @@ func (mm *metaManager) Ensure(st *txAllocState, n uint) bool { // Can not grow until 'requiredMax' -> try to grow up to requiredMin, // potentially allocating pages from the overflow area requiredMin := szMinMeta - total - if mm.tryGrow(st, requiredMin, st.options.overflowAreaEnabled) { - return true - } - // out of memory - return false + // returns false if we are out of memory + return mm.tryGrow(st, requiredMin, st.options.overflowAreaEnabled) } func (mm *metaManager) tryGrow( @@ -576,7 +596,7 @@ func (a *dataAllocator) Free(st *txAllocState, id PageID) { traceln("free page:", id) if id < 2 || id >= a.data.endMarker { - panic(errOutOfBounds) + panic(fmt.Sprintf("freed page ID %v out of bounds", id)) } if !st.data.new.Has(id) { @@ -713,7 +733,7 @@ func (s *txAllocArea) Updated() bool { // allocator state 
(de-)serialization // ---------------------------------- -func readAllocatorState(a *allocator, f *File, meta *metaPage, opts Options) error { +func readAllocatorState(a *allocator, f *File, meta *metaPage, opts Options) reason { if a.maxSize > 0 { a.maxPages = a.maxSize / a.pageSize } diff --git a/errkind_string.go b/errkind_string.go new file mode 100644 index 0000000..ccf308a --- /dev/null +++ b/errkind_string.go @@ -0,0 +1,16 @@ +// Code generated by "stringer -type=ErrKind -linecomment=true"; DO NOT EDIT. + +package txfile + +import "strconv" + +const _ErrKind_name = "internal errorcan not create filefailed to initialize from fileconfiguration errorinvalid file sizemeta page invalidinvalid operationpage id out of boundsinvalid parameterout of memorytransaction failed during committransaction failed during rollbacktransaction failedfinished transactionreadonly transactionunknown error kind" + +var _ErrKind_index = [...]uint16{0, 14, 33, 63, 82, 99, 116, 133, 154, 171, 184, 216, 250, 268, 288, 308, 326} + +func (i ErrKind) String() string { + if i < 0 || i >= ErrKind(len(_ErrKind_index)-1) { + return "ErrKind(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _ErrKind_name[_ErrKind_index[i]:_ErrKind_index[i+1]] +} diff --git a/errors.go b/errors.go index 6ade797..5bdc083 100644 --- a/errors.go +++ b/errors.go @@ -17,36 +17,239 @@ package txfile -import "errors" +import ( + "fmt" -var ( - // settings errors - errReadOnlyUpdateSize = errors.New("can not update the file size in read only mode") + "github.com/elastic/go-txfile/internal/strbld" + "github.com/elastic/go-txfile/internal/vfs" + "github.com/elastic/go-txfile/txerr" +) + +// reason is used as package internal error type. It's used to guarantee all +// package level errors generated or returned by txfile are compatible to txerr.Error. +type reason interface { + txerr.Error +} + +// Error is the actual error type returned by all functions/methods within the +// txfile package. 
+// The Error is compatible to error and txerr.Error, but adds a few additional +// meta-data for applications to report and handle errors. +// Each single field in Error is optional. Fields can be accessed by methods only. +// As fields can be optional and Error can be used to wrap other errors +// as well, txerr should be used for inspecting errors. +type Error struct { + op string + kind error + cause error + + ctx errorCtx + msg string +} + +// errorCtx stores additional metadata associated with an error and its root cause. +// When adding an error cause, the context is merged, such that no two context +// variables with same contents will be reported twice. +type errorCtx struct { + // database filename. Empty string if error is not related to a file + file string + + // exact file offset an error was detected at + offset int64 + isOff bool // set if offset is valid + + // active transaction ID + txid uint + isTx bool // set if txid is valid + + // page number an error was detected for + page PageID + isPage bool // set if the page id is valid +} - // file meta page validation errors +var _ reason = &Error{} - errMagic = errors.New("invalid magic number") - errVersion = errors.New("invalid version number") - errChecksum = errors.New("checksum mismatch") +// Error formats the error message. The cause will not be included in the error +// string. Use fmt with %+v to create a formatted multiline error. +func (e *Error) Error() string { return txerr.Report(e, false) } - // file sizing errors +// Format adds support for fmt.Formatter to Error. +// The format patterns %v and %s print the top-level error message only +// (similar to `(*Error).Error()`). The format pattern "q" is similar to "%s", +// but adds double quotes before and after the message. +// Use %+v to create a multiline string containing the full trace of errors. 
+func (e *Error) Format(s fmt.State, c rune) { txerr.Format(e, s, c) } - errMmapTooLarge = errors.New("mmap too large") - errFileSizeTooLage = errors.New("max file size to large for this system") - errInvalidFileSize = errors.New("invalid file size") +// Op returns the operation the error occurred at. Returns "" if the error value +// is used to wrap another error. Better use `txerr.GetOp(err)` to query an error value for +// the causing operation. +func (e *Error) Op() string { return e.op } - // page access/allocation errors +// Kind returns the error kind of the error. The kind should be used by +// applications to check if it is possible to recover from an error condition. +// Kind returns nil if the error value does not define a kind. Better use +// `txerr.Is` or `txerr.GetKind` to query the error kind. +func (e *Error) Kind() error { return e.kind } - errOutOfBounds = errors.New("out of bounds page id") - errOutOfMemory = errors.New("out of memory") - errFreedPage = errors.New("trying to access an already freed page") - errPageFlushed = errors.New("page is already flushed") - errTooManyBytes = errors.New("contents exceeds page size") - errNoPageData = errors.New("accessing page without contents") - errFreeDirtyPage = errors.New("freeing dirty page") +// Context returns a formatted string of the related meta-data as key/value +// pairs. +func (e *Error) Context() string { return e.ctx.String() } + +// Message returns the user-focused error message. +func (e *Error) Message() string { return e.msg } + +// Cause returns the causing error, if any. +func (e *Error) Cause() error { return e.cause } + +// Errors is similar to `Cause()`, but returns a slice of errors. This way the +// error value can be consumed and formatted by zap (and probably other +// loggers). +func (e *Error) Errors() []error { + if e.cause == nil { + return nil + } + return []error{e.cause} +} + +// ErrKind defines txfile error kinds (codes). 
ErrKind is compatible to error, so it can be used with `txerr.Is()`. +type ErrKind int + +// internal txfile error kinds + +//go:generate stringer -type=ErrKind -linecomment=true + +const ( + NoError ErrKind = iota // no error + InternalError // internal error + FileCreationFailed // can not create file + InitFailed // failed to initialize from file + InvalidConfig // configuration error + InvalidFileSize // invalid file size + InvalidMetaPage // meta page invalid + InvalidOp // invalid operation + InvalidPageID // page id out of bounds + InvalidParam // invalid parameter + OutOfMemory // out of memory + TxCommitFail // transaction failed during commit + TxRollbackFail // transaction failed during rollback + TxFailed // transaction failed + TxFinished // finished transaction + TxReadOnly // readonly transaction + endOfErrKind // unknown error kind +) - // transaction errors +// re-export file system error kinds (from internal/vfs) - errTxFinished = errors.New("transaction has already been closed") - errTxReadonly = errors.New("readonly transaction") +const ( + PermissionError = vfs.ErrPermission + FileExists = vfs.ErrExist + FileDoesNotExist = vfs.ErrNotExist + FileClosed = vfs.ErrClosed + NoDiskSpace = vfs.ErrNoSpace + FDLimit = vfs.ErrFDLimit + CantResolvePath = vfs.ErrResolvePath + IOError = vfs.ErrIO + OSOtherError = vfs.ErrOSOther + OperationNotSupported = vfs.ErrNotSupported + LockFailed = vfs.ErrLockFailed ) + +// Error returns a user readable error message. +func (k ErrKind) Error() string { + if k > endOfErrKind { + k = endOfErrKind + } + return k.String() +} + +func (e *Error) of(kind ErrKind) *Error { e.kind = kind; return e } + +func (e *Error) report(m string) *Error { e.msg = m; return e } +func (e *Error) reportf(m string, vs ...interface{}) *Error { return e.report(fmt.Sprintf(m, vs...)) } + +// causedBy adds a cause to e and returns the modified e itself. 
+// The error contexts are merged (duplicates are removed from the cause), if +// the cause is `*Error`. +func (e *Error) causedBy(cause error) *Error { + e.cause = cause + other, ok := cause.(*Error) + if !ok { + return e + } + + errCtx := &e.ctx + causeCtx := &other.ctx + if errCtx.file == causeCtx.file { + causeCtx.file = "" + } + if errCtx.isTx && causeCtx.isTx && errCtx.txid == causeCtx.txid { + causeCtx.isTx = false // delete common tx id from cause context + } + if errCtx.isPage && causeCtx.isPage && errCtx.page == causeCtx.page { + causeCtx.isPage = false // delete common page id from cause context + } + if errCtx.isOff && causeCtx.isOff && errCtx.offset == causeCtx.offset { + causeCtx.isOff = false // delete common offset from cause context + } + + return e +} + +func (ctx *errorCtx) String() string { + buf := &strbld.Builder{} + if ctx.file != "" { + buf.Fmt("file='%s'", ctx.file) + } + if ctx.isTx { + buf.Pad(" ") + buf.Fmt("tx=%v", ctx.txid) + } + if ctx.isPage { + buf.Pad(" ") + buf.Fmt("page=%v", ctx.page) + } + if ctx.isOff { + buf.Pad(" ") + buf.Fmt("offset=%v", ctx.offset) + } + return buf.String() +} + +func (ctx *errorCtx) SetPage(id PageID) { + ctx.isPage, ctx.page = true, id +} + +func (ctx *errorCtx) SetOffset(off int64) { + ctx.isOff, ctx.offset = true, off +} + +func errOp(op string) *Error { + return &Error{op: op} +} + +func errOf(kind ErrKind) *Error { + return &Error{kind: kind} +} + +func wrapErr(err error) *Error { + return &Error{cause: err} +} + +func raiseInvalidParam(msg string) reason { + return &Error{kind: InvalidParam, msg: msg} +} + +func raiseInvalidParamf(msg string, vs ...interface{}) reason { + return raiseInvalidParam(fmt.Sprintf(msg, vs...)) +} + +func raiseOutOfBounds(id PageID) reason { + return &Error{ + kind: InvalidPageID, + ctx: errorCtx{ + isPage: true, + page: id, + }, + msg: "out of bounds page id", + } +} diff --git a/file.go b/file.go index f1e5c8c..3569d68 100644 --- a/file.go +++ b/file.go @@ -25,6 +25,7 
@@ import ( "sync" "unsafe" + "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/go-txfile/internal/cleanup" "github.com/elastic/go-txfile/internal/invariant" "github.com/elastic/go-txfile/internal/vfs" @@ -36,6 +37,7 @@ import ( // from within active transactions. type File struct { path string + readonly bool file vfs.File size int64 // real file size locks lock @@ -50,6 +52,8 @@ type File struct { // meta pages meta [2]*metaPage metaActive int + + txids atomic.Uint } // internal contants @@ -67,13 +71,15 @@ const ( // error if file access fails, file can not be locked or file meta pages are // found to be invalid. func Open(path string, mode os.FileMode, opts Options) (*File, error) { + const op = "txfile/open" + if err := opts.Validate(); err != nil { - return nil, err + return nil, fileErrWrap(op, path, err) } file, err := osfs.Open(path, mode) if err != nil { - return nil, err + return nil, fileErrWrap(op, path, err).report("can not open file") } initOK := false @@ -86,7 +92,7 @@ func Open(path string, mode os.FileMode, opts Options) (*File, error) { f, err = openWith(file, opts) } if err != nil { - return nil, err + return nil, fileErrWrap(op, path, err).report("failed to open file") } initOK = true @@ -98,10 +104,10 @@ func Open(path string, mode os.FileMode, opts Options) (*File, error) { // openWith implements the actual opening sequence, including file // initialization and validation. 
-func openWith(file vfs.File, opts Options) (*File, error) { - sz, err := file.Size() - if err != nil { - return nil, err +func openWith(file vfs.File, opts Options) (*File, reason) { + sz, ferr := file.Size() + if ferr != nil { + return nil, wrapErr(ferr) } isNew := false @@ -127,7 +133,7 @@ func openWith(file vfs.File, opts Options) (*File, error) { } if maxSize > uint64(maxUint) { - return nil, errFileSizeTooLage + return nil, raiseInvalidParam("max file size to large for this system") } f, err := newFile(file, opts, metaActive, uint(maxSize), uint(pageSize)) @@ -137,7 +143,7 @@ func openWith(file vfs.File, opts Options) (*File, error) { // Update the files MaxSize after the new file object has been created. // This allows us to handle the max size update like a transaction. - if (!isNew && opts.Flags.Check(FlagUpdMaxSize)) && opts.MaxSize != maxSize { + if (!isNew && opts.Flags.check(FlagUpdMaxSize)) && opts.MaxSize != maxSize { ok := false defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close)) @@ -164,7 +170,7 @@ func newFile( opts Options, metaActive int, maxSize, pageSize uint, -) (*File, error) { +) (*File, reason) { f := &File{ file: file, @@ -180,7 +186,7 @@ func newFile( return nil, err } initOK := false - defer cleanup.IfNot(&initOK, cleanup.IgnoreError(f.munmap)) + defer cleanup.IfNot(&initOK, ignoreReason(f.munmap)) if err := f.init(metaActive, opts); err != nil { return nil, err @@ -202,16 +208,22 @@ func newFile( } // init initializes the File state from most recent valid meta-page. 
-func (f *File) init(metaActive int, opts Options) error { +func (f *File) init(metaActive int, opts Options) reason { + const op = "txfile/init-read-state" + // reference active meta page for initializing internal structures f.metaActive = metaActive meta := f.meta[f.metaActive] if err := readWALMapping(&f.wal, f.mmapedPage, meta.wal.Get()); err != nil { - return err + return f.errWrap(op, err).of(InitFailed) } - return readAllocatorState(&f.allocator, f, meta, opts) + if err := readAllocatorState(&f.allocator, f, meta, opts); err != nil { + return f.errWrap(op, err).of(InitFailed) + } + + return nil } // Close closes the file, after all transactions have been quit. After closing @@ -251,29 +263,45 @@ func (f *File) Close() error { return errClose } +// Readonly returns true if the file has been opened in readonly mode. +func (f *File) Readonly() bool { + return f.readonly +} + // Begin creates a new read-write transaction. The transaction returned // does hold the Reserved Lock on the file. Use Close, Rollback, or Commit to // release the lock. -func (f *File) Begin() *Tx { +func (f *File) Begin() (*Tx, error) { return f.BeginWith(TxOptions{Readonly: false}) } // BeginReadonly creates a new readonly transaction. The transaction returned // does hold the Shared Lock on the file. Use Close() to release the lock. -func (f *File) BeginReadonly() *Tx { +func (f *File) BeginReadonly() (*Tx, error) { return f.BeginWith(TxOptions{Readonly: true}) } // BeginWith creates a new readonly or read-write transaction, with additional // transaction settings. 
-func (f *File) BeginWith(settings TxOptions) *Tx { +func (f *File) BeginWith(settings TxOptions) (*Tx, error) { + return f.beginTx(settings) +} + +func (f *File) beginTx(settings TxOptions) (*Tx, reason) { + const op = "txfile/begin-tx" + + if f.readonly && !settings.Readonly { + msg := "can not start writable transaction on readonly file" + return nil, f.err(op).of(InvalidOp).report(msg) + } + tracef("request new transaction (readonly: %v)\n", settings.Readonly) lock := f.locks.TxLock(settings.Readonly) lock.Lock() tracef("init new transaction (readonly: %v)\n", settings.Readonly) - tx := newTx(f, lock, settings) + tx := newTx(f, f.txids.Inc(), lock, settings) tracef("begin transaction: %p (readonly: %v)\n", tx, settings.Readonly) - return tx + return tx, nil } // PageSize returns the files page size in bytes @@ -305,28 +333,33 @@ func (f *File) SplitOffset(offset uintptr) (PageID, uintptr) { // filesystem not properly truncating mmapped files. // Due to the memory mapping being updated while truncating, all file locks // must be held, ensuring no other transaction can read from the file. -func (f *File) truncate(sz int64) error { +func (f *File) truncate(sz int64) reason { + const op = "txfile/truncate" + const errMsg = "can not update file size" + isMMapped := f.mapped != nil if isMMapped { if err := f.munmap(); err != nil { - return err + return f.errWrap(op, err).report(errMsg) } } if err := f.file.Truncate(sz); err != nil { - return err + return f.errWrap(op, err).report(errMsg) } if isMMapped { - return f.mmap() + if err := f.mmap(); err != nil { + return f.errWrap(op, err).report(errMsg) + } } return nil } // mmapUpdate updates the mmaped states. // A go-routine updating the mmaped aread, must hold all locks on the file. 
-func (f *File) mmapUpdate() (err error) { +func (f *File) mmapUpdate() (err reason) { if err = f.munmap(); err == nil { err = f.mmap() } @@ -334,14 +367,18 @@ } // mmap maps the files contents and updates internal pointers into the mmaped memory area. -func (f *File) mmap() error { +func (f *File) mmap() reason { + const op = "txfile/mmap" + // update real file size - fileSize, err := f.file.Size() - if err != nil { - return err + fileSize, fileErr := f.file.Size() + if fileErr != nil { + const msg = "unable to determine file size for mmap region" + return f.errWrap(op, fileErr).report(msg) } if fileSize < 0 { - return errInvalidFileSize + msg := fmt.Sprintf("file size %v < 0", fileSize) + return f.err(op).of(InvalidFileSize).report(msg) } f.size = fileSize @@ -356,9 +393,9 @@ } // map file - buf, err := f.file.MMap(int(sz)) - if err != nil { - return err + buf, fileErr := f.file.MMap(int(sz)) + if fileErr != nil { + return f.errWrap(op, fileErr).report("can not mmap file") } f.mapped = buf @@ -369,10 +406,14 @@ } // munmap unmaps the file and sets internal mapping to nil. -func (f *File) munmap() error { +func (f *File) munmap() reason { + const op = "txfile/munmap" err := f.file.MUnmap(f.mapped) f.mapped = nil - return err + if err != nil { + return f.errWrap(op, err) + } + return nil } // mmapedPage finds the mmaped page contents by the given pageID. @@ -389,17 +430,20 @@ func (f *File) mmapedPage(id PageID) []byte { } // initNewFile initializes a new, yet empty Files metapages. 
+ report("unable to preallocate file") } } maxSize := opts.MaxSize - if opts.Flags.Check(FlagUnboundMaxSize) { + if opts.Flags.check(FlagUnboundMaxSize) { maxSize = 0 } @@ -411,10 +455,12 @@ func initNewFile(file vfs.File, opts Options) error { } } if !isPowerOf2(uint64(pageSize)) { - return fmt.Errorf("pageSize %v is no power of 2", pageSize) + cause := raiseInvalidParamf("pageSize %v is not power of 2", pageSize) + return fileErrWrap(op, file.Name(), cause).of(FileCreationFailed) } if pageSize < minPageSize { - return fmt.Errorf("pageSize must be > %v", minPageSize) + cause := raiseInvalidParamf("pageSize must be >= %v", minPageSize) + return fileErrWrap(op, file.Name(), cause).of(FileCreationFailed) } // create buffer to hold contents for the initial pages: @@ -462,24 +508,27 @@ func initNewFile(file vfs.File, opts Options) error { } // write initial pages to disk - err := writeAt(file, buf[:int(pageSize)*requiredPages], 0) + err := writeAt(op, file, buf[:int(pageSize)*requiredPages], 0) if err == nil { - err = file.Sync(vfs.SyncAll) + if syncErr := file.Sync(vfs.SyncAll); syncErr != nil { + err = fileErrWrap(op, file.Name(), syncErr) + } } if err != nil { - return fmt.Errorf("initializing data file failed with %v", err) + return fileErrWrap(op, file.Name(), err).of(FileCreationFailed). + report("io error while initializing data file") } return nil } // readValidMeta tries to read a valid meta page from the file. // The first valid meta page encountered is returned. 
-func readValidMeta(f vfs.File) (metaPage, int, error) { +func readValidMeta(f vfs.File) (metaPage, int, reason) { var pages [2]metaPage - var metaErr [2]error + var metaErr [2]reason var metaActive int - var err error + var err reason pages[0], err = readMeta(f, 0) if err != nil { @@ -518,17 +567,24 @@ func readValidMeta(f vfs.File) (metaPage, int, error) { return pages[metaActive], metaActive, nil } -func readMeta(f vfs.File, off int64) (metaPage, error) { +func readMeta(f vfs.File, off int64) (metaPage, reason) { + const op = "txfile/read-file-meta" + var buf [unsafe.Sizeof(metaPage{})]byte _, err := f.ReadAt(buf[:], off) - return *castMetaPage(buf[:]), err + if err != nil { + reason := fileErrWrap(op, f.Name(), err) + reason.ctx.SetOffset(off) + return metaPage{}, reason.report("failed to read file header page") + } + return *castMetaPage(buf[:]), nil } // computeMmapSize determines the page count in multiple of pages. // Up to 1GB, the mmaped file area is double (starting at 64KB) on every grows. // That is, exponential grows with values of 64KB, 128KB, 512KB, 1024KB, and so on. // Once 1GB is reached, the mmaped area is always a multiple of 1GB. 
-func computeMmapSize(minSize, maxSize, pageSize uint) (uint, error) { +func computeMmapSize(minSize, maxSize, pageSize uint) (uint, reason) { var maxMapSize uint if math.MaxUint32 == maxUint { maxMapSize = 2 * sz1GB @@ -547,7 +603,7 @@ func computeMmapSize(minSize, maxSize, pageSize uint) (uint, error) { sz := ((maxSize + pageSize - 1) / pageSize) * pageSize if sz < initSize { - return 0, fmt.Errorf("max size of %v bytes is too small", maxSize) + return 0, raiseInvalidParamf("max size of %v bytes is too small", maxSize) } return sz, nil @@ -566,7 +622,7 @@ func computeMmapSize(minSize, maxSize, pageSize uint) (uint, error) { // allocate number of 1GB blocks to fulfill minSize sz := ((minSize + (sz1GB - 1)) / sz1GB) * sz1GB if sz > maxMapSize { - return 0, errMmapTooLarge + return 0, raiseInvalidParamf("mmap size of %v bytes is too large", sz) } // ensure we have a multiple of pageSize @@ -583,10 +639,20 @@ func (f *File) getMetaPage() *metaPage { // growFile executes a write transaction, growing the files max size setting. // If opts.Preallocate is set, the file will be truncated to the new file size on success. -func growFile(f *File, opts Options) error { +func growFile(f *File, opts Options) reason { + const op = "txfile/grow" + + err := doGrowFile(f, opts) + if err != nil { + return fileErrWrap(op, f.path, err).report("failed to increase file size") + } + return nil +} + +func doGrowFile(f *File, opts Options) reason { maxPages, maxSize, err := initTxMaxSize(f, opts.MaxSize) if err != nil { - return fmt.Errorf("growing max size transaction failed with %v", err) + return err } // Transaction completed. Update file allocator limits @@ -596,10 +662,10 @@ func growFile(f *File, opts Options) error { // Allocate space on disk if prealloc is enabled and new file size is bounded. 
if opts.Prealloc && maxSize > 0 { if err := f.truncate(int64(maxSize)); err != nil { - return fmt.Errorf("allocating space on disk failed with %v", err) + return err } if err := f.mmapUpdate(); err != nil { - return fmt.Errorf("failed to mmap file %v", err) + return wrapErr(err) } } @@ -612,11 +678,14 @@ func growFile(f *File, opts Options) error { // allowed to fail. Excessive pages are freed in future transactions anyways. // The file is not truncated yet, as the last 2 transaction must agree on the // actual file size before truncating. Truncation is postponed to later transactions. -func shrinkFile(f *File, opts Options) error { +func shrinkFile(f *File, opts Options) reason { + const op = "txfile/shrink" + // 1. Start transaction updating the file meta header only. This transaction must succeed. maxPages, maxSize, err := initTxMaxSize(f, opts.MaxSize) if err != nil { - return fmt.Errorf("shrinking max size transaction failed with %v", err) + return fileErrWrap(op, f.path, err). + report("failed to reduce the maximum file size") } // 2. Transaction completed. Update file allocator limits @@ -646,9 +715,11 @@ func shrinkFile(f *File, opts Options) error { func initTxMaxSize( f *File, newMaxSize uint64, -) (maxPages, maxSize uint, err error) { +) (maxPages, maxSize uint, err reason) { + const op = "txfile/tx-update-maxsize" + var metaID int - err = withInitTx(f, func(tx *Tx) error { + err = withInitTx(f, func(tx *Tx) reason { // create new meta header for new ongoing write transaction newMetaBuf := tx.prepareMetaBuffer() newMeta := newMetaBuf.cast() @@ -662,7 +733,13 @@ func initTxMaxSize( // sync new transaction state to disk metaID = tx.syncNewMeta(&newMetaBuf) - return tx.writeSync.Wait() + err := tx.writeSync.Wait() + if err != nil { + return f.errWrap(op, err).of(TxFailed). 
+ report("failed to update the on disk max size header entry") + } + + return nil }) if err == nil { @@ -680,7 +757,7 @@ func initTxMaxSize( // initTxReleaseRegions should only be called if it's clear pages can be // removed. Otherwise an 'empty' transaction func initTxReleaseRegions(f *File) { - withInitTx(f, func(tx *Tx) error { + withInitTx(f, func(tx *Tx) reason { // Init new allocator commit state, returning current meta pages into the // freelist. var csAlloc allocCommitState @@ -707,7 +784,7 @@ func initTxReleaseRegions(f *File) { // Finalize on-disk transaction. if err := tx.writeSync.Wait(); err != nil { - return err + return wrapErr(err) } // Commit allocator changes to in-memory allocator. @@ -720,12 +797,16 @@ func initTxReleaseRegions(f *File) { }) } -func withInitTx(f *File, fn func(tx *Tx) error) error { - tx := f.Begin() +func withInitTx(f *File, fn func(tx *Tx) reason) reason { + tx, err := f.beginTx(TxOptions{Readonly: false}) + if err != nil { + return err.(reason) + } + defer tx.close() commitOK := false - defer cleanup.IfNot(&commitOK, cleanup.IgnoreError(tx.rollbackChanges)) + defer cleanup.IfNot(&commitOK, tx.rollbackChanges) // use write transactions commit locks. As file if being generated, the // locks are not really required, yet. But better execute a correct transaction @@ -742,7 +823,29 @@ func withInitTx(f *File, fn func(tx *Tx) error) error { // we have to return early on error. On success, this is basically a no-op. 
defer tx.writeSync.Wait() - err := fn(tx) + err = fn(tx) commitOK = err == nil return err } + +func (f *File) err(op string) *Error { + return fileErr(op, f.path) +} + +func (f *File) errWrap(op string, cause error) *Error { + return fileErrWrap(op, f.path, cause) +} + +func (f *File) errCtx() errorCtx { return fileErrCtx(f.path) } + +func fileErr(op, path string) *Error { + return &Error{op: op, ctx: fileErrCtx(path)} +} + +func fileErrWrap(op, path string, cause error) *Error { + return fileErr(op, path).causedBy(cause) +} + +func fileErrCtx(path string) errorCtx { + return errorCtx{file: path} +} diff --git a/file_other.go b/file_other.go index f91131e..8d304b2 100644 --- a/file_other.go +++ b/file_other.go @@ -21,6 +21,6 @@ package txfile // computePlatformMmapSize computes the maximum amount of bytes to be mmaped, // depending on the actual file size and the configured maximum file size. -func computePlatformMmapSize(fileSize, maxSize, pageSize uint) (uint, error) { +func computePlatformMmapSize(fileSize, maxSize, pageSize uint) (uint, reason) { return computeMmapSize(fileSize, maxSize, pageSize) } diff --git a/file_test.go b/file_test.go index 026fd39..3924f1a 100644 --- a/file_test.go +++ b/file_test.go @@ -1059,6 +1059,24 @@ func (f *testFile) checkConsistency() bool { return ok } +func (f *testFile) BeginWith(opts TxOptions) *Tx { + tx, err := f.File.BeginWith(opts) + f.assert.FatalOnError(err) + return tx +} + +func (f *testFile) Begin() *Tx { + tx, err := f.File.Begin() + f.assert.FatalOnError(err) + return tx +} + +func (f *testFile) BeginReadonly() *Tx { + tx, err := f.File.BeginReadonly() + f.assert.FatalOnError(err) + return tx +} + func (f *testFile) withTx(write bool, fn func(tx *Tx)) { tx := f.BeginWith(TxOptions{Readonly: !write}) defer func() { diff --git a/file_windows.go b/file_windows.go index 91d4d5d..5582938 100644 --- a/file_windows.go +++ b/file_windows.go @@ -20,7 +20,7 @@ package txfile // computePlatformMmapSize computes the maximum 
amount of bytes to be mmaped, // depending on the actual file size and the configured maximum file size. // On Windows, the size returned MUST NOT exceed the actual file size. -func computePlatformMmapSize(fileSize, maxSize, pageSize uint) (uint, error) { +func computePlatformMmapSize(fileSize, maxSize, pageSize uint) (uint, reason) { if maxSize == 0 { return fileSize, nil } diff --git a/freelist.go b/freelist.go index 8d98075..ba40c88 100644 --- a/freelist.go +++ b/freelist.go @@ -392,14 +392,21 @@ func readFreeList( access func(PageID) []byte, root PageID, fn func(bool, region), -) (idList, error) { +) (idList, reason) { + const op = "txfile/read-freelist" + if root == 0 { return nil, nil } rootPage := access(root) if rootPage == nil { - return nil, errOutOfBounds + return nil, &Error{ + op: op, + kind: InvalidMetaPage, + cause: raiseOutOfBounds(root), + msg: "root page not in bounds", + } } var metaPages idList @@ -407,7 +414,12 @@ func readFreeList( metaPages.Add(pageID) node, payload := castFreePage(access(pageID)) if node == nil { - return nil, errOutOfBounds + return nil, &Error{ + op: op, + kind: InvalidMetaPage, + cause: raiseOutOfBounds(pageID), + msg: "invalid freelist node page", + } } pageID = node.next.Get() @@ -428,12 +440,12 @@ func writeFreeLists( to regionList, pageSize uint, metaList, dataList regionList, - onPage func(id PageID, buf []byte) error, -) error { + onPage func(id PageID, buf []byte) reason, +) reason { allocPages := to.PageIDs() writer := newPagingWriter(allocPages, pageSize, 0, onPage) - var writeErr error + var writeErr reason writeList := func(isMeta bool, lst regionList) { if writeErr != nil { return diff --git a/layout.go b/layout.go index 2cc8629..97dedd5 100644 --- a/layout.go +++ b/layout.go @@ -89,17 +89,16 @@ const magic uint32 = 0xBEA77AEB const version uint32 = 1 func init() { - checkPacked := func(t reflect.Type) error { + checkPacked := func(t reflect.Type) { off := uintptr(0) for i := 0; i < t.NumField(); i++ { f := 
t.Field(i)
 			if f.Offset != off {
-				return fmt.Errorf("field %v offset mismatch (expected=%v, actual=%v)",
-					f.Name, off, f.Offset)
+				panic(fmt.Sprintf("field %v offset mismatch (expected=%v, actual=%v)",
+					f.Name, off, f.Offset))
 			}
 			off += f.Type.Size()
 		}
-		return nil
 	}
 
 	// check compiler really generates packed structes. Required, so file can be
@@ -128,15 +127,15 @@ func (m *metaPage) Finalize() {
 	m.checksum.Set(m.computeChecksum())
 }
 
-func (m *metaPage) Validate() error {
+func (m *metaPage) Validate() reason {
 	if m.magic.Get() != magic {
-		return errMagic
+		return errOf(InvalidMetaPage).report("invalid magic number")
 	}
 	if m.version.Get() != version {
-		return errVersion
+		return errOf(InvalidMetaPage).report("invalid version number")
 	}
 	if m.checksum.Get() != m.computeChecksum() {
-		return errChecksum
+		return errOf(InvalidMetaPage).report("checksum mismatch")
 	}
 
 	return nil
diff --git a/opts.go b/opts.go
index 35f6558..0ffbbfe 100644
--- a/opts.go
+++ b/opts.go
@@ -17,8 +17,6 @@
 
 package txfile
 
-import "fmt"
-
 // Options provides common file options used when opening or creating a file.
 type Options struct {
 	// Additional flags.
@@ -69,13 +67,15 @@ const (
 
 // Validate checks if all fields in Options are consistent with the File implementation.
 func (o *Options) Validate() error {
-	if o.Flags.Check(FlagUpdMaxSize) {
+	if o.Flags.check(FlagUpdMaxSize) {
 		if o.Readonly {
-			return errReadOnlyUpdateSize
+			return errOf(InvalidConfig).
+				report("can not update max size in readonly mode")
 		}
 
-		if !o.Flags.Check(FlagUnboundMaxSize) && o.MaxSize > 0 && o.MaxSize < minRequiredFileSize {
-			return fmt.Errorf("max size must be at least %v bytes ", minRequiredFileSize)
+		if !o.Flags.check(FlagUnboundMaxSize) && o.MaxSize > 0 && o.MaxSize < minRequiredFileSize {
+			return errOf(InvalidConfig).
+ reportf("max size must be at least %v bytes ", minRequiredFileSize) } } @@ -84,14 +84,26 @@ func (o *Options) Validate() error { totalPages := o.MaxSize / uint64(o.PageSize) avail := totalPages - headerPages if uint64(metaSz) >= avail { - return fmt.Errorf("meta area of %v pages exceeds the available pages %v", - metaSz, avail) + return errOf(InvalidConfig). + reportf("meta area of %v pages exceeds the available pages %v", metaSz, avail) + } + } + + if o.PageSize != 0 { + if !isPowerOf2(uint64(o.PageSize)) { + return errOf(InvalidConfig). + reportf("pageSize %v is not power of 2", o.PageSize) + } + + if o.PageSize < minPageSize { + return errOf(InvalidConfig). + reportf("pageSize must be >= %v", minPageSize) } } return nil } -func (f Flag) Check(check Flag) bool { +func (f Flag) check(check Flag) bool { return (f & check) == check } diff --git a/page.go b/page.go index 14be499..9584772 100644 --- a/page.go +++ b/page.go @@ -65,10 +65,11 @@ func (p *Page) Dirty() bool { return p.flags.dirty } // MarkDirty marks a page as dirty. MarkDirty should only be used if // in-place modification to the pages buffer have been made, after use of Load(). func (p *Page) MarkDirty() error { - if err := p.canWrite(); err != nil { + const op = "txfile/page-mark-dirty" + + if err := p.canWrite(op); err != nil { return err } - p.flags.dirty = true return nil } @@ -76,11 +77,14 @@ func (p *Page) MarkDirty() error { // Free marks a page as free. Freeing a dirty page will return an error. // The page will be returned to the allocator when the transaction commits. 
func (p *Page) Free() error { - if err := p.canWrite(); err != nil { + const op = "txfile/page-free" + + if err := p.canWrite(op); err != nil { return err } if p.flags.dirty { - return errFreeDirtyPage + const msg = "freeing dirty pages is not allowed" + return &Error{op: op, kind: InvalidOp, ctx: p.errCtx(), msg: msg} } p.tx.freePage(p.id) @@ -100,21 +104,25 @@ func (p *Page) Free() error { // or the transaction is already been closed. // Use SetBytes() or Load(), to initialize the buffer of a newly allocated page. func (p *Page) Bytes() ([]byte, error) { - if err := p.canRead(); err != nil { + const op = "txfile/page-bytes" + + if err := p.canRead(op); err != nil { return nil, err } if p.bytes == nil && p.flags.new { - return nil, errNoPageData + const msg = "can not read contents of fresh allocated page without contents" + return nil, &Error{op: op, kind: InvalidOp, ctx: p.errCtx(), msg: msg} } - return p.getBytes() + return p.getBytes(op) } -func (p *Page) getBytes() ([]byte, error) { +func (p *Page) getBytes(op string) ([]byte, reason) { if p.bytes == nil { bytes := p.tx.access(p.ondiskID) if bytes == nil { - return nil, errOutOfBounds + cause := raiseOutOfBounds(p.ondiskID) + return nil, &Error{op: op, ctx: p.errCtx(), cause: cause} } p.bytes = bytes @@ -131,14 +139,16 @@ func (p *Page) getBytes() ([]byte, error) { // After load, the write-buffer can be accessed via Bytes(). After modifications to the buffer, // one must use MarkDirty(), so the page will be flushed on commit. 
func (p *Page) Load() error { - if err := p.canWrite(); err != nil { + const op = "txfile/page-load-writable" + + if err := p.canWrite(op); err != nil { return err } - return p.loadBytes() + return p.loadBytes(op) } -func (p *Page) loadBytes() error { +func (p *Page) loadBytes(op string) reason { if p.flags.cached { return nil } @@ -155,7 +165,7 @@ func (p *Page) loadBytes() error { } // copy original contents into writable buffer (page needs to be marked dirty if contents is overwritten) - orig, err := p.getBytes() + orig, err := p.getBytes(op) if err != nil { return err } @@ -172,17 +182,20 @@ func (p *Page) loadBytes() error { // of contents matches the page size, a reference to the contents buffer will // be held. To enforce a copy, use Load(), Bytes(), copy() and MarkDirty(). func (p *Page) SetBytes(contents []byte) error { - if err := p.canWrite(); err != nil { + const op = "txfile/page-set-bytes" + + if err := p.canWrite(op); err != nil { return err } pageSize := p.tx.PageSize() if len(contents) > pageSize { - return errTooManyBytes + const msg = "page contents must not exceed the page size" + return &Error{op: op, kind: InvalidParam, ctx: p.errCtx(), msg: msg} } if len(contents) < pageSize { - if err := p.loadBytes(); err != nil { + if err := p.loadBytes(op); err != nil { return err } copy(p.bytes, contents) @@ -199,14 +212,16 @@ func (p *Page) SetBytes(contents []byte) error { // is executed asynchronously in the background. // Dirty pages will be automatically flushed on commit. 
func (p *Page) Flush() error { - if err := p.canWrite(); err != nil { + const op = "txfile/page-flush" + + if err := p.canWrite(op); err != nil { return err } - return p.doFlush() + return p.doFlush(op) } -func (p *Page) doFlush() error { +func (p *Page) doFlush(op string) reason { if !p.flags.dirty || p.flags.flushed { return nil } @@ -215,7 +230,8 @@ func (p *Page) doFlush() error { if p.id == p.ondiskID { walID := p.tx.allocWALID(p.id) if walID == 0 { - return errOutOfMemory + const msg = "not enough space to allocate write ahead page" + return &Error{op: op, kind: OutOfMemory, ctx: p.errCtx(), msg: msg} } p.ondiskID = walID } else { @@ -230,26 +246,36 @@ func (p *Page) doFlush() error { return nil } -func (p *Page) canRead() error { - if !p.tx.Active() { - return errTxFinished - } - if p.flags.freed { - return errFreedPage +func (p *Page) canRead(op string) *Error { + err := p.tx.canRead(op) + if err != nil { + err.ctx = p.errCtx() } - return nil + return err } -func (p *Page) canWrite() error { - if err := p.tx.canWrite(); err != nil { +func (p *Page) canWrite(op string) *Error { + if err := p.tx.canWrite(op); err != nil { + err.ctx = p.errCtx() return err } - if p.flags.freed { - return errFreedPage + var msg string + switch { + case p.flags.freed: + msg = "page is already freed" + case p.flags.flushed: + msg = "page is already flushed" } - if p.flags.flushed { - return errPageFlushed + + if msg != "" { + return &Error{op: op, kind: InvalidOp, ctx: p.errCtx(), msg: msg} } return nil } + +func (p *Page) errCtx() errorCtx { + ctx := p.tx.errCtx() + ctx.page, ctx.isPage = p.id, true + return ctx +} diff --git a/pq/access.go b/pq/access.go index 2843a3f..8dd862b 100644 --- a/pq/access.go +++ b/pq/access.go @@ -47,7 +47,10 @@ func makeAccess(delegate Delegate) (access, error) { func (a *access) ReadRoot() ([SzRoot]byte, error) { var buf [SzRoot]byte - tx := a.BeginRead() + tx, err := a.BeginRead() + if err != nil { + return buf, err + } defer tx.Close() return 
buf, withPage(tx, a.rootID, func(page []byte) error { diff --git a/pq/ack.go b/pq/ack.go index 9a81fe6..e37a7c8 100644 --- a/pq/ack.go +++ b/pq/ack.go @@ -78,7 +78,10 @@ func (a *acker) handle(n uint) error { // start write transaction to free pages and update the next read offset in // the queue root - tx := a.accessor.BeginCleanup() + tx, err := a.accessor.BeginCleanup() + if err != nil { + return err + } defer tx.Close() traceln("acker: free data pages:", len(state.free)) @@ -124,7 +127,10 @@ func (a *acker) handle(n uint) error { // initACK uses a read-transaction to collect pages to be removed from list and // find offset of next read required to start reading the next un-acked event. func (a *acker) initACK(n uint) (ackState, error) { - tx := a.accessor.BeginRead() + tx, err := a.accessor.BeginRead() + if err != nil { + return ackState{}, err + } defer tx.Close() hdr, err := a.accessor.RootHdr(tx) @@ -288,7 +294,10 @@ func (a *acker) findNewStartPositions(c *txCursor, id uint64) (head, read positi // Active returns the total number of active, not yet ACKed events. func (a *acker) Active() (uint, error) { - tx := a.accessor.BeginRead() + tx, err := a.accessor.BeginRead() + if err != nil { + return 0, err + } defer tx.Close() hdr, err := a.accessor.RootHdr(tx) diff --git a/pq/delegate.go b/pq/delegate.go index fee240d..51b79ab 100644 --- a/pq/delegate.go +++ b/pq/delegate.go @@ -35,15 +35,15 @@ type Delegate interface { // BeginWrite must create a read-write transaction for use by the writer. // The transaction will be used to allocate pages and flush the current write // buffer. - BeginWrite() *txfile.Tx + BeginWrite() (*txfile.Tx, error) // BeginRead must return a readonly transaction. - BeginRead() *txfile.Tx + BeginRead() (*txfile.Tx, error) // BeginCleanup must return a read-write transaction for the ACK handling to // remove events. No new contents will be written, but pages will be freed // and the queue root page being updated. 
- BeginCleanup() *txfile.Tx + BeginCleanup() (*txfile.Tx, error) } // standaloneDelegate wraps a txfile.File into a standalone queue only file. @@ -56,7 +56,10 @@ type standaloneDelegate struct { // NewStandaloneDelegate creates a standaonle Delegate from an txfile.File // instance. This function will allocate and initialize the queue root page. func NewStandaloneDelegate(f *txfile.File) (Delegate, error) { - tx := f.Begin() + tx, err := f.Begin() + if err != nil { + return nil, err + } defer tx.Close() root := tx.Root() @@ -106,20 +109,20 @@ func (d *standaloneDelegate) SplitOffset(offset uintptr) (txfile.PageID, uintptr } // BeginWrite creates a new transaction for flushing the write buffers to disk. -func (d *standaloneDelegate) BeginWrite() *txfile.Tx { +func (d *standaloneDelegate) BeginWrite() (*txfile.Tx, error) { return d.file.BeginWith(txfile.TxOptions{ WALLimit: 3, }) } // BeginRead returns a readonly transaction. -func (d *standaloneDelegate) BeginRead() *txfile.Tx { +func (d *standaloneDelegate) BeginRead() (*txfile.Tx, error) { return d.file.BeginReadonly() } // BeginCleanup creates a new write transaction configured for cleaning up used // events/pages only. -func (d *standaloneDelegate) BeginCleanup() *txfile.Tx { +func (d *standaloneDelegate) BeginCleanup() (*txfile.Tx, error) { return d.file.BeginWith(txfile.TxOptions{ EnableOverflowArea: true, WALLimit: 3, diff --git a/pq/pq.go b/pq/pq.go index 5f6ff10..15919bc 100644 --- a/pq/pq.go +++ b/pq/pq.go @@ -134,7 +134,10 @@ func (q *Queue) Close() error { // Pending returns the total number of enqueued, but unacked events. 
 func (q *Queue) Pending() int {
-	tx := q.accessor.BeginRead()
+	tx, err := q.accessor.BeginRead()
+	if err != nil {
+		return -1
+	}
 	defer tx.Close()
 
 	hdr, err := q.accessor.RootHdr(tx)
diff --git a/pq/reader.go b/pq/reader.go
index e1d5b2a..046679e 100644
--- a/pq/reader.go
+++ b/pq/reader.go
@@ -62,11 +62,16 @@ func (r *Reader) Available() uint {
 		return 0
 	}
 
+	var err error
 	func() {
-		tx := r.accessor.BeginRead()
+		var tx *txfile.Tx
+		if tx, err = r.accessor.BeginRead(); err != nil { return }
 		defer tx.Close()
-		r.updateQueueState(tx)
+		err = r.updateQueueState(tx)
 	}()
+	if err != nil {
+		return 0
+	}
 
 	if r.state.cursor.Nil() {
 		return 0
@@ -77,11 +82,18 @@
 
 // Begin starts a new read transaction, shared between multiple read calls.
 // User must execute Done, to close the file transaction.
-func (r *Reader) Begin() {
+func (r *Reader) Begin() error {
 	if r.tx != nil {
 		r.tx.Close()
 	}
-	r.tx = r.accessor.BeginRead()
+
+	tx, err := r.accessor.BeginRead()
+	if err != nil {
+		return err
+	}
+
+	r.tx = tx
+	return nil
 }
 
 // Done closes the active read transaction.
@@ -115,7 +127,11 @@ func (r *Reader) Read(b []byte) (int, error) { func (r *Reader) readInto(to []byte) ([]byte, error) { tx := r.tx if tx == nil { - tx = r.accessor.BeginRead() + var err error + tx, err = r.accessor.BeginRead() + if err != nil { + return nil, err + } defer tx.Close() } @@ -164,7 +180,11 @@ func (r *Reader) Next() (int, error) { tx := r.tx if tx == nil { - tx = r.accessor.BeginRead() + var err error + tx, err = r.accessor.BeginRead() + if err != nil { + return -1, err + } defer tx.Close() } diff --git a/pq/util.go b/pq/util.go index 68b59d4..6b85d91 100644 --- a/pq/util.go +++ b/pq/util.go @@ -37,7 +37,10 @@ func withPage(tx *txfile.Tx, id txfile.PageID, fn func([]byte) error) error { } func readPageByID(accessor *access, pool *pagePool, id txfile.PageID) (*page, error) { - tx := accessor.BeginRead() + tx, err := accessor.BeginRead() + if err != nil { + return nil, err + } defer tx.Close() var page *page diff --git a/pq/writer.go b/pq/writer.go index f3427ec..82873e2 100644 --- a/pq/writer.go +++ b/pq/writer.go @@ -184,7 +184,10 @@ func (w *Writer) doFlush() error { } } - tx := w.accessor.BeginWrite() + tx, err := w.accessor.BeginWrite() + if err != nil { + return err + } defer tx.Close() rootPage, queueHdr, err := w.accessor.LoadRootPage(tx) diff --git a/region.go b/region.go index f1f620f..34ed55e 100644 --- a/region.go +++ b/region.go @@ -137,8 +137,8 @@ func (r region) EachPage(fn func(PageID)) { } } -func (l region) PageIDs() (ids idList) { - l.EachPage(ids.Add) +func (r region) PageIDs() (ids idList) { + r.EachPage(ids.Add) return } diff --git a/testing_test.go b/testing_test.go index a4514cd..95b784c 100644 --- a/testing_test.go +++ b/testing_test.go @@ -101,7 +101,7 @@ func makeCloseWait(timeout time.Duration) closeWaiter { func (w *closeWaiter) Add(n int) { w.wg.Add(n) } func (w *closeWaiter) Done() { w.wg.Done() } func (w *closeWaiter) Wait() bool { - err := waitFn(w.duration, func() error { + err := waitFn(w.duration, func() reason { 
w.wg.Wait() return nil }) @@ -128,7 +128,7 @@ func (p *testPageStore) Get(id PageID) []byte { return b } -func (p *testPageStore) Set(id PageID, b []byte) error { +func (p *testPageStore) Set(id PageID, b []byte) reason { if id < 2 { panic("must not overwrite file meta region") } @@ -136,7 +136,7 @@ func (p *testPageStore) Set(id PageID, b []byte) error { return nil } -func waitFn(timeout time.Duration, fn func() error) error { +func waitFn(timeout time.Duration, fn func() reason) error { ch := make(chan error) go func() { ch <- fn() diff --git a/tx.go b/tx.go index be8e050..3987572 100644 --- a/tx.go +++ b/tx.go @@ -18,7 +18,6 @@ package txfile import ( - "fmt" "sync" "github.com/elastic/go-txfile/internal/cleanup" @@ -29,8 +28,10 @@ import ( // A transaction MUST always be closed, so to guarantee locks being released as // well. type Tx struct { - flags txFlags - file *File + flags txFlags + file *File + txid uint // internal correlation id + lock sync.Locker writeSync *txWriteSync rootID PageID @@ -76,7 +77,7 @@ type txFlags struct { checkpoint bool // mark wal checkpoint has been applied } -func newTx(file *File, lock sync.Locker, settings TxOptions) *Tx { +func newTx(file *File, id uint, lock sync.Locker, settings TxOptions) *Tx { meta := file.getMetaPage() invariant.Check(meta != nil, "file meta is not set") @@ -152,15 +153,24 @@ func (tx *Tx) RootPage() (*Page, error) { if tx.rootID < 2 { return nil, nil } - return tx.Page(tx.rootID) + return tx.getPage("txfile/tx-access-root", tx.rootID) } // Rollback rolls back and closes the current transaction. Rollback returns an // error if the transaction has already been closed by Close, Rollback or // Commit. 
func (tx *Tx) Rollback() error { + const op = "txfile/tx-rollback" + tracef("rollback transaction: %p\n", tx) - return tx.finishWith(tx.rollbackChanges) + err := tx.finishWith(func() reason { + tx.rollbackChanges() + return nil + }) + if err != nil { + return tx.errWrap(op, err).of(TxRollbackFail) + } + return nil } // Commit commits the current transaction to file. The commit step needs to @@ -168,8 +178,14 @@ func (tx *Tx) Rollback() error { // Returns an error if the transaction has already been closed by Close, // Rollback or Commit. func (tx *Tx) Commit() error { + const op = "txfile/tx-commit" + tracef("commit transaction: %p\n", tx) - return tx.finishWith(tx.commitChanges) + err := tx.finishWith(tx.commitChanges) + if err != nil { + return tx.errWrap(op, err).of(TxCommitFail) + } + return nil } // Close closes the transaction, releasing any locks held by the transaction. @@ -191,11 +207,22 @@ func (tx *Tx) Commit() error { // return tx.Commit() // func (tx *Tx) Close() error { + const op = "txfile/tx-close" + tracef("close transaction: %p\n", tx) if !tx.flags.active { return nil } - return tx.finishWith(tx.rollbackChanges) + + err := tx.finishWith(func() reason { + tx.rollbackChanges() + return nil + }) + if err != nil { + return tx.errWrap(op, err).of(TxRollbackFail) + } + + return nil } // CheckpointWAL copies all overwrite pages contents into the original pages. @@ -207,15 +234,16 @@ func (tx *Tx) Close() error { // the end of a transaction, right before committing, so to reduce writes if // contents is to be overwritten anyways. 
 func (tx *Tx) CheckpointWAL() error {
-	if err := tx.canWrite(); err != nil {
+	if err := tx.canWrite("txfile/tx-checkpoint"); err != nil {
 		return err
 	}
-	return tx.doCheckpointWAL()
+	tx.doCheckpointWAL()
+	return nil
 }
 
-func (tx *Tx) doCheckpointWAL() error {
+func (tx *Tx) doCheckpointWAL() {
 	if tx.flags.checkpoint {
-		return nil
+		return
 	}
 
 	// collect page ids that would have an old WAL page
@@ -236,7 +264,7 @@
 	}
 
 	if len(ids) == 0 {
-		return nil
+		return
 	}
 
 	// XXX: Some OS/filesystems might lock up when writing to file
@@ -264,12 +292,11 @@
 	}
 
 	tx.flags.checkpoint = true
-	return nil
 }
 
-func (tx *Tx) finishWith(fn func() error) error {
+func (tx *Tx) finishWith(fn func() reason) reason {
 	if !tx.flags.active {
-		return errTxFinished
+		return errOf(TxFinished).report("transaction is already closed")
 	}
 	defer tx.close()
 
@@ -289,9 +316,9 @@
 	tx.lock.Unlock()
 }
 
-func (tx *Tx) commitChanges() error {
+func (tx *Tx) commitChanges() reason {
 	commitOK := false
-	defer cleanup.IfNot(&commitOK, cleanup.IgnoreError(tx.rollbackChanges))
+	defer cleanup.IfNot(&commitOK, tx.rollbackChanges)
 
 	err := tx.tryCommitChanges()
 	if commitOK = err == nil; !commitOK {
@@ -326,7 +353,9 @@
 // 7. fsync
 // 8. update internal structures
 // 9. release locks
-func (tx *Tx) tryCommitChanges() error {
+func (tx *Tx) tryCommitChanges() reason {
+	const op = "txfile/tx-commit"
+
 	pending, exclusive := tx.file.locks.Pending(), tx.file.locks.Exclusive()
 
 	newMetaBuf := tx.prepareMetaBuffer()
@@ -352,8 +381,8 @@
 	})
 
 	// Flush pages.
-	if err := tx.Flush(); err != nil {
-		return fmt.Errorf("dirty pages flushing failed with %v", err)
+	if err := tx.flushPages(op); err != nil {
+		return tx.errWrap(op, err).report("failed to flush dirty pages")
 	}
 
 	// 1.
finish Tx state updates and free file pages used to hold meta pages @@ -514,14 +543,12 @@ func (tx *Tx) syncNewMeta(buf *metaBuf) int { return metaID } -func (tx *Tx) commitPrepareWAL() (walCommitState, error) { +func (tx *Tx) commitPrepareWAL() (walCommitState, reason) { var st walCommitState tx.file.wal.fileCommitPrepare(&st, &tx.wal) if st.checkpoint { - if err := tx.doCheckpointWAL(); err != nil { - return st, err - } + tx.doCheckpointWAL() } if st.updated { @@ -534,7 +561,7 @@ func (tx *Tx) access(id PageID) []byte { return tx.file.mmapedPage(id) } -func (tx *Tx) scheduleWrite(id PageID, buf []byte) error { +func (tx *Tx) scheduleWrite(id PageID, buf []byte) reason { tx.file.writer.Schedule(tx.writeSync, id, buf) return nil } @@ -555,12 +582,12 @@ func (tx *Tx) scheduleWrite(id PageID, buf []byte) error { // => // - Truncate file only if pages in overflow area have been allocated. // - If maxSize == 0, truncate file to old end marker. -func (tx *Tx) rollbackChanges() error { +func (tx *Tx) rollbackChanges() { tx.file.allocator.Rollback(&tx.alloc) maxPages := tx.file.allocator.maxPages if maxPages == 0 { - return nil + return } // compute endmarker from before running the last transaction @@ -571,8 +598,9 @@ func (tx *Tx) rollbackChanges() error { sz, err := tx.file.file.Size() if err != nil { - // getting file size failed. State is valid, but we can not truncate :/ - return err + // getting file size failed. State is valid, but we can not truncate + // ¯\_(ツ)_/¯ + return } truncateSz := uint(endMarker) * tx.file.allocator.pageSize @@ -584,8 +612,6 @@ func (tx *Tx) rollbackChanges() error { traceln("rollback file truncate failed with:", err) } } - - return nil } // Page accesses a page by ID. Accessed pages are cached. Retrieving a page @@ -593,6 +619,11 @@ func (tx *Tx) rollbackChanges() error { // Returns an error if the id is known to be invalid or the page has already // been freed. 
func (tx *Tx) Page(id PageID) (*Page, error) { + const op = "txfile/tx-access-page" + return tx.getPage(op, id) +} + +func (tx *Tx) getPage(op string, id PageID) (*Page, error) { inBounds := id >= 2 if tx.flags.readonly { inBounds = inBounds && id < tx.dataEndID @@ -600,11 +631,12 @@ func (tx *Tx) Page(id PageID) (*Page, error) { inBounds = inBounds && id < tx.file.allocator.data.endMarker } if !inBounds { - return nil, errOutOfBounds + return nil, tx.errWrap(op, raiseOutOfBounds(id)) } if tx.alloc.data.freed.Has(id) || tx.alloc.meta.freed.Has(id) { - return nil, errFreedPage + return nil, tx.err(op).of(InvalidOp). + report("trying to access an already freed page") } if p := tx.pages[id]; p != nil { @@ -625,18 +657,22 @@ func (tx *Tx) Page(id PageID) (*Page, error) { // new contents. // Returns an error if the transaction is readonly or no more space is available. func (tx *Tx) Alloc() (page *Page, err error) { - if err := tx.canWrite(); err != nil { + const op = "txfile/tx-alloc-page" + + if err := tx.canWrite(op); err != nil { return nil, err } - err = tx.allocPagesWith(1, func(p *Page) { page = p }) + err = tx.allocPagesWith(op, 1, func(p *Page) { page = p }) return } // AllocN allocates n potentially non-contious, yet empty pages. // Returns an error if the transaction is readonly or no more space is available. 
func (tx *Tx) AllocN(n int) (pages []*Page, err error) { - if err := tx.canWrite(); err != nil { + const op = "txfile/tx-alloc-pages" + + if err := tx.canWrite(op); err != nil { return nil, err } @@ -645,7 +681,7 @@ func (tx *Tx) AllocN(n int) (pages []*Page, err error) { } pages, i := make([]*Page, n), 0 - err = tx.allocPagesWith(n, func(page *Page) { + err = tx.allocPagesWith(op, n, func(page *Page) { pages[i], i = page, i+1 }) if err != nil { @@ -666,7 +702,7 @@ func (tx *Tx) walAllocator() *walAllocator { return tx.file.allocator.WALPageAllocator() } -func (tx *Tx) allocPagesWith(n int, fn func(*Page)) error { +func (tx *Tx) allocPagesWith(op string, n int, fn func(*Page)) reason { count := tx.dataAllocator().AllocRegionsWith(&tx.alloc, uint(n), func(reg region) { reg.EachPage(func(id PageID) { page := newPage(tx, id) @@ -676,7 +712,7 @@ func (tx *Tx) allocPagesWith(n int, fn func(*Page)) error { }) }) if count == 0 { - return errOutOfMemory + return tx.err(op).of(OutOfMemory).reportf("not enough memory to allocate %v data page(s)", n) } return nil } @@ -700,24 +736,56 @@ func (tx *Tx) freeWALID(id, walID PageID) { // Flush flushes all dirty pages within the transaction. 
func (tx *Tx) Flush() error { - if err := tx.canWrite(); err != nil { + return tx.flushPages("txfile/tx-flush") +} + +func (tx *Tx) flushPages(op string) reason { + if err := tx.canWrite(op); err != nil { return err } for _, page := range tx.pages { - if err := page.doFlush(); err != nil { + if err := page.doFlush("txfile/page-flush"); err != nil { return err } } return nil } -func (tx *Tx) canWrite() error { +func (tx *Tx) canRead(op string) *Error { + if !tx.flags.active { + return tx.err(op).of(TxFinished).report("no read operation on finished transactions allowed") + } + return nil +} + +func (tx *Tx) canWrite(op string) *Error { + var kind ErrKind + var msg string + if !tx.flags.active { - return errTxFinished + kind, msg = TxFinished, "no write operation on finished transactions allowed" } if tx.flags.readonly { - return errTxReadonly + kind, msg = TxReadOnly, "no write operation on read only transaction allowed" + } + + if kind != NoError { + return tx.err(op).of(kind).report(msg) } return nil } + +func (tx *Tx) err(op string) *Error { + return &Error{op: op, ctx: tx.errCtx()} +} + +func (tx *Tx) errWrap(op string, cause error) *Error { + return tx.err(op).causedBy(cause) +} + +func (tx *Tx) errCtx() errorCtx { + ctx := tx.file.errCtx() + ctx.txid, ctx.isTx = tx.txid, true + return ctx +} diff --git a/util.go b/util.go index 89a278d..372990a 100644 --- a/util.go +++ b/util.go @@ -29,7 +29,7 @@ type pagingWriter struct { pageSize uint extraHeader uint - onPage func(id PageID, buf []byte) error + onPage func(id PageID, buf []byte) reason // current page state i int @@ -46,7 +46,7 @@ func newPagingWriter( ids idList, pageSize uint, extraHeader uint, - onPage func(id PageID, buf []byte) error, + onPage func(id PageID, buf []byte) reason, ) *pagingWriter { if len(ids) == 0 { return nil @@ -69,17 +69,19 @@ func newPagingWriter( extraHeader: extraHeader, onPage: onPage, } - w.prepareNext() + w.prepareNext("") return w } -func (w *pagingWriter) Write(entry []byte) 
error { +func (w *pagingWriter) Write(entry []byte) reason { + const op = "txfile/write-meta-list" + if w == nil { return nil } if len(w.payload) < len(entry) { - if err := w.flushCurrent(); err != nil { + if err := w.flushCurrent(op); err != nil { return err } } @@ -90,7 +92,9 @@ func (w *pagingWriter) Write(entry []byte) error { return nil } -func (w *pagingWriter) Flush() error { +func (w *pagingWriter) Flush() reason { + const op = "txfile/flush-meta-list" + if w == nil { return nil } @@ -101,10 +105,7 @@ func (w *pagingWriter) Flush() error { for w.i < len(w.ids) { // update to next page - if err := w.prepareNext(); err != nil { - return err - } - + w.prepareNext(op) if err := w.finalizePage(); err != nil { return err } @@ -113,14 +114,14 @@ func (w *pagingWriter) Flush() error { return nil } -func (w *pagingWriter) flushCurrent() (err error) { +func (w *pagingWriter) flushCurrent(op string) (err reason) { if err = w.finalizePage(); err == nil { - err = w.prepareNext() + err = w.prepareNext(op) } return } -func (w *pagingWriter) finalizePage() error { +func (w *pagingWriter) finalizePage() reason { w.hdr.count.Set(w.count) if w.onPage != nil { if err := w.onPage(w.ids[w.i], w.page); err != nil { @@ -134,10 +135,11 @@ func (w *pagingWriter) finalizePage() error { return nil } -func (w *pagingWriter) prepareNext() error { +func (w *pagingWriter) prepareNext(op string) reason { if w.i >= len(w.ids) { - return errOutOfMemory + return errOp(op).of(InternalError).report("Not enough pages pre-allocated") } + w.page = w.buf[w.off : w.off+w.pageSize] w.hdr, w.payload = castListPage(w.page) w.payload = w.payload[w.extraHeader:] @@ -156,3 +158,7 @@ func nextPowerOf2(u uint64) uint64 { b := uint64(bits.LeadingZeros64(u)) return uint64(1) << (64 - b) } + +func ignoreReason(fn func() reason) func() { + return func() { _ = fn() } +} diff --git a/util_test.go b/util_test.go index d2c4ab0..2cae8db 100644 --- a/util_test.go +++ b/util_test.go @@ -63,8 +63,8 @@ func 
TestInternalHelpers(t *testing.T) { func TestPagingWriter(t *testing.T) { assert := newAssertions(t) - recordPages := func(ids *idList, pages *[][]byte) func(PageID, []byte) error { - return func(id PageID, buf []byte) error { + recordPages := func(ids *idList, pages *[][]byte) func(PageID, []byte) reason { + return func(id PageID, buf []byte) reason { *ids, *pages = append(*ids, id), append(*pages, buf) return nil } diff --git a/wal.go b/wal.go index fc76260..74cf535 100644 --- a/wal.go +++ b/wal.go @@ -17,7 +17,9 @@ package txfile -import "unsafe" +import ( + "unsafe" +) // waLog (write-ahead-log) mapping page ids to overwrite page ids in // the write-ahead-log. @@ -86,7 +88,9 @@ func (l *waLog) fileCommitPrepare(st *walCommitState, tx *txWalState) { st.mapping = newWal } -func (l *waLog) fileCommitAlloc(tx *Tx, st *walCommitState) error { +func (l *waLog) fileCommitAlloc(tx *Tx, st *walCommitState) reason { + const op = "txfile/commit-alloc-wal" + if !st.updated { return nil } @@ -95,7 +99,8 @@ func (l *waLog) fileCommitAlloc(tx *Tx, st *walCommitState) error { if pages > 0 { st.allocRegions = tx.metaAllocator().AllocRegions(&tx.alloc, pages) if st.allocRegions == nil { - return errOutOfMemory + return errOp(op).of(OutOfMemory). 
+ report("not enough space to allocate write ahead meta pages") } } return nil @@ -104,8 +109,8 @@ func (l *waLog) fileCommitAlloc(tx *Tx, st *walCommitState) error { func (l *waLog) fileCommitSerialize( st *walCommitState, pageSize uint, - onPage func(id PageID, buf []byte) error, -) error { + onPage func(id PageID, buf []byte) reason, +) reason { if !st.updated { return nil } @@ -188,10 +193,10 @@ func readWALMapping( wal *waLog, access func(PageID) []byte, root PageID, -) error { +) reason { mapping, ids, err := readWAL(access, root) if err != nil { - return nil + return err } wal.mapping = mapping @@ -202,7 +207,9 @@ func readWALMapping( func readWAL( access func(PageID) []byte, root PageID, -) (walMapping, idList, error) { +) (walMapping, idList, reason) { + const op = "txfile/read-wal" + if root == 0 { return walMapping{}, nil, nil } @@ -213,7 +220,9 @@ func readWAL( metaPages.Add(pageID) node, data := castWalPage(access(pageID)) if node == nil { - return nil, nil, errOutOfBounds + return nil, nil, errOp(op).of(InvalidMetaPage). + causedBy(raiseOutOfBounds(pageID)). 
+ report("write ahead metadata corrupted") } count := int(node.count.Get()) @@ -237,8 +246,8 @@ func writeWAL( to regionList, pageSize uint, mapping walMapping, - onPage func(id PageID, buf []byte) error, -) error { + onPage func(id PageID, buf []byte) reason, +) reason { allocPages := to.PageIDs() writer := newPagingWriter(allocPages, pageSize, 0, onPage) for id, walID := range mapping { diff --git a/write.go b/write.go index 8f61c19..e9f8578 100644 --- a/write.go +++ b/write.go @@ -55,7 +55,7 @@ type syncMsg struct { } type txWriteSync struct { - err error + err reason wg sync.WaitGroup } @@ -137,9 +137,9 @@ func (w *writer) Sync(sync *txWriteSync, flags syncFlag) { w.cond.Signal() } -func (w *writer) Run() (bool, error) { +func (w *writer) Run() (bool, reason) { var ( - err error + err reason done bool cmd command buf [1024]writeMsg @@ -159,12 +159,14 @@ func (w *writer) Run() (bool, error) { return msgs[i].id < msgs[j].id }) for _, msg := range msgs { + const op = "txfile/write-page" + if err == nil { // execute actual write on the page it's file offset: off := uint64(msg.id) * uint64(w.pageSize) tracef("write at(id=%v, off=%v, len=%v)\n", msg.id, off, len(msg.buf)) - err = writeAt(w.target, msg.buf, int64(off)) + err = writeAt(op, w.target, msg.buf, int64(off)) } msg.sync.err = err @@ -173,6 +175,8 @@ func (w *writer) Run() (bool, error) { // execute pending fsync: if fsync := cmd.fsync; fsync != nil { + const op = "txfile/write-sync" + resetErr := cmd.syncFlags.Test(syncResetErr) if err == nil { syncFlag := vfs.SyncAll @@ -180,7 +184,9 @@ func (w *writer) Run() (bool, error) { syncFlag = vfs.SyncDataOnly } - err = w.target.Sync(syncFlag) + if syncErr := w.target.Sync(syncFlag); syncErr != nil { + err = errOp(op).causedBy(syncErr) + } } fsync.err = err @@ -273,16 +279,17 @@ func (s *txWriteSync) Release() { s.wg.Done() } -func (s *txWriteSync) Wait() error { +func (s *txWriteSync) Wait() reason { s.wg.Wait() return s.err } -func writeAt(out io.WriterAt, buf 
[]byte, off int64) error { +func writeAt(op string, out io.WriterAt, buf []byte, off int64) reason { for len(buf) > 0 { n, err := out.WriteAt(buf, off) if err != nil { - return err + return errOp(op).causedBy(err). + reportf("writing %v bytes to off=%v failed", len(buf), off) } off += int64(n) diff --git a/write_test.go b/write_test.go index 2e51f4b..8b92564 100644 --- a/write_test.go +++ b/write_test.go @@ -58,6 +58,14 @@ const ( func TestFileWriter(t *testing.T) { assert := newAssertions(t) + checkCause := func(assert *assertions, expected, err error) { + if reason, ok := err.(reason); ok { + assert.Equal(expected, reason.Cause()) + } else { + assert.Fail("expected failure reason") + } + } + assert.Run("start stop", func(assert *assertions) { var ops testIOOperations _, teardown := newTestWriter(recordWriteOps(&ops), 64) @@ -192,9 +200,7 @@ func TestFileWriter(t *testing.T) { w.Sync(sync, 0) err := waitFn(1*time.Second, sync.Wait) - if expectedErr != err { - assert.FailNow("unexpected error") - } + checkCause(assert, expectedErr, err) // writer should stop on first error and ignore all following commands expectedOps := []opType{opWriteAt, opSync} @@ -230,7 +236,7 @@ func TestFileWriter(t *testing.T) { err := waitFn(5*time.Second, sync.Wait) assert.Error(err) - assert.Equal(expectedErr, err) + checkCause(assert, expectedErr, err) // writer should stop on first error and ignore all following commands expectedOps := []opType{opWriteAt}