diff --git a/core/builder.go b/core/builder.go index 2d6d8b664b83..13dce637d5f4 100644 --- a/core/builder.go +++ b/core/builder.go @@ -277,7 +277,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { n.Resolver = resolver.NewBasicResolver(n.DAG) // Provider - queue, err := provider.NewQueue("provider-v1", ctx, n.Repo.Datastore()) + queue, err := provider.NewQueue(ctx, "provider-v1", n.Repo.Datastore()) if err != nil { return err } diff --git a/core/coreapi/provider.go b/core/coreapi/provider.go index b22a3811c742..8148c87892ef 100644 --- a/core/coreapi/provider.go +++ b/core/coreapi/provider.go @@ -4,8 +4,10 @@ import ( cid "github.com/ipfs/go-cid" ) +// ProviderAPI brings Provider behavior to CoreAPI type ProviderAPI CoreAPI -func (api *ProviderAPI) Provide(root cid.Cid) error { - return api.provider.Provide(root) +// Provide the given cid using the current provider +func (api *ProviderAPI) Provide(cid cid.Cid) error { + return api.provider.Provide(cid) } diff --git a/provider/offline.go b/provider/offline.go index f7b9603b92af..029ddfa9889b 100644 --- a/provider/offline.go +++ b/provider/offline.go @@ -2,8 +2,9 @@ package provider import "github.com/ipfs/go-cid" -type offlineProvider struct {} +type offlineProvider struct{} +// NewOfflineProvider creates a Provider that does nothing func NewOfflineProvider() Provider { return &offlineProvider{} } diff --git a/provider/provider.go b/provider/provider.go index e4ee6d9ffbb3..76004f51abf8 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -18,13 +18,12 @@ const ( provideOutgoingWorkerLimit = 8 ) +// Provider announces blocks to the network type Provider interface { Run() Provide(cid.Cid) error } -// Provider announces blocks to the network, tracks which blocks are -// being provided, and untracks blocks when they're no longer in the blockstore. type provider struct { ctx context.Context // the CIDs for which provide announcements should be made @@ -33,6 +32,7 @@ type provider struct { contentRouting routing.ContentRouting } +// NewProvider creates a provider that announces blocks to the network using a content router func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider { return &provider{ ctx: ctx, diff --git a/provider/queue.go b/provider/queue.go index 65656450a03b..c33a4d45809a 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -17,11 +17,12 @@ import ( // not removed from the datastore until you call Complete() on the entry you // receive. type Entry struct { - cid cid.Cid - key ds.Key + cid cid.Cid + key ds.Key queue *Queue } +// Complete the entry by removing it from the queue func (e *Entry) Complete() error { return e.queue.remove(e.key) } @@ -41,36 +42,37 @@ type Queue struct { tail uint64 head uint64 - lock sync.Mutex + lock sync.Mutex datastore ds.Datastore - dequeue chan *Entry + dequeue chan *Entry notEmpty chan struct{} isRunning bool } -func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) { - namespaced := namespace.Wrap(datastore, ds.NewKey("/" + name + "/queue/")) - head, tail, err := getQueueHeadTail(name, ctx, namespaced) +// NewQueue creates a queue for cids +func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue, error) { + namespaced := namespace.Wrap(datastore, ds.NewKey("/"+name+"/queue/")) + head, tail, err := getQueueHeadTail(ctx, name, namespaced) if err != nil { return nil, err } q := &Queue{ - name: name, - ctx: ctx, - head: head, - tail: tail, - lock: sync.Mutex{}, + name: name, + ctx: ctx, + head: head, + tail: tail, + lock: sync.Mutex{}, datastore: namespaced, - dequeue: make(chan *Entry), - notEmpty: make(chan struct{}), + dequeue: make(chan *Entry), + notEmpty: make(chan struct{}), isRunning: false, } return q, nil } -// Put a cid in the queue +// Enqueue puts a cid in the queue func (q *Queue) Enqueue(cid cid.Cid) error { q.lock.Lock() defer q.lock.Unlock() @@ -95,21 +97,18 @@ func (q *Queue) Enqueue(cid cid.Cid) error { return nil } -// Remove an entry from the queue. +// Dequeue returns a channel that if listened to will remove entries from the queue func (q *Queue) Dequeue() <-chan *Entry { return q.dequeue } +// IsEmpty returns whether r not the queue has any items func (q *Queue) IsEmpty() bool { return (q.tail - q.head) == 0 } -func (q *Queue) remove(key ds.Key) error { - return q.datastore.Delete(key) -} - -// dequeue items when the dequeue channel is available to -// be written to +// Run dequeues items when the dequeue channel is available to +// be written to. func (q *Queue) Run() { q.isRunning = true go func() { @@ -178,9 +177,9 @@ func (q *Queue) next() (*Entry, error) { return nil, err } - entry := &Entry { - cid: id, - key: nextKey, + entry := &Entry{ + cid: id, + key: nextKey, queue: q, } @@ -194,14 +193,14 @@ func (q *Queue) queueKey(id uint64) ds.Key { } // crawl over the queue entries to find the head and tail -func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) { +func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) (uint64, uint64, error) { query := query.Query{} results, err := datastore.Query(query) if err != nil { return 0, 0, err } - var tail uint64 = 0 + var tail uint64 var head uint64 = math.MaxUint64 for entry := range results.Next() { select { @@ -219,8 +218,8 @@ func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) head = id } - if (id+1) > tail { - tail = (id+1) + if (id + 1) > tail { + tail = (id + 1) } } if err := results.Close(); err != nil { @@ -233,3 +232,6 @@ func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) return head, tail, nil } +func (q *Queue) remove(key ds.Key) error { + return q.datastore.Delete(key) +}