diff --git a/core/core.go b/core/core.go index 24535208dbf..f87fd1da779 100644 --- a/core/core.go +++ b/core/core.go @@ -23,12 +23,12 @@ import ( ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" "github.com/ipfs/go-ipfs/p2p" "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" bserv "github.com/ipfs/go-blockservice" bstore "github.com/ipfs/go-ipfs-blockstore" exchange "github.com/ipfs/go-ipfs-exchange-interface" + "github.com/ipfs/go-ipfs-provider" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" mfs "github.com/ipfs/go-mfs" diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 85a2b24f602..5f2201df624 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -22,13 +22,13 @@ import ( "github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/namesys" "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-ipfs-blockstore" "github.com/ipfs/go-ipfs-exchange-interface" offlinexch "github.com/ipfs/go-ipfs-exchange-offline" + "github.com/ipfs/go-ipfs-provider" offlineroute "github.com/ipfs/go-ipfs-routing/offline" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" diff --git a/core/node/provider.go b/core/node/provider.go index c87e2ac58d5..5390c3a9b70 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -6,10 +6,13 @@ import ( "time" "github.com/ipfs/go-ipfs/core/node/helpers" - "github.com/ipfs/go-ipfs/provider" - q "github.com/ipfs/go-ipfs/provider/queue" - "github.com/ipfs/go-ipfs/provider/simple" + "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/repo" + + "github.com/ipfs/go-ipfs-provider" + q "github.com/ipfs/go-ipfs-provider/queue" + "github.com/ipfs/go-ipfs-provider/simple" + ipld "github.com/ipfs/go-ipld-format" "github.com/libp2p/go-libp2p-core/routing" "go.uber.org/fx" ) @@ -101,9 +104,9 @@ func SimpleProviders(reprovideStrategy string, reprovideInterval string) fx.Opti case "": keyProvider = fx.Provide(simple.NewBlockstoreProvider) case "roots": - keyProvider = fx.Provide(simple.NewPinnedProvider(true)) + keyProvider = fx.Provide(pinnedProviderStrategy(true)) case "pinned": - keyProvider = fx.Provide(simple.NewPinnedProvider(false)) + keyProvider = fx.Provide(pinnedProviderStrategy(false)) default: return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", reprovideStrategy)) } @@ -115,3 +118,9 @@ func SimpleProviders(reprovideStrategy string, reprovideInterval string) fx.Opti fx.Provide(SimpleReprovider(reproviderInterval)), ) } + +func pinnedProviderStrategy(onlyRoots bool) interface{} { + return func(pinner pin.Pinner, dag ipld.DAGService) simple.KeyChanFunc { + return simple.NewPinnedProvider(onlyRoots, pinner, dag) + } +} diff --git a/go.mod b/go.mod index 845548a46e4..b904bbaf4b1 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.3 github.com/ipfs/go-ipfs-posinfo v0.0.1 + github.com/ipfs/go-ipfs-provider v0.1.1 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipfs-util v0.0.1 github.com/ipfs/go-ipld-cbor v0.0.2 @@ -112,6 +113,7 @@ require ( go4.org v0.0.0-20190313082347-94abd6928b1d // indirect golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb + google.golang.org/appengine v1.4.0 // indirect gopkg.in/cheggaaa/pb.v1 v1.0.28 gotest.tools/gotestsum v0.3.4 ) diff --git a/go.sum b/go.sum index 337302c8ad7..8b3b986bd22 100644 --- a/go.sum +++ b/go.sum @@ -270,6 +270,10 @@ github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6 github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A= github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU= github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= +github.com/ipfs/go-ipfs-provider v0.1.0 h1:lYSVVxWpL0KJw1PLj3+DAn0zuVfc+z93wzUXS09ZjZk= +github.com/ipfs/go-ipfs-provider v0.1.0/go.mod h1:gzVZZXC4zhr2r+MkNR21/+FS54oc7VfTKtDT2mdDxD8= +github.com/ipfs/go-ipfs-provider v0.1.1 h1:nsC6oWr6bDJ4H7pZfZJqAk6oXaHsrqnwhpQIqTdSDic= +github.com/ipfs/go-ipfs-provider v0.1.1/go.mod h1:gzVZZXC4zhr2r+MkNR21/+FS54oc7VfTKtDT2mdDxD8= github.com/ipfs/go-ipfs-routing v0.0.1/go.mod h1:k76lf20iKFxQTjcJokbPM9iBXVXVZhcOwc360N4nuKs= github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ= github.com/ipfs/go-ipfs-routing v0.1.0/go.mod h1:hYoUkJLyAUKhF58tysKpids8RNDPO42BVMgK5dNsoqY= diff --git a/provider/offline.go b/provider/offline.go deleted file mode 100644 index 5511364ed74..00000000000 --- a/provider/offline.go +++ /dev/null @@ -1,28 +0,0 @@ -package provider - -import ( - "context" - "github.com/ipfs/go-cid" -) - -type offlineProvider struct{} - -// NewOfflineProvider creates a ProviderSystem that does nothing -func NewOfflineProvider() System { - return &offlineProvider{} -} - -func (op *offlineProvider) Run() { -} - -func (op *offlineProvider) Close() error { - return nil -} - -func (op *offlineProvider) Provide(cid.Cid) error { - return nil -} - -func (op *offlineProvider) Reprovide(context.Context) error { - return nil -} diff --git a/provider/provider.go b/provider/provider.go deleted file mode 100644 index 7dec4c172e6..00000000000 --- a/provider/provider.go +++ /dev/null @@ -1,26 +0,0 @@ -package provider - -import ( - "context" - "github.com/ipfs/go-cid" -) - -// Provider announces blocks to the network -type Provider interface { - // Run is used to begin processing the provider work - Run() - // Provide takes a cid and makes an attempt to announce it to the network - Provide(cid.Cid) error - // Close stops the provider - Close() error -} - -// Reprovider reannounces blocks to the network -type Reprovider interface { - // Run is used to begin processing the reprovider work and waiting for reprovide triggers - Run() - // Trigger a reprovide - Trigger(context.Context) error - // Close stops the reprovider - Close() error -} diff --git a/provider/queue/queue.go b/provider/queue/queue.go deleted file mode 100644 index ddaa97582e8..00000000000 --- a/provider/queue/queue.go +++ /dev/null @@ -1,148 +0,0 @@ -package queue - -import ( - "context" - "fmt" - "time" - - cid "github.com/ipfs/go-cid" - datastore "github.com/ipfs/go-datastore" - namespace "github.com/ipfs/go-datastore/namespace" - query "github.com/ipfs/go-datastore/query" - logging "github.com/ipfs/go-log" -) - -var log = logging.Logger("provider.queue") - -// Queue provides a durable, FIFO interface to the datastore for storing cids -// -// Durability just means that cids in the process of being provided when a -// crash or shutdown occurs will still be in the queue when the node is -// brought back online. -type Queue struct { - // used to differentiate queues in datastore - // e.g. provider vs reprovider - name string - ctx context.Context - ds datastore.Datastore // Must be threadsafe - dequeue chan cid.Cid - enqueue chan cid.Cid - close context.CancelFunc - closed chan struct{} -} - -// NewQueue creates a queue for cids -func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { - namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) - cancelCtx, cancel := context.WithCancel(ctx) - q := &Queue{ - name: name, - ctx: cancelCtx, - ds: namespaced, - dequeue: make(chan cid.Cid), - enqueue: make(chan cid.Cid), - close: cancel, - closed: make(chan struct{}, 1), - } - q.work() - return q, nil -} - -// Close stops the queue -func (q *Queue) Close() error { - q.close() - <-q.closed - return nil -} - -// Enqueue puts a cid in the queue -func (q *Queue) Enqueue(cid cid.Cid) { - select { - case q.enqueue <- cid: - case <-q.ctx.Done(): - } -} - -// Dequeue returns a channel that if listened to will remove entries from the queue -func (q *Queue) Dequeue() <-chan cid.Cid { - return q.dequeue -} - -// Run dequeues and enqueues when available. -func (q *Queue) work() { - go func() { - var k datastore.Key = datastore.Key{} - var c cid.Cid = cid.Undef - - defer func() { - close(q.closed) - }() - - for { - if c == cid.Undef { - head, e := q.getQueueHead() - - if e != nil { - log.Errorf("error querying for head of queue: %s, stopping provider", e) - return - } else if head != nil { - k = datastore.NewKey(head.Key) - c, e = cid.Parse(head.Value) - if e != nil { - log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, e) - err := q.ds.Delete(k) - if err != nil { - log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) - return - } - continue - } - } else { - c = cid.Undef - } - } - - // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue - var dequeue chan cid.Cid - if c != cid.Undef { - dequeue = q.dequeue - } - - select { - case toQueue := <-q.enqueue: - keyPath := fmt.Sprintf("%d/%s", time.Now().UnixNano(), c.String()) - nextKey := datastore.NewKey(keyPath) - - if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil { - log.Errorf("Failed to enqueue cid: %s", err) - continue - } - case dequeue <- c: - err := q.ds.Delete(k) - - if err != nil { - log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) - continue - } - c = cid.Undef - case <-q.ctx.Done(): - return - } - } - }() -} - -func (q *Queue) getQueueHead() (*query.Result, error) { - qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1} - results, err := q.ds.Query(qry) - if err != nil { - return nil, err - } - defer results.Close() - r, ok := results.NextSync() - if !ok { - return nil, nil - } - - return &r, nil -} diff --git a/provider/queue/queue_test.go b/provider/queue/queue_test.go deleted file mode 100644 index 819fa90f978..00000000000 --- a/provider/queue/queue_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package queue - -import ( - "context" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" - "github.com/ipfs/go-ipfs-blocksutil" -) - -var blockGenerator = blocksutil.NewBlockGenerator() - -func makeCids(n int) []cid.Cid { - cids := make([]cid.Cid, 0, n) - for i := 0; i < n; i++ { - c := blockGenerator.Next().Cid() - cids = append(cids, c) - } - return cids -} - -func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { - for _, c := range cids { - select { - case dequeued := <-q.dequeue: - if c != dequeued { - t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued) - } - - case <-time.After(time.Second * 1): - t.Fatal("Timeout waiting for cids to be provided.") - } - } -} - -func TestBasicOperation(t *testing.T) { - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - cids := makeCids(10) - - for _, c := range cids { - queue.Enqueue(c) - } - - assertOrdered(cids, queue, t) -} - -func TestMangledData(t *testing.T) { - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - cids := makeCids(10) - for _, c := range cids { - queue.Enqueue(c) - } - - // put bad data in the queue - queueKey := datastore.NewKey("/test/0") - err = queue.ds.Put(queueKey, []byte("borked")) - if err != nil { - t.Fatal(err) - } - - // expect to only see the valid cids we entered - expected := cids - assertOrdered(expected, queue, t) -} - -func TestInitialization(t *testing.T) { - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - cids := makeCids(10) - for _, c := range cids { - queue.Enqueue(c) - } - - assertOrdered(cids[:5], queue, t) - - // make a new queue, same data - queue, err = NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - assertOrdered(cids[5:], queue, t) -} - -func TestInitializationWithManyCids(t *testing.T) { - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - cids := makeCids(25) - for _, c := range cids { - queue.Enqueue(c) - } - - // make a new queue, same data - queue, err = NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - assertOrdered(cids, queue, t) -} diff --git a/provider/simple/provider.go b/provider/simple/provider.go deleted file mode 100644 index abe13ce5986..00000000000 --- a/provider/simple/provider.go +++ /dev/null @@ -1,72 +0,0 @@ -// Package simple implements structures and methods to provide blocks, -// keep track of which blocks are provided, and to allow those blocks to -// be reprovided. -package simple - -import ( - "context" - - cid "github.com/ipfs/go-cid" - q "github.com/ipfs/go-ipfs/provider/queue" - logging "github.com/ipfs/go-log" - routing "github.com/libp2p/go-libp2p-core/routing" -) - -var logP = logging.Logger("provider.simple") - -const provideOutgoingWorkerLimit = 8 - -// Provider announces blocks to the network -type Provider struct { - ctx context.Context - // the CIDs for which provide announcements should be made - queue *q.Queue - // used to announce providing to the network - contentRouting routing.ContentRouting -} - -// NewProvider creates a provider that announces blocks to the network using a content router -func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting) *Provider { - return &Provider{ - ctx: ctx, - queue: queue, - contentRouting: contentRouting, - } -} - -// Close stops the provider -func (p *Provider) Close() error { - p.queue.Close() - return nil -} - -// Run workers to handle provide requests. -func (p *Provider) Run() { - p.handleAnnouncements() -} - -// Provide the given cid using specified strategy. -func (p *Provider) Provide(root cid.Cid) error { - p.queue.Enqueue(root) - return nil -} - -// Handle all outgoing cids by providing (announcing) them -func (p *Provider) handleAnnouncements() { - for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { - go func() { - for p.ctx.Err() == nil { - select { - case <-p.ctx.Done(): - return - case c := <-p.queue.Dequeue(): - logP.Info("announce - start - ", c) - if err := p.contentRouting.Provide(p.ctx, c, true); err != nil { - logP.Warningf("Unable to provide entry: %s, %s", c, err) - } - logP.Info("announce - end - ", c) - } - } - }() - } -} diff --git a/provider/simple/provider_test.go b/provider/simple/provider_test.go deleted file mode 100644 index 4922958c8c9..00000000000 --- a/provider/simple/provider_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package simple_test - -import ( - "context" - "math/rand" - "testing" - "time" - - cid "github.com/ipfs/go-cid" - datastore "github.com/ipfs/go-datastore" - sync "github.com/ipfs/go-datastore/sync" - blocksutil "github.com/ipfs/go-ipfs-blocksutil" - peer "github.com/libp2p/go-libp2p-core/peer" - - q "github.com/ipfs/go-ipfs/provider/queue" - - . "github.com/ipfs/go-ipfs/provider/simple" -) - -var blockGenerator = blocksutil.NewBlockGenerator() - -type mockRouting struct { - provided chan cid.Cid -} - -func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { - r.provided <- cid - return nil -} - -func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan peer.AddrInfo { - return nil -} - -func mockContentRouting() *mockRouting { - r := mockRouting{} - r.provided = make(chan cid.Cid) - return &r -} - -func TestAnnouncement(t *testing.T) { - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r) - prov.Run() - - cids := cid.NewSet() - - for i := 0; i < 100; i++ { - c := blockGenerator.Next().Cid() - cids.Add(c) - } - - go func() { - for _, c := range cids.Keys() { - err = prov.Provide(c) - // A little goroutine stirring to exercise some different states - r := rand.Intn(10) - time.Sleep(time.Microsecond * time.Duration(r)) - } - }() - - for cids.Len() > 0 { - select { - case cp := <-r.provided: - if !cids.Has(cp) { - t.Fatal("Wrong CID provided") - } - cids.Remove(cp) - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for cids to be provided.") - } - } -} diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go deleted file mode 100644 index ce5c71812fd..00000000000 --- a/provider/simple/reprovide.go +++ /dev/null @@ -1,225 +0,0 @@ -package simple - -import ( - "context" - "fmt" - "time" - - backoff "github.com/cenkalti/backoff" - cid "github.com/ipfs/go-cid" - cidutil "github.com/ipfs/go-cidutil" - blocks "github.com/ipfs/go-ipfs-blockstore" - pin "github.com/ipfs/go-ipfs/pin" - ipld "github.com/ipfs/go-ipld-format" - logging "github.com/ipfs/go-log" - merkledag "github.com/ipfs/go-merkledag" - verifcid "github.com/ipfs/go-verifcid" - routing "github.com/libp2p/go-libp2p-core/routing" -) - -var logR = logging.Logger("reprovider.simple") - -//KeyChanFunc is function streaming CIDs to pass to content routing -type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) -type doneFunc func(error) - -// Reprovider reannounces blocks to the network -type Reprovider struct { - ctx context.Context - trigger chan doneFunc - - // The routing system to provide values through - rsys routing.ContentRouting - - keyProvider KeyChanFunc - - tick time.Duration -} - -// NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { - return &Reprovider{ - ctx: ctx, - trigger: make(chan doneFunc), - - rsys: rsys, - keyProvider: keyProvider, - tick: reprovideIniterval, - } -} - -// Close the reprovider -func (rp *Reprovider) Close() error { - return nil -} - -// Run re-provides keys with 'tick' interval or when triggered -func (rp *Reprovider) Run() { - // dont reprovide immediately. - // may have just started the daemon and shutting it down immediately. - // probability( up another minute | uptime ) increases with uptime. - after := time.After(time.Minute) - var done doneFunc - for { - if rp.tick == 0 { - after = make(chan time.Time) - } - - select { - case <-rp.ctx.Done(): - return - case done = <-rp.trigger: - case <-after: - } - - //'mute' the trigger channel so when `ipfs bitswap reprovide` is called - //a 'reprovider is already running' error is returned - unmute := rp.muteTrigger() - - err := rp.Reprovide() - if err != nil { - logR.Debug(err) - } - - if done != nil { - done(err) - } - - unmute() - - after = time.After(rp.tick) - } -} - -// Reprovide registers all keys given by rp.keyProvider to libp2p content routing -func (rp *Reprovider) Reprovide() error { - keychan, err := rp.keyProvider(rp.ctx) - if err != nil { - return fmt.Errorf("failed to get key chan: %s", err) - } - for c := range keychan { - // hash security - if err := verifcid.ValidateCid(c); err != nil { - logR.Errorf("insecure hash in reprovider, %s (%s)", c, err) - continue - } - op := func() error { - err := rp.rsys.Provide(rp.ctx, c, true) - if err != nil { - logR.Debugf("Failed to provide key: %s", err) - } - return err - } - - // TODO: this backoff library does not respect our context, we should - // eventually work contexts into it. low priority. - err := backoff.Retry(op, backoff.NewExponentialBackOff()) - if err != nil { - logR.Debugf("Providing failed after number of retries: %s", err) - return err - } - } - return nil -} - -// Trigger starts reprovision process in rp.Run and waits for it -func (rp *Reprovider) Trigger(ctx context.Context) error { - progressCtx, done := context.WithCancel(ctx) - - var err error - df := func(e error) { - err = e - done() - } - - select { - case <-rp.ctx.Done(): - return context.Canceled - case <-ctx.Done(): - return context.Canceled - case rp.trigger <- df: - <-progressCtx.Done() - return err - } -} - -func (rp *Reprovider) muteTrigger() context.CancelFunc { - ctx, cf := context.WithCancel(rp.ctx) - go func() { - defer cf() - for { - select { - case <-ctx.Done(): - return - case done := <-rp.trigger: - done(fmt.Errorf("reprovider is already running")) - } - } - }() - - return cf -} - -// Strategies - -// NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - return bstore.AllKeysChan(ctx) - } -} - -// NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(onlyRoots bool) func(pin.Pinner, ipld.DAGService) KeyChanFunc { - return func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - set, err := pinSet(ctx, pinning, dag, onlyRoots) - if err != nil { - return nil, err - } - - outCh := make(chan cid.Cid) - go func() { - defer close(outCh) - for c := range set.New { - select { - case <-ctx.Done(): - return - case outCh <- c: - } - } - - }() - - return outCh, nil - } - } -} - -func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*cidutil.StreamingSet, error) { - set := cidutil.NewStreamingSet() - - go func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(set.New) - - for _, key := range pinning.DirectKeys() { - set.Visitor(ctx)(key) - } - - for _, key := range pinning.RecursiveKeys() { - set.Visitor(ctx)(key) - - if !onlyRoots { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.Visitor(ctx)) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - } - } - }() - - return set, nil -} diff --git a/provider/simple/reprovide_test.go b/provider/simple/reprovide_test.go deleted file mode 100644 index e9925e55ec8..00000000000 --- a/provider/simple/reprovide_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package simple_test - -import ( - "context" - "testing" - "time" - - blocks "github.com/ipfs/go-block-format" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipfs/go-ipfs-blockstore" - mock "github.com/ipfs/go-ipfs-routing/mock" - peer "github.com/libp2p/go-libp2p-core/peer" - testutil "github.com/libp2p/go-libp2p-testing/net" - - . "github.com/ipfs/go-ipfs/provider/simple" -) - -func TestReprovide(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - mrserv := mock.NewServer() - - idA := testutil.RandIdentityOrFatal(t) - idB := testutil.RandIdentityOrFatal(t) - - clA := mrserv.Client(idA) - clB := mrserv.Client(idB) - - bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - - blk := blocks.NewBlock([]byte("this is a test")) - err := bstore.Put(blk) - if err != nil { - t.Fatal(err) - } - - keyProvider := NewBlockstoreProvider(bstore) - reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) - err = reprov.Reprovide() - if err != nil { - t.Fatal(err) - } - - var providers []peer.AddrInfo - maxProvs := 100 - - provChan := clB.FindProvidersAsync(ctx, blk.Cid(), maxProvs) - for p := range provChan { - providers = append(providers, p) - } - - if len(providers) == 0 { - t.Fatal("Should have gotten a provider") - } - - if providers[0].ID != idA.ID() { - t.Fatal("Somehow got the wrong peer back as a provider.") - } -} diff --git a/provider/system.go b/provider/system.go deleted file mode 100644 index b3e17ee40c5..00000000000 --- a/provider/system.go +++ /dev/null @@ -1,59 +0,0 @@ -package provider - -import ( - "context" - "github.com/ipfs/go-cid" -) - -// System defines the interface for interacting with the value -// provider system -type System interface { - Run() - Close() error - Provide(cid.Cid) error - Reprovide(context.Context) error -} - -type system struct { - provider Provider - reprovider Reprovider -} - -// NewSystem constructs a new provider system from a provider and reprovider -func NewSystem(provider Provider, reprovider Reprovider) System { - return &system{provider, reprovider} -} - -// Run the provider system by running the provider and reprovider -func (s *system) Run() { - go s.provider.Run() - go s.reprovider.Run() -} - -// Close the provider and reprovider -func (s *system) Close() error { - var errs []error - - if err := s.provider.Close(); err != nil { - errs = append(errs, err) - } - - if err := s.reprovider.Close(); err != nil { - errs = append(errs, err) - } - - if len(errs) > 0 { - return errs[0] - } - return nil -} - -// Provide a value -func (s *system) Provide(cid cid.Cid) error { - return s.provider.Provide(cid) -} - -// Reprovide all the previously provided values -func (s *system) Reprovide(ctx context.Context) error { - return s.reprovider.Trigger(ctx) -}