Skip to content

Commit

Permalink
Merge pull request #125 from ipfs/fix/cleanup-query
Browse files Browse the repository at this point in the history
cleanup and optimize naive query filters
  • Loading branch information
Stebalien authored Apr 9, 2019
2 parents bce485c + b086f25 commit 5525660
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 104 deletions.
2 changes: 0 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package datastore

type verb int

type op struct {
delete bool
value []byte
Expand Down
11 changes: 0 additions & 11 deletions key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package datastore_test

import (
"bytes"
"math/rand"
"path"
"strings"
"testing"
Expand All @@ -14,16 +13,6 @@ import (
// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }

func randomString() string {
chars := "abcdefghijklmnopqrstuvwxyz1234567890"
var buf bytes.Buffer
l := rand.Intn(50)
for j := 0; j < l; j++ {
buf.WriteByte(chars[rand.Intn(len(chars))])
}
return buf.String()
}

type KeySuite struct{}

var _ = Suite(&KeySuite{})
Expand Down
6 changes: 0 additions & 6 deletions query/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"testing"
)

type filterTestCase struct {
filter Filter
keys []string
expect []string
}

func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) {
e := make([]Entry, len(keys))
for i, k := range keys {
Expand Down
6 changes: 0 additions & 6 deletions query/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"testing"
)

type orderTestCase struct {
order Order
keys []string
expect []string
}

func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) {
e := make([]Entry, len(keys))
for i, k := range keys {
Expand Down
26 changes: 19 additions & 7 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ type Entry struct {
}

// Result is a special entry that includes an error, so that the client
// may be warned about internal errors.
// may be warned about internal errors. If Error is non-nil, Entry must be
// empty.
type Result struct {
Entry

Expand Down Expand Up @@ -203,12 +204,12 @@ func NewResultBuilder(q Query) *ResultBuilder {
}

// ResultsWithChan returns a Results object from a channel
// of Result entries. Respects its own Close()
// of Result entries.
//
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
// will leave anything trying to write to the result channel hanging.
func ResultsWithChan(q Query, res <-chan Result) Results {
b := NewResultBuilder(q)

// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) {
for {
select {
case <-worker.Closing(): // client told us to close early
Expand All @@ -219,13 +220,24 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
}

select {
case b.Output <- e:
case out <- e:
case <-worker.Closing(): // client told us to close early
return
}
}
}
})
}

// ResultsWithProcess returns a Results object with the results generated by the
// passed subprocess.
func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results {
b := NewResultBuilder(q)

// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
proc(worker, b.Output)
})

go b.Process.CloseAfterChildren()
return b.Results()
Expand Down
153 changes: 81 additions & 72 deletions query/query_impl.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,79 @@
package query

import "sort"
import (
"sort"

func DerivedResults(qr Results, ch <-chan Result) Results {
return &results{
query: qr.Query(),
proc: qr.Process(),
res: ch,
}
}
goprocess "github.com/jbenet/goprocess"
)

// NaiveFilter applies a filter to the results.
func NaiveFilter(qr Results, filter Filter) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()

for e := range qr.Next() {
if e.Error != nil || filter.Filter(e.Entry) {
ch <- e
return ResultsFromIterator(qr.Query(), Iterator{
Next: func() (Result, bool) {
for {
e, ok := qr.NextSync()
if !ok {
return Result{}, false
}
if e.Error != nil || filter.Filter(e.Entry) {
return e, true
}
}
}
}()

return ResultsWithChan(qr.Query(), ch)
},
Close: func() error {
return qr.Close()
},
})
}

// NaiveLimit truncates the results to a given int limit
func NaiveLimit(qr Results, limit int) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()

l := 0
for e := range qr.Next() {
if e.Error != nil {
ch <- e
continue
if limit == 0 {
// 0 means no limit
return qr
}
closed := false
return ResultsFromIterator(qr.Query(), Iterator{
Next: func() (Result, bool) {
if limit == 0 {
if !closed {
closed = true
err := qr.Close()
if err != nil {
return Result{Error: err}, true
}
}
return Result{}, false
}
ch <- e
l++
if limit > 0 && l >= limit {
break
limit--
return qr.NextSync()
},
Close: func() error {
if closed {
return nil
}
}
}()

return ResultsWithChan(qr.Query(), ch)
closed = true
return qr.Close()
},
})
}

// NaiveOffset skips a given number of results
func NaiveOffset(qr Results, offset int) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()

sent := 0
for e := range qr.Next() {
if e.Error != nil {
ch <- e
}

if sent < offset {
sent++
continue
return ResultsFromIterator(qr.Query(), Iterator{
Next: func() (Result, bool) {
for ; offset > 0; offset-- {
res, ok := qr.NextSync()
if !ok || res.Error != nil {
return res, ok
}
}
ch <- e
}
}()

return ResultsWithChan(qr.Query(), ch)
return qr.NextSync()
},
Close: func() error {
return qr.Close()
},
})
}

// NaiveOrder reorders results according to given orders.
Expand All @@ -83,29 +84,37 @@ func NaiveOrder(qr Results, orders ...Order) Results {
return qr
}

ch := make(chan Result)
var entries []Entry
go func() {
defer close(ch)
return ResultsWithProcess(qr.Query(), func(worker goprocess.Process, out chan<- Result) {
defer qr.Close()

for e := range qr.Next() {
if e.Error != nil {
ch <- e
var entries []Entry
collect:
for {
select {
case <-worker.Closing():
return
case e, ok := <-qr.Next():
if !ok {
break collect
}
if e.Error != nil {
out <- e
continue
}
entries = append(entries, e.Entry)
}

entries = append(entries, e.Entry)
}

sort.Slice(entries, func(i int, j int) bool {
return Less(orders, entries[i], entries[j])
})

for _, e := range entries {
ch <- Result{Entry: e}
select {
case <-worker.Closing():
return
case out <- Result{Entry: e}:
}
}
}()

return DerivedResults(qr, ch)
})
}

func NaiveQueryApply(q Query, qr Results) Results {
Expand Down

0 comments on commit 5525660

Please sign in to comment.