Skip to content

Commit

Permalink
Improve error reporting in pq package (#18)
Browse files Browse the repository at this point in the history
* Improve error reporting in pq package

- introduce Error type that is compatible to txerr
- improve support for checking and recovering from root cause by
  introducing error kinds and re-export error kinds
- add more context like queue ID (based on pointer address) and page id
  to errors
- create trace of errors annotated with operations that failed
  • Loading branch information
Steffen Siering authored and ph committed Aug 3, 2018
1 parent 372d179 commit 80a2e39
Show file tree
Hide file tree
Showing 12 changed files with 661 additions and 216 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- Add support to increase a file's maxSize on open. PR #5
- Add support to pre-allocate the meta area. PR #7
- Begin returns an error if transaction is not compatible to file open mode. PR #17
- Introduce Error type to txfile package. PR #17
- Introduce Error type to txfile and pq package. PR #17, #18

### Changed
- Refine platform dependent file syncing. PR #10
Expand Down
99 changes: 82 additions & 17 deletions pq/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,54 +26,69 @@ type access struct {
Delegate
rootID txfile.PageID
rootOff int

quID queueID
}

func makeAccess(delegate Delegate) (access, error) {
func makeAccess(delegate Delegate) (access, ErrKind) {
rootID, rootOff := delegate.Root()
if rootID == 0 {
return access{}, errNoQueueRoot
return access{}, NoQueueRoot
}

return access{
Delegate: delegate,
rootID: rootID,
rootOff: int(rootOff),
}, nil
}, NoError
}

// ReadRoot reads the root page into an array.
// ReadRoot create a short lived read transaction for accessing and copying the
// queue root.
func (a *access) ReadRoot() ([SzRoot]byte, error) {
func (a *access) ReadRoot() ([SzRoot]byte, reason) {
const op = "pq/read-queue-root"

var buf [SzRoot]byte

tx, err := a.BeginRead()
if err != nil {
return buf, err
return buf, a.errWrap(op, err)
}
defer tx.Close()

return buf, withPage(tx, a.rootID, func(page []byte) error {
fail := NoError
err = withPage(tx, a.rootID, func(page []byte) {
n := copy(buf[:], page[a.rootOff:])
if n < SzRoot {
return errIncompleteQueueRoot
fail = InvalidQueueRoot
}
return nil
})

if err != nil {
return buf, a.errWrap(op, err)
}
if fail != NoError {
return buf, a.err(op).of(fail)
}

return buf, nil
}

// RootPage accesses the queue root page from within the passed transaction.
func (a *access) RootPage(tx *txfile.Tx) (*txfile.Page, error) {
// rootPage accesses the queue root page from within the passed transaction.
func (a *access) rootPage(tx *txfile.Tx) (*txfile.Page, error) {
return tx.Page(a.rootID)
}

// LoadRootPage accesses the queue root page from within the passed write
// transaction.
// The Root page it's content is loaded into the write buffer for manipulations.
// The page returned is not marked as dirty yet.
func (a *access) LoadRootPage(tx *txfile.Tx) (*txfile.Page, *queuePage, error) {
func (a *access) LoadRootPage(tx *txfile.Tx) (*txfile.Page, *queuePage, reason) {
const op = "pq/load-queue-root"

var hdr *queuePage
page, err := a.RootPage(tx)
page, err := a.rootPage(tx)
if err == nil {
err = page.Load()
if err == nil {
Expand All @@ -82,17 +97,28 @@ func (a *access) LoadRootPage(tx *txfile.Tx) (*txfile.Page, *queuePage, error) {
}
}

return page, hdr, err
if err != nil {
msg := "Error reading the queue header"
return nil, nil, a.errWrap(op, err).of(ReadFail).report(msg)
}
return page, hdr, nil
}

// RootHdr returns a pointer to the queue root header. The pointer to the
// header is only valid as long as the transaction is still active.
func (a *access) RootHdr(tx *txfile.Tx) (hdr *queuePage, err error) {
err = withPage(tx, a.rootID, func(buf []byte) error {
func (a *access) RootHdr(tx *txfile.Tx) (*queuePage, reason) {
const op = "pq/read-queue-header"

var hdr *queuePage
err := withPage(tx, a.rootID, func(buf []byte) {
hdr = castQueueRootPage(buf[a.rootOff:])
return nil
})
return
if err != nil {
msg := "Error reading the queue header"
return nil, a.errWrap(op, err).of(ReadFail).report(msg)
}

return hdr, nil
}

// ParsePosition parses an on disk position, providing page id, page offset and
Expand Down Expand Up @@ -121,3 +147,42 @@ func (a *access) WritePosition(to *pos, pos position) {
to.offset.Set(uint64(off))
to.id.Set(pos.id)
}

func (a *access) readPageByID(pool *pagePool, id txfile.PageID) (*page, reason) {
const op = "pq/read-single-page"

tx, err := a.BeginRead()
if err != nil {
return nil, a.errWrap(op, err)
}

defer tx.Close()

var page *page
err = withPage(tx, id, func(buf []byte) {
page = pool.NewPageWith(id, buf)
})
if err != nil {
return nil, a.errWrapPage(op, id, err).of(ReadFail)
}

return page, nil
}

func (a *access) err(op string) *Error { return a.errPage(op, 0) }
func (a *access) errPage(op string, id txfile.PageID) *Error {
return &Error{op: op, ctx: a.errPageCtx(id)}
}

func (a *access) errWrap(op string, cause error) *Error { return a.errWrapPage(op, 0, cause) }
func (a *access) errWrapPage(op string, id txfile.PageID, cause error) *Error {
return a.errPage(op, id).causedBy(cause)
}

func (a *access) errCtx() errorCtx { return errorCtx{id: a.quID} }
func (a *access) errPageCtx(id txfile.PageID) errorCtx {
if id != 0 {
return errorCtx{id: a.quID, isPage: true, page: id}
}
return errorCtx{id: a.quID}
}
Loading

0 comments on commit 80a2e39

Please sign in to comment.