Skip to content

Commit

Permalink
Merge pull request #124 from ipfs/fix/sorted-limited-offset-mount-que…
Browse files Browse the repository at this point in the history
…ries

Fix – sorted limited offset mount queries
  • Loading branch information
Stebalien authored Mar 23, 2019
2 parents 8e1699b + 18ef644 commit bce485c
Show file tree
Hide file tree
Showing 5 changed files with 550 additions and 94 deletions.
212 changes: 149 additions & 63 deletions mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package mount

import (
"container/heap"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -49,6 +50,105 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
return nil, ds.NewKey("/"), key
}

type queryResults struct {
mount ds.Key
results query.Results
next query.Result
}

func (qr *queryResults) advance() bool {
if qr.results == nil {
return false
}

qr.next = query.Result{}
r, more := qr.results.NextSync()
if !more {
err := qr.results.Close()
qr.results = nil
if err != nil {
// One more result, the error.
qr.next = query.Result{Error: err}
return true
}
return false
}

r.Key = qr.mount.Child(ds.RawKey(r.Key)).String()
qr.next = r
return true
}

type querySet struct {
query query.Query
heads []*queryResults
}

func (h *querySet) Len() int {
return len(h.heads)
}

func (h *querySet) Less(i, j int) bool {
return query.Less(h.query.Orders, h.heads[i].next.Entry, h.heads[j].next.Entry)
}

func (h *querySet) Swap(i, j int) {
h.heads[i], h.heads[j] = h.heads[j], h.heads[i]
}

func (h *querySet) Push(x interface{}) {
h.heads = append(h.heads, x.(*queryResults))
}

func (h *querySet) Pop() interface{} {
i := len(h.heads) - 1
last := h.heads[i]
h.heads[i] = nil
h.heads = h.heads[:i]
return last
}

func (h *querySet) close() error {
var errs []error
for _, qr := range h.heads {
err := qr.results.Close()
if err != nil {
errs = append(errs, err)
}
}
h.heads = nil
if len(errs) > 0 {
return errs[0]
}
return nil
}

func (h *querySet) addResults(mount ds.Key, results query.Results) {
r := &queryResults{
results: results,
mount: mount,
}
if r.advance() {
heap.Push(h, r)
}
}

func (h *querySet) next() (query.Result, bool) {
if len(h.heads) == 0 {
return query.Result{}, false
}
head := h.heads[0]
next := head.next

if head.advance() {
heap.Fix(h, 0)
} else {
heap.Remove(h, 0)
}

return next, true
}

// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
Expand Down Expand Up @@ -121,73 +221,59 @@ func (d *Datastore) Delete(key ds.Key) error {
return cds.Delete(k)
}

func (d *Datastore) Query(q query.Query) (query.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is still overly simplistic, but the only callers are
// `ipfs refs local` and ipfs-ds-convert.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
func (d *Datastore) Query(master query.Query) (query.Results, error) {
childQuery := query.Query{
Prefix: master.Prefix,
Limit: master.Limit,
Orders: master.Orders,
KeysOnly: master.KeysOnly,
ReturnExpirations: master.ReturnExpirations,
}
prefix := ds.NewKey(q.Prefix)

prefix := ds.NewKey(childQuery.Prefix)
dses, mounts, rests := d.lookupAll(prefix)

// current itorator state
var res query.Results
var mount ds.Key
i := 0

return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool

for try := true; try; try = len(dses) > i {
if res == nil {
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
}

dst := dses[i]
mount = mounts[i]
rest := rests[i]

q2 := q
q2.Prefix = rest.String()
r, err := dst.Query(q2)
if err != nil {
return query.Result{Error: err}, false
}
res = r
}

r, more = res.NextSync()
if !more {
err := res.Close()
if err != nil {
return query.Result{Error: err}, false
}
res = nil

i++
more = len(dses) > i
} else {
break
}
}
queries := &querySet{
query: childQuery,
heads: make([]*queryResults, 0, len(dses)),
}

r.Key = mount.Child(ds.RawKey(r.Key)).String()
return r, more
},
Close: func() error {
if len(mounts) > i && res != nil {
return res.Close()
}
return nil
},
}), nil
for i := range dses {
mount := mounts[i]
dstore := dses[i]
rest := rests[i]

qi := childQuery
qi.Prefix = rest.String()
results, err := dstore.Query(qi)

if err != nil {
_ = queries.close()
return nil, err
}
queries.addResults(mount, results)
}

qr := query.ResultsFromIterator(childQuery, query.Iterator{
Next: queries.next,
Close: queries.close,
})

if len(master.Filters) > 0 {
for _, f := range master.Filters {
qr = query.NaiveFilter(qr, f)
}
}

if master.Offset > 0 {
qr = query.NaiveOffset(qr, master.Offset)
}

if childQuery.Limit > 0 {
qr = query.NaiveLimit(qr, childQuery.Limit)
}

return qr, nil
}

func (d *Datastore) Close() error {
Expand Down
Loading

0 comments on commit bce485c

Please sign in to comment.