Skip to content

Commit

Permalink
Make queue operation more clear
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Erik Ingenito <erik@carbonfive.com>
  • Loading branch information
Erik Ingenito committed Mar 16, 2019
1 parent 7fcafb6 commit 6c1eca9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 29 deletions.
2 changes: 1 addition & 1 deletion provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestAnnouncement(t *testing.T) {

cids := cid.NewSet()

for i := 0; i < 1000; i++ {
for i := 0; i < 100; i++ {
c := blockGenerator.Next().Cid()
cids.Add(c)
}
Expand Down
72 changes: 44 additions & 28 deletions provider/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,37 +61,50 @@ func (q *Queue) Dequeue() <-chan cid.Cid {
return q.dequeue
}

type entry struct {
cid cid.Cid
key datastore.Key
}

// Look for next Cid in the queue and return it. Skip over gaps and mangled data
func (q *Queue) nextEntry() (datastore.Key, cid.Cid) {
for {
if q.head >= q.tail {
return datastore.Key{}, cid.Undef
}

key := q.queueKey(q.head)
value, err := q.ds.Get(key)

if err == datastore.ErrNotFound {
log.Warningf("Error missing entry in queue: %s", key)
q.head++ // move on
continue
} else if err != nil {
log.Warningf("Error fetching from queue: %s", err)
continue
}

c, err := cid.Parse(value)
if err != nil {
log.Warningf("Error marshalling Cid from queue: ", err)
q.head++
err = q.ds.Delete(key)
continue
}

return key, c
}
}

// Run dequeues and enqueues when available.
func (q *Queue) work() {
go func() {

for {
var c cid.Cid = cid.Undef
var key datastore.Key
k, c := q.nextEntry()
var dequeue chan cid.Cid

// If we're not empty dequeue a cid and ship it
if q.head < q.tail {
key = q.queueKey(q.head)
value, err := q.ds.Get(key)

if err == datastore.ErrNotFound {
log.Warningf("Missing entry in queue: %s", err)
q.head++
continue
} else if err != nil {
log.Warningf("Error fetching from queue: %s", err)
continue
}

c, err = cid.Parse(value)
if err != nil {
log.Warningf("Error marshalling Cid from queue: ", err)
q.head++
err = q.ds.Delete(key)
continue
}
}

if c != cid.Undef {
dequeue = q.dequeue
}
Expand All @@ -102,16 +115,19 @@ func (q *Queue) work() {

if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
continue
}

q.tail++
case dequeue <- c:
q.head++
err := q.ds.Delete(key)
err := q.ds.Delete(k)

if err != nil {
log.Errorf("Failed to delete queued cid: %s", err)
log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
continue
}

q.head++
case <-q.ctx.Done():
return
}
Expand Down

0 comments on commit 6c1eca9

Please sign in to comment.