From b10e3891357956021516eec0b4463d4c351af595 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Tue, 2 Jul 2019 09:49:55 -0700 Subject: [PATCH 1/3] Change queue to be timestamp based --- provider/queue/queue.go | 122 +++++++++-------------------------- provider/queue/queue_test.go | 44 +++---------- 2 files changed, 37 insertions(+), 129 deletions(-) diff --git a/provider/queue/queue.go b/provider/queue/queue.go index 2afbc81ee9b..dceb67447a7 100644 --- a/provider/queue/queue.go +++ b/provider/queue/queue.go @@ -3,8 +3,7 @@ package queue import ( "context" "fmt" - "strconv" - "strings" + "time" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" @@ -25,8 +24,6 @@ type Queue struct { // e.g. provider vs reprovider name string ctx context.Context - tail uint64 - head uint64 ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid enqueue chan cid.Cid @@ -37,16 +34,10 @@ type Queue struct { // NewQueue creates a queue for cids func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) - head, tail, err := getQueueHeadTail(ctx, namespaced) - if err != nil { - return nil, err - } cancelCtx, cancel := context.WithCancel(ctx) q := &Queue{ name: name, ctx: cancelCtx, - head: head, - tail: tail, ds: namespaced, dequeue: make(chan cid.Cid), enqueue: make(chan cid.Cid), @@ -77,41 +68,6 @@ func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } -// Look for next Cid in the queue and return it. Skip over gaps and mangled data -func (q *Queue) nextEntry() (datastore.Key, cid.Cid) { - for { - if q.head >= q.tail { - return datastore.Key{}, cid.Undef - } - - key := q.queueKey(q.head) - value, err := q.ds.Get(key) - - if err != nil { - if err == datastore.ErrNotFound { - log.Warningf("Error missing entry in queue: %s", key) - } else { - log.Errorf("Error fetching from queue: %s", err) - } - q.head++ // move on - continue - } - - c, err := cid.Parse(value) - if err != nil { - log.Warningf("Error marshalling Cid from queue: ", err) - q.head++ - err = q.ds.Delete(key) - if err != nil { - log.Warningf("Provider queue failed to delete: %s", key) - } - continue - } - - return key, c - } -} - // Run dequeues and enqueues when available. func (q *Queue) work() { go func() { @@ -124,7 +80,26 @@ func (q *Queue) work() { for { if c == cid.Undef { - k, c = q.nextEntry() + head, e := q.getQueueHead() + + if e != nil { + log.Errorf("error querying for head of queue: %s, stopping provider", e) + return + } else if head != nil { + k = datastore.NewKey(head.Key) + c, e = cid.Parse(head.Value) + if e != nil { + log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, e) + err := q.ds.Delete(k) + if err != nil { + log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) + return + } + continue + } + } else { + c = cid.Undef + } } // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue @@ -135,14 +110,12 @@ func (q *Queue) work() { select { case toQueue := <-q.enqueue: - nextKey := q.queueKey(q.tail) + nextKey := datastore.NewKey(fmt.Sprintf("%d", time.Now().UnixNano())) if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to enqueue cid: %s", err) continue } - - q.tail++ case dequeue <- c: err := q.ds.Delete(k) @@ -151,7 +124,6 @@ func (q *Queue) work() { continue } c = cid.Undef - q.head++ case <-q.ctx.Done(): return } @@ -159,53 +131,17 @@ func (q *Queue) work() { }() } -func (q *Queue) queueKey(id uint64) datastore.Key { - s := fmt.Sprintf("%016X", id) - return datastore.NewKey(s) -} - -func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) { - head, err := getQueueHead(datastore) - if err != nil { - return 0, 0, err - } - tail, err := getQueueTail(datastore) - if err != nil { - return 0, 0, err - } - return head, tail, nil -} - -func getQueueHead(ds datastore.Datastore) (uint64, error) { - return getFirstIDByOrder(ds, query.OrderByKey{}) -} - -func getQueueTail(ds datastore.Datastore) (uint64, error) { - tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{}) +func (q *Queue) getQueueHead() (*query.Result, error) { + qry := query.Query{Orders: []query.Order{query.OrderByKey{}}} + results, err := q.ds.Query(qry) if err != nil { - return 0, err - } - if tail > 0 { - tail++ - } - return tail, nil -} - -func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) { - q := query.Query{Orders: []query.Order{order}} - results, err := ds.Query(q) - if err != nil { - return 0, err + return nil, err } defer results.Close() r, ok := results.NextSync() if !ok { - return 0, nil - } - trimmed := strings.TrimPrefix(r.Key, "/") - id, err := strconv.ParseUint(trimmed, 16, 64) - if err != nil { - return 0, err + return nil, nil } - return id, nil + + return &r, nil } diff --git a/provider/queue/queue_test.go b/provider/queue/queue_test.go index c8fb8682e31..819fa90f978 100644 --- a/provider/queue/queue_test.go +++ b/provider/queue/queue_test.go @@ -5,9 +5,9 @@ import ( "testing" "time" - cid "github.com/ipfs/go-cid" - datastore "github.com/ipfs/go-datastore" - sync "github.com/ipfs/go-datastore/sync" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-ipfs-blocksutil" ) @@ -55,36 +55,6 @@ func TestBasicOperation(t *testing.T) { assertOrdered(cids, queue, t) } -func TestSparseDatastore(t *testing.T) { - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - cids := makeCids(10) - for _, c := range cids { - queue.Enqueue(c) - } - - // remove entries in the middle - err = queue.ds.Delete(queue.queueKey(5)) - if err != nil { - t.Fatal(err) - } - - err = queue.ds.Delete(queue.queueKey(6)) - if err != nil { - t.Fatal(err) - } - - expected := append(cids[:5], cids[7:]...) - assertOrdered(expected, queue, t) -} - func TestMangledData(t *testing.T) { ctx := context.Background() defer ctx.Done() @@ -100,13 +70,15 @@ func TestMangledData(t *testing.T) { queue.Enqueue(c) } - // remove entries in the middle - err = queue.ds.Put(queue.queueKey(5), []byte("borked")) + // put bad data in the queue + queueKey := datastore.NewKey("/test/0") + err = queue.ds.Put(queueKey, []byte("borked")) if err != nil { t.Fatal(err) } - expected := append(cids[:5], cids[6:]...) + // expect to only see the valid cids we entered + expected := cids assertOrdered(expected, queue, t) } From 2ef67c97319f8669362305349d8c818542de26d3 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Tue, 2 Jul 2019 09:50:14 -0700 Subject: [PATCH 2/3] Avoid collisions by appending cid --- provider/queue/queue.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/provider/queue/queue.go b/provider/queue/queue.go index dceb67447a7..29355223371 100644 --- a/provider/queue/queue.go +++ b/provider/queue/queue.go @@ -110,7 +110,8 @@ func (q *Queue) work() { select { case toQueue := <-q.enqueue: - nextKey := datastore.NewKey(fmt.Sprintf("%d", time.Now().UnixNano())) + keyPath := fmt.Sprintf("%d/%s", time.Now().UnixNano(), c.String()) + nextKey := datastore.NewKey(keyPath) if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to enqueue cid: %s", err) From 143e41570bca067d92bec85cfa5e5fe283946f21 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Tue, 2 Jul 2019 13:15:14 -0700 Subject: [PATCH 3/3] Add limit for getQueueHead call --- provider/queue/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/provider/queue/queue.go b/provider/queue/queue.go index 29355223371..ddaa97582e8 100644 --- a/provider/queue/queue.go +++ b/provider/queue/queue.go @@ -133,7 +133,7 @@ func (q *Queue) work() { } func (q *Queue) getQueueHead() (*query.Result, error) { - qry := query.Query{Orders: []query.Order{query.OrderByKey{}}} + qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1} results, err := q.ds.Query(qry) if err != nil { return nil, err