diff --git a/core/builder.go b/core/builder.go index 4de4595cd02..4a04551c35b 100644 --- a/core/builder.go +++ b/core/builder.go @@ -14,6 +14,7 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" resolver "github.com/ipfs/go-ipfs/path/resolver" pin "github.com/ipfs/go-ipfs/pin" + providers "github.com/ipfs/go-ipfs/providers" repo "github.com/ipfs/go-ipfs/repo" cfg "github.com/ipfs/go-ipfs/repo/config" "github.com/ipfs/go-ipfs/thirdparty/verifbs" @@ -237,7 +238,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { return err } } else { + n.SetupOfflineRouting() n.Exchange = offline.Exchange(n.Blockstore) + n.Providers = providers.NewProviders(n.ctx, n.Routing, nil) } n.Blocks = bserv.New(n.Blockstore, n.Exchange) diff --git a/core/commands/add.go b/core/commands/add.go index 1fee487934b..3d19d0ed84b 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -313,7 +313,7 @@ You can now check what blocks have been created by: } // copy intermediary nodes from editor to our actual dagservice - _, err := fileAdder.Finalize() + nd, err := fileAdder.Finalize() if err != nil { return err } @@ -322,6 +322,12 @@ You can now check what blocks have been created by: return nil } + if !local { + if err := n.Providers.ProvideRecursive(req.Context, nd, dserv); err != nil { + return err + } + } + return fileAdder.PinRoot() } diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index eecc0a8cd9a..0243cf0b870 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -173,7 +173,6 @@ var bitswapStatCmd = &cmds.Command{ } fmt.Fprintln(w, "bitswap status") - fmt.Fprintf(w, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, bitswap.HasBlockBufferSize) fmt.Fprintf(w, "\tblocks received: %d\n", out.BlocksReceived) fmt.Fprintf(w, "\tblocks sent: %d\n", out.BlocksSent) fmt.Fprintf(w, "\tdata received: %d\n", out.DataReceived) diff --git a/core/commands/block.go b/core/commands/block.go index 4873cb20a53..c148cc594cb 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -222,6 +222,11 @@ than 'sha2-256' or format to anything other than 'v0' will result in CIDv1. return } + if err := n.Providers.Provide(b.Cid()); err != nil { + log.Errorf("BlockPut: failed to provide %s: %s", b.Cid(), err) + return + } + err = cmds.EmitOnce(res, &BlockStat{ Key: b.Cid().String(), Size: len(data), diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go index be33fa7104d..514037854fd 100644 --- a/core/commands/dag/dag.go +++ b/core/commands/dag/dag.go @@ -154,7 +154,9 @@ into an object of the specified format.
} } - return nil + return cids.ForEach(func(c *cid.Cid) error { + return n.Providers.Provide(c) + }) } go func() { diff --git a/core/commands/object/object.go b/core/commands/object/object.go index 5f9675645f9..953960bda7d 100644 --- a/core/commands/object/object.go +++ b/core/commands/object/object.go @@ -512,6 +512,12 @@ Available templates: res.SetError(err, cmdkit.ErrNormal) return } + + if err = n.Providers.Provide(node.Cid()); err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + res.SetOutput(&Object{Hash: node.Cid().String()}) }, Marshalers: oldcmds.MarshalerMap{ @@ -610,6 +616,11 @@ func objectPut(ctx context.Context, n *core.IpfsNode, input io.Reader, encoding return nil, err } + err = n.Providers.Provide(dagnode.Cid()) + if err != nil { + return nil, err + } + return dagnode.Cid(), nil } diff --git a/core/commands/object/patch.go b/core/commands/object/patch.go index 33f25c98e42..b08690236f7 100644 --- a/core/commands/object/patch.go +++ b/core/commands/object/patch.go @@ -118,6 +118,12 @@ the limit will not be respected by the network. return } + err = nd.Providers.Provide(rtpb.Cid()) + if err != nil { + re.SetError(err, cmdkit.ErrNormal) + return + } + cmds.EmitOnce(re, &Object{Hash: rtpb.Cid().String()}) }, Type: Object{}, @@ -189,6 +195,12 @@ Example: return } + err = nd.Providers.Provide(rtpb.Cid()) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + res.SetOutput(&Object{Hash: rtpb.Cid().String()}) }, Type: Object{}, @@ -251,6 +263,12 @@ Removes a link by the given name from root. nc := nnode.Cid() + err = nd.Providers.Provide(nc) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + res.SetOutput(&Object{Hash: nc.String()}) }, Type: Object{}, @@ -348,6 +366,12 @@ to a file containing 'bar', and returns the hash of the new object. nc := nnode.Cid() + err = nd.Providers.Provide(nc) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + res.SetOutput(&Object{Hash: nc.String()}) }, Type: Object{}, diff --git a/core/commands/pubsub.go b/core/commands/pubsub.go index fbbc0b05731..5ea8f5964a2 100644 --- a/core/commands/pubsub.go +++ b/core/commands/pubsub.go @@ -107,6 +107,10 @@ This command outputs data in the following encodings: log.Error("pubsub discovery: ", err) return } + if err := n.Providers.Provide(blk.Cid()); err != nil { + log.Error("pubsub discovery: ", err) + return + } connectToPubSubPeers(req.Context, n, blk.Cid()) }() diff --git a/core/commands/tar.go b/core/commands/tar.go index 3813ff8c94c..5603fff69d7 100644 --- a/core/commands/tar.go +++ b/core/commands/tar.go @@ -59,6 +59,12 @@ represent it. c := node.Cid() + err = nd.Providers.Provide(c) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + fi.FileName() res.SetOutput(&coreunix.AddedObject{ Name: fi.FileName(), diff --git a/core/core.go b/core/core.go index c267aad5283..952d0c2ce6b 100644 --- a/core/core.go +++ b/core/core.go @@ -33,6 +33,7 @@ import ( p2p "github.com/ipfs/go-ipfs/p2p" "github.com/ipfs/go-ipfs/path/resolver" pin "github.com/ipfs/go-ipfs/pin" + provider "github.com/ipfs/go-ipfs/providers" repo "github.com/ipfs/go-ipfs/repo" config "github.com/ipfs/go-ipfs/repo/config" ft "github.com/ipfs/go-ipfs/unixfs" @@ -129,6 +130,7 @@ type IpfsNode struct { PeerHost p2phost.Host // the network host (server+client) Bootstrapper io.Closer // the periodic bootstrapper Routing routing.IpfsRouting // the routing system. 
recommend ipfs-dht + Providers provider.Interface // the content routing abstraction layer Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes Ping *ping.PingService @@ -493,8 +495,11 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost // Wrap standard peer host with routing system to allow unknown peer lookups n.PeerHost = rhost.Wrap(host, n.Routing) + // Wrap content routing with a buffering layer + n.Providers = provider.NewProviders(ctx, n.Routing, n.PeerHost) + // setup exchange service - bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing) + bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Providers) n.Exchange = bitswap.New(ctx, bitswapNetwork, n.Blockstore) size, err := n.getCacheSize() diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index e2fd4803e9e..fd4bccaf77c 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -504,6 +504,11 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) { return } + if err := i.node.Providers.Provide(newcid); err != nil { + webError(w, "putHandler: Could not provide newnode: ", err, http.StatusInternalServerError) + return + } + i.addUserHeaders(w) // ok, _now_ write user's headers. w.Header().Set("IPFS-Hash", newcid.String()) http.Redirect(w, r, gopath.Join(ipfsPathPrefix, newcid.String(), newPath), http.StatusCreated) @@ -560,6 +565,11 @@ func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { return } + if err := i.node.Providers.Provide(c); err != nil { + webError(w, "Could not provide node", err, http.StatusInternalServerError) + return + } + pathpb, ok := pathNodes[j].(*dag.ProtoNode) if !ok { webError(w, "Cannot read non protobuf nodes through gateway", dag.ErrNotProtobuf, http.StatusBadRequest) @@ -581,6 +591,11 @@ func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { // Redirect to new path ncid := newnode.Cid() + if err := i.node.Providers.Provide(ncid); err != nil { + webError(w, "Could not provide node", err, http.StatusInternalServerError) + return + } + i.addUserHeaders(w) // ok, _now_ write user's headers. w.Header().Set("IPFS-Hash", ncid.String()) http.Redirect(w, r, gopath.Join(ipfsPathPrefix+ncid.String(), path.Join(components[:len(components)-1])), http.StatusCreated) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 480b65aed83..fe97f81f665 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -16,7 +16,6 @@ import ( notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" delay "gx/ipfs/QmRJVNatYJwTAHgdSM1Xef9QVQ1Ch3XHdmcrykjP5Y4soL/go-ipfs-delay" - flags "gx/ipfs/QmRMGdC6HKdLsPDABL9aXPDidrpmEHzJqFWSvshkbn9Hj8/go-ipfs-flags" metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface" process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" @@ -31,35 +30,15 @@ import ( var log = logging.Logger("bitswap") const ( - // maxProvidersPerRequest specifies the maximum number of providers desired - // from the network. This value is specified because the network streams - // results. 
- // TODO: if a 'non-nice' strategy is implemented, consider increasing this value - maxProvidersPerRequest = 3 - providerRequestTimeout = time.Second * 10 - provideTimeout = time.Second * 15 - sizeBatchRequestChan = 32 // kMaxPriority is the max priority as defined by the bitswap protocol kMaxPriority = math.MaxInt32 ) var ( - HasBlockBufferSize = 256 - provideKeysBufferSize = 2048 - provideWorkerMax = 512 - // the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} ) -func init() { - if flags.LowMemMode { - HasBlockBufferSize = 64 - provideKeysBufferSize = 512 - provideWorkerMax = 16 - } -} - var rebroadcastDelay = delay.Fixed(time.Minute) // New initializes a BitSwap instance that communicates over the provided @@ -94,10 +73,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, notifications: notif, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method network: network, - findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, - newBlocks: make(chan *cid.Cid, HasBlockBufferSize), - provideKeys: make(chan *cid.Cid, provideKeysBufferSize), wm: NewWantManager(ctx, network), counters: new(counters), @@ -141,15 +117,6 @@ type Bitswap struct { // appropriate user requests notifications notifications.PubSub - // findKeys sends keys to a worker to find and connect to providers for them - findKeys chan *blockRequest - // newBlocks is a channel for newly added blocks to be provided to the - // network. blocks pushed down this channel get buffered and fed to the - // provideKeys channel later on to avoid too much network activity - newBlocks chan *cid.Cid - // provideKeys directly feeds provide workers - provideKeys chan *cid.Cid - process process.Process // Counters for various statistics @@ -178,11 +145,6 @@ type counters struct { messagesRecvd uint64 } -type blockRequest struct { - Cid *cid.Cid - Ctx context.Context -} - // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { @@ -230,14 +192,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block bs.wm.WantBlocks(ctx, keys, nil, mses) - // NB: Optimization. Assumes that providers of key[0] are likely to - // be able to provide for all keys. This currently holds true in most - // every situation. Later, this assumption may not hold as true. - req := &blockRequest{ - Cid: keys[0], - Ctx: ctx, - } - remaining := cid.NewSet() for _, k := range keys { remaining.Add(k) @@ -272,12 +226,13 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block } }() - select { - case bs.findKeys <- req: - return out, nil - case <-ctx.Done(): - return nil, ctx.Err() + // NB: Optimization. Assumes that providers of key[0] are likely to + // be able to provide for all keys. This currently holds true in most + // every situation. Later, this assumption may not hold as true. + if err := bs.network.FindProviders(ctx, keys[0]); err != nil { + return nil, err } + return out, nil } func (bs *Bitswap) getNextSessionID() uint64 { @@ -298,6 +253,7 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { // HasBlock announces the existence of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(blk blocks.Block) error { + //TODO: call provide here? 
return bs.receiveBlockFrom(blk, "") } @@ -333,13 +289,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { } bs.engine.AddBlock(blk) - - select { - case bs.newBlocks <- blk.Cid(): - // send block off to be reprovided - case <-bs.process.Closing(): - return bs.process.Close() - } return nil } @@ -385,6 +334,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg if err := bs.receiveBlockFrom(b, p); err != nil { log.Warningf("ReceiveMessage recvBlockFrom error: %s", err) } + if err := bs.network.Provide(ctx, b.Cid()); err != nil { + log.Warningf("ReceiveMessage Provide error: %s", err) + } log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid()) }(block) } diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 96eb6614225..642d14d7340 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -65,6 +65,9 @@ type Routing interface { // FindProvidersAsync returns a channel of providers for the given key FindProvidersAsync(context.Context, *cid.Cid, int) <-chan peer.ID + // FindProviders finds providers for the given key and connects to them + FindProviders(ctx context.Context, c *cid.Cid) error + // Provide provides the key to the network Provide(context.Context, *cid.Cid) error } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 9df94e6e61f..066e6ba987c 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -7,9 +7,9 @@ import ( "time" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" + providers "github.com/ipfs/go-ipfs/providers" host "gx/ipfs/QmQQGtcp6nVUrQjNsnU53YWV1q8fK1Kd9S7FEkYbRZzxry/go-libp2p-host" - routing "gx/ipfs/QmUV9hDAAyjeGbxbXkJ2sYqZ6dTd1DXJ2REhYEkRm178Tg/go-libp2p-routing" ma "gx/ipfs/QmUxSEGbv2nmYNnfXi7839wwQqTN3kwQeUxe8dTjZWZs7J/go-multiaddr" peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer" inet "gx/ipfs/QmXdgNhVEgjLxjUoMs5ViQL7pboAt3Y7V7eGHRiE4qrmTE/go-libp2p-net" @@ -25,10 +25,10 @@ var log = logging.Logger("bitswap_network") var sendMessageTimeout = time.Minute * 10 // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host -func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { +func NewFromIpfsHost(host host.Host, p providers.Interface) BitSwapNetwork { bitswapNetwork := impl{ - host: host, - routing: r, + host: host, + providers: p, } host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream) host.SetStreamHandler(ProtocolBitswapOne, bitswapNetwork.handleNewStream) @@ -42,8 +42,8 @@ func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { // impl transforms the ipfs network interface, which sends and receives // NetMessage objects, into the bitswap network interface. type impl struct { - host host.Host - routing routing.ContentRouting + host host.Host + providers providers.Interface // inbound messages from the network are forwarded to the receiver receiver Receiver @@ -136,47 +136,6 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { return bsnet.host.Connect(ctx, pstore.PeerInfo{ID: p}) } -// FindProvidersAsync returns a channel of providers for the given key -func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { - - // Since routing queries are expensive, give bitswap the peers to which we - // have open connections.
Note that this may cause issues if bitswap starts - // precisely tracking which peers provide certain keys. This optimization - // would be misleading. In the long run, this may not be the most - // appropriate place for this optimization, but it won't cause any harm in - // the short term. - connectedPeers := bsnet.host.Network().Peers() - out := make(chan peer.ID, len(connectedPeers)) // just enough buffer for these connectedPeers - for _, id := range connectedPeers { - if id == bsnet.host.ID() { - continue // ignore self as provider - } - out <- id - } - - go func() { - defer close(out) - providers := bsnet.routing.FindProvidersAsync(ctx, k, max) - for info := range providers { - if info.ID == bsnet.host.ID() { - continue // ignore self as provider - } - bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, pstore.TempAddrTTL) - select { - case <-ctx.Done(): - return - case out <- info.ID: - } - } - }() - return out -} - -// Provide provides the key to the network -func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error { - return bsnet.routing.Provide(ctx, k, true) -} - // handleNewStream receives a new stream from the network. func (bsnet *impl) handleNewStream(s inet.Stream) { defer s.Close() @@ -205,6 +164,19 @@ func (bsnet *impl) handleNewStream(s inet.Stream) { } } +// FindProvidersAsync returns a channel of providers for the given key +func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { + return bsnet.providers.FindProvidersAsync(ctx, k, max) +} + +func (bsnet *impl) FindProviders(ctx context.Context, k *cid.Cid) error { + return bsnet.providers.FindProviders(ctx, k) +} + +func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error { + return bsnet.providers.Provide(k) +} + func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager { return bsnet.host.ConnManager() } diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go index 85390475de8..8fc7d6cb65b 100644 --- a/exchange/bitswap/stat.go +++ b/exchange/bitswap/stat.go @@ -7,7 +7,6 @@ import ( ) type Stat struct { - ProvideBufLen int Wantlist []*cid.Cid Peers []string BlocksReceived uint64 @@ -20,7 +19,6 @@ type Stat struct { func (bs *Bitswap) Stat() (*Stat, error) { st := new(Stat) - st.ProvideBufLen = len(bs.newBlocks) st.Wantlist = bs.GetWantlist() bs.counterLk.Lock() c := bs.counters diff --git a/exchange/bitswap/testnet/peernet.go b/exchange/bitswap/testnet/peernet.go index 9b51a0de411..d5032c8ed50 100644 --- a/exchange/bitswap/testnet/peernet.go +++ b/exchange/bitswap/testnet/peernet.go @@ -4,6 +4,7 @@ import ( "context" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + pr "github.com/ipfs/go-ipfs/providers" testutil "gx/ipfs/QmPdxCaVp4jZ9RbxqZADvKH6kiCR5jHvdR5f2ycjAY6T2a/go-testutil" mockpeernet "gx/ipfs/QmUEAR2pS7fP1GPseS3i8MWFyENs7oDp4CZrgn8FCjbsBu/go-libp2p/p2p/net/mock" @@ -27,7 +28,9 @@ func (pn *peernet) Adapter(p testutil.Identity) bsnet.BitSwapNetwork { panic(err.Error()) } routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore()) - return bsnet.NewFromIpfsHost(client, routing) + providers := pr.NewProviders(context.TODO(), routing, client) + + return bsnet.NewFromIpfsHost(client, providers) } func (pn *peernet) HasPeer(p peer.ID) bool { diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index bec775847a3..08a638853a2 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -8,6 +8,7 @@ import ( bsmsg 
"github.com/ipfs/go-ipfs/exchange/bitswap/message" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + providers "github.com/ipfs/go-ipfs/providers" testutil "gx/ipfs/QmPdxCaVp4jZ9RbxqZADvKH6kiCR5jHvdR5f2ycjAY6T2a/go-testutil" delay "gx/ipfs/QmRJVNatYJwTAHgdSM1Xef9QVQ1Ch3XHdmcrykjP5Y4soL/go-ipfs-delay" @@ -152,6 +153,11 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k *cid.Cid, max return out } +func (nc *networkClient) FindProviders(ctx context.Context, k *cid.Cid) error { + nc.FindProvidersAsync(ctx, k, providers.MaxProvidersPerRequest) + return nil +} + func (nc *networkClient) ConnectionManager() ifconnmgr.ConnManager { return &ifconnmgr.NullConnMgr{} } diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index ce141ab6db0..991850730ed 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -17,8 +17,7 @@ import ( ) // WARNING: this uses RandTestBogusIdentity DO NOT USE for NON TESTS! -func NewTestSessionGenerator( - net tn.Network) SessionGenerator { +func NewTestSessionGenerator(net tn.Network) SessionGenerator { ctx, cancel := context.WithCancel(context.Background()) return SessionGenerator{ net: net, diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index f96fc3ba33b..09bbac1c5a2 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -3,26 +3,17 @@ package bitswap import ( "context" "math/rand" - "sync" "time" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" - procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" - peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer" - cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log" ) var TaskWorkerCount = 8 func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { - // Start up a worker to handle block requests this node is making - px.Go(func(px process.Process) { - bs.providerQueryManager(ctx) - }) - // Start up workers to handle requests from other nodes for the data on this node for i := 0; i < TaskWorkerCount; i++ { i := i @@ -35,16 +26,6 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { px.Go(func(px process.Process) { bs.rebroadcastWorker(ctx) }) - - // Start up a worker to manage sending out provides messages - px.Go(func(px process.Process) { - bs.provideCollector(ctx) - }) - - // Spawn up multiple workers to handle incoming blocks - // consider increasing number if providing blocks bottlenecks - // file transfers - px.Go(bs.provideWorker) } func (bs *Bitswap) taskWorker(ctx context.Context, id int) { @@ -87,85 +68,6 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } } -func (bs *Bitswap) provideWorker(px process.Process) { - - limit := make(chan struct{}, provideWorkerMax) - - limitedGoProvide := func(k *cid.Cid, wid int) { - defer func() { - // replace token when done - <-limit - }() - ev := logging.LoggableMap{"ID": wid} - - ctx := procctx.OnClosingContext(px) // derive ctx from px - defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done() - - ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx - defer cancel() - - if err := bs.network.Provide(ctx, k); err != nil { - log.Warning(err) - } - } - - // worker spawner, reads from bs.provideKeys until it closes, spawning a - // _ratelimited_ number of workers 
to handle each key. - for wid := 2; ; wid++ { - ev := logging.LoggableMap{"ID": 1} - log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) - - select { - case <-px.Closing(): - return - case k, ok := <-bs.provideKeys: - if !ok { - log.Debug("provideKeys channel closed") - return - } - select { - case <-px.Closing(): - return - case limit <- struct{}{}: - go limitedGoProvide(k, wid) - } - } - } -} - -func (bs *Bitswap) provideCollector(ctx context.Context) { - defer close(bs.provideKeys) - var toProvide []*cid.Cid - var nextKey *cid.Cid - var keysOut chan *cid.Cid - - for { - select { - case blkey, ok := <-bs.newBlocks: - if !ok { - log.Debug("newBlocks channel closed") - return - } - - if keysOut == nil { - nextKey = blkey - keysOut = bs.provideKeys - } else { - toProvide = append(toProvide, blkey) - } - case keysOut <- nextKey: - if len(toProvide) > 0 { - nextKey = toProvide[0] - toProvide = toProvide[1:] - } else { - keysOut = nil - } - case <-ctx.Done(): - return - } - } -} - func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) defer cancel() @@ -194,60 +96,9 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { // TODO: come up with a better strategy for determining when to search // for new providers for blocks. i := rand.Intn(len(entries)) - bs.findKeys <- &blockRequest{ - Cid: entries[i].Cid, - Ctx: ctx, - } + bs.network.FindProviders(ctx, entries[i].Cid) case <-parent.Done(): return } } } - -func (bs *Bitswap) providerQueryManager(ctx context.Context) { - var activeLk sync.Mutex - kset := cid.NewSet() - - for { - select { - case e := <-bs.findKeys: - select { // make sure its not already cancelled - case <-e.Ctx.Done(): - continue - default: - } - - activeLk.Lock() - if kset.Has(e.Cid) { - activeLk.Unlock() - continue - } - kset.Add(e.Cid) - activeLk.Unlock() - - go func(e *blockRequest) { - child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout) - defer cancel() - providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest) - wg := &sync.WaitGroup{} - for p := range providers { - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - err := bs.network.ConnectTo(child, p) - if err != nil { - log.Debug("failed to connect to provider %s: %s", p, err) - } - }(p) - } - wg.Wait() - activeLk.Lock() - kset.Remove(e.Cid) - activeLk.Unlock() - }(e) - - case <-ctx.Done(): - return - } - } -} diff --git a/providers/providers.go b/providers/providers.go new file mode 100644 index 00000000000..5fdd91c7fcf --- /dev/null +++ b/providers/providers.go @@ -0,0 +1,204 @@ +package providers + +import ( + "context" + "time" + + host "gx/ipfs/QmQQGtcp6nVUrQjNsnU53YWV1q8fK1Kd9S7FEkYbRZzxry/go-libp2p-host" + flags "gx/ipfs/QmRMGdC6HKdLsPDABL9aXPDidrpmEHzJqFWSvshkbn9Hj8/go-ipfs-flags" + process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" + procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" + routing "gx/ipfs/QmUV9hDAAyjeGbxbXkJ2sYqZ6dTd1DXJ2REhYEkRm178Tg/go-libp2p-routing" + peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer" + ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" + pstore "gx/ipfs/QmZhsmorLpD9kmQ4ynbAu4vbKv2goMUnXazwGA4gnWHDjB/go-libp2p-peerstore" + cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" + logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log" +) + +const ( + provideTimeout = time.Second * 15 + + // MaxProvidersPerRequest 
specifies the maximum number of providers desired + // from the network. This value is specified because the network streams + // results. + // TODO: if a 'non-nice' strategy is implemented, consider increasing this value + MaxProvidersPerRequest = 3 + providerRequestTimeout = time.Second * 10 + + sizeBatchRequestChan = 32 +) + +var ( + provideKeysBufferSize = 2048 + + // HasBlockBufferSize is the maximum number of CIDs that will get buffered + // for providing + HasBlockBufferSize = 256 + + provideWorkerMax = 512 +) + +var log = logging.Logger("providers") + +type blockRequest struct { + Cid *cid.Cid + Ctx context.Context +} + +// Interface is an abstraction on top of libp2p content routing which +// optimizes common content routing tasks +type Interface interface { + // Provide announces a block to the network. Calls to this method are + // usually non-blocking; they block only when the provide buffer is full + // (back-pressure under load) + Provide(k *cid.Cid) error + + // ProvideRecursive announces a whole graph to the network. Calls to this + // method are usually non-blocking; they block only when the provide + // buffer is full (back-pressure under load) + // + // Note: only call this method with an offline NodeGetter. + ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error + + FindProviders(ctx context.Context, k *cid.Cid) error + FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID + + Stat() (*Stat, error) +} + +type providers struct { + routing routing.ContentRouting + process process.Process + host host.Host + + // newBlocks is a channel for newly added blocks to be provided to the + // network. blocks pushed down this channel get buffered and fed to the + // provideKeys channel later on to avoid too much network activity + newBlocks chan *cid.Cid + + // provideKeys directly feeds provide workers + provideKeys chan *cid.Cid + + // findKeys sends keys to a worker to find and connect to providers for them + findKeys chan *blockRequest +} + +func init() { + if flags.LowMemMode { + HasBlockBufferSize = 64 + provideKeysBufferSize = 512 + provideWorkerMax = 16 + } +} + +// NewProviders returns a providers Interface implementation based on +// libp2p content routing +func NewProviders(parent context.Context, routing routing.ContentRouting, host host.Host) Interface { + ctx, cancelFunc := context.WithCancel(parent) + + px := process.WithTeardown(func() error { + return nil + }) + + p := &providers{ + routing: routing, + process: px, + host: host, + + newBlocks: make(chan *cid.Cid, HasBlockBufferSize), + provideKeys: make(chan *cid.Cid, provideKeysBufferSize), + + findKeys: make(chan *blockRequest, sizeBatchRequestChan), + } + + p.startWorkers(ctx, px) + // bind the context and process. + // do it over here to avoid closing before all setup is done.
+ go func() { + <-px.Closing() // process closes first + cancelFunc() + }() + procctx.CloseAfterContext(px, ctx) // parent cancelled first + + return p +} + +func (p *providers) Provide(b *cid.Cid) error { + select { + case p.newBlocks <- b: + // send block off to be provided to the network + case <-p.process.Closing(): + return p.process.Close() + } + return nil +} + +func (p *providers) provideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter, done *cid.Set) error { + if err := p.Provide(n.Cid()); err != nil { + return err + } + + for _, l := range n.Links() { + if !done.Visit(l.Cid) { + continue + } + + sub, err := l.GetNode(ctx, serv) + if err != nil { + return err + } + if err := p.provideRecursive(ctx, sub, serv, done); err != nil { + return err + } + } + return nil +} + +func (p *providers) ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error { + return p.provideRecursive(ctx, n, serv, cid.NewSet()) +} + +func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error { + select { + case <-ctx.Done(): + return ctx.Err() + case p.findKeys <- &blockRequest{Ctx: ctx, Cid: c}: + return nil + } +} + +// FindProvidersAsync returns a channel of providers for the given key +func (p *providers) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { + if p.host == nil { + return nil + } + + // Since routing queries are expensive, give bitswap the peers to which we + // have open connections. Note that this may cause issues if bitswap starts + // precisely tracking which peers provide certain keys. This optimization + // would be misleading. In the long run, this may not be the most + // appropriate place for this optimization, but it won't cause any harm in + // the short term. + connectedPeers := p.host.Network().Peers() + out := make(chan peer.ID, len(connectedPeers)) // just enough buffer for these connectedPeers + for _, id := range connectedPeers { + if id == p.host.ID() { + continue // ignore self as provider + } + out <- id + } + + go func() { + defer close(out) + providers := p.routing.FindProvidersAsync(ctx, k, max) + for info := range providers { + if info.ID == p.host.ID() { + continue // ignore self as provider + } + p.host.Peerstore().AddAddrs(info.ID, info.Addrs, pstore.TempAddrTTL) + select { + case <-ctx.Done(): + return + case out <- info.ID: + } + } + }() + return out +} diff --git a/providers/stat.go b/providers/stat.go new file mode 100644 index 00000000000..5dad05b2318 --- /dev/null +++ b/providers/stat.go @@ -0,0 +1,13 @@ +package providers + +// Stat contains statistics about the providers subsystem +type Stat struct { + ProvideBufLen int +} + +// Stat returns statistics about the providers subsystem +func (p *providers) Stat() (*Stat, error) { + return &Stat{ + ProvideBufLen: len(p.newBlocks), + }, nil +} diff --git a/providers/workers.go b/providers/workers.go new file mode 100644 index 00000000000..a421bf5ebcd --- /dev/null +++ b/providers/workers.go @@ -0,0 +1,159 @@ +package providers + +import ( + "context" + "sync" + + process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" + procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" + peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer" + pstore "gx/ipfs/QmZhsmorLpD9kmQ4ynbAu4vbKv2goMUnXazwGA4gnWHDjB/go-libp2p-peerstore" + cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" + logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log" +) + +func (p *providers) startWorkers(ctx context.Context,
px process.Process) { + if p.host != nil { + // Start up a worker to handle block requests this node is making + px.Go(func(px process.Process) { + p.providerQueryManager(ctx) + }) + } + + // Start up a worker to manage sending out provides messages + px.Go(func(px process.Process) { + p.provideCollector(ctx) + }) + + // Spawn up multiple workers to handle incoming blocks + // consider increasing number if providing blocks bottlenecks + // file transfers + px.Go(p.provideWorker) +} + +func (p *providers) provideWorker(px process.Process) { + + limit := make(chan struct{}, provideWorkerMax) + + limitedGoProvide := func(k *cid.Cid, wid int) { + defer func() { + // replace token when done + <-limit + }() + ev := logging.LoggableMap{"ID": wid} + + ctx := procctx.OnClosingContext(px) // derive ctx from px + defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done() + + ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx + defer cancel() + + if err := p.routing.Provide(ctx, k, true); err != nil { + log.Warning(err) + } + } + + // worker spawner, reads from p.provideKeys until it closes, spawning a + // _ratelimited_ number of workers to handle each key. + for wid := 2; ; wid++ { + ev := logging.LoggableMap{"ID": 1} + log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) + + select { + case <-px.Closing(): + return + case k, ok := <-p.provideKeys: + if !ok { + log.Debug("provideKeys channel closed") + return + } + select { + case <-px.Closing(): + return + case limit <- struct{}{}: + go limitedGoProvide(k, wid) + } + } + } +} + +func (p *providers) provideCollector(ctx context.Context) { + defer close(p.provideKeys) + var toProvide []*cid.Cid + var nextKey *cid.Cid + var keysOut chan *cid.Cid + + for { + select { + case blkey, ok := <-p.newBlocks: + if !ok { + log.Debug("newBlocks channel closed") + return + } + + if keysOut == nil { + nextKey = blkey + keysOut = p.provideKeys + } else { + toProvide = append(toProvide, blkey) + } + case keysOut <- nextKey: + if len(toProvide) > 0 { + nextKey = toProvide[0] + toProvide = toProvide[1:] + } else { + keysOut = nil + } + case <-ctx.Done(): + return + } + } +} + +func (p *providers) providerQueryManager(ctx context.Context) { + var activeLk sync.Mutex + kset := cid.NewSet() + + for { + select { + case e := <-p.findKeys: + select { // make sure it's not already cancelled + case <-e.Ctx.Done(): + continue + default: + } + + activeLk.Lock() + if kset.Has(e.Cid) { + activeLk.Unlock() + continue + } + kset.Add(e.Cid) + activeLk.Unlock() + + go func(e *blockRequest) { + child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout) + defer cancel() + providers := p.FindProvidersAsync(child, e.Cid, MaxProvidersPerRequest) + wg := &sync.WaitGroup{} + for pr := range providers { + wg.Add(1) + go func(pi peer.ID) { + defer wg.Done() + err := p.host.Connect(child, pstore.PeerInfo{ID: pi}) + if err != nil { + log.Debugf("failed to connect to provider %s: %s", pi, err) + } + }(pr) + } + wg.Wait() + activeLk.Lock() + kset.Remove(e.Cid) + activeLk.Unlock() + }(e) + + case <-ctx.Done(): + return + } + } +} diff --git a/test/sharness/t0175-provider.sh b/test/sharness/t0175-provider.sh new file mode 100755 index 00000000000..aec4021d7b2 --- /dev/null +++ b/test/sharness/t0175-provider.sh @@ -0,0 +1,83 @@ +#!/bin/sh + +test_description="Test provider" + +.
lib/test-lib.sh + +NUM_NODES=6 + +test_expect_success 'init iptb' ' + iptb init -f -n $NUM_NODES --bootstrap=none --port=0 +' + +test_expect_success 'peer ids' ' + PEERID_0=$(iptb get id 0) && + PEERID_1=$(iptb get id 1) +' + +startup_cluster ${NUM_NODES} + +findprovs_empty() { + test_expect_success 'findprovs '$1' succeeds' ' + ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut + ' + + test_expect_success "findprovs $1 output is empty" ' + test_must_be_empty findprovsOut + ' +} + +findprovs_expect() { + test_expect_success 'findprovs '$1' succeeds' ' + ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut && + echo '$2' > expected + ' + + test_expect_success "findprovs $1 output looks good" ' + test_cmp findprovsOut expected + ' +} + +test_expect_success 'prepare files for ipfs add' ' + random-files -depth=2 -dirs=2 -files=4 -seed=1 d1 > /dev/null && + random-files -depth=2 -dirs=2 -files=4 -seed=2 d2 > /dev/null +' + +test_expect_success 'ipfs add files' ' + HASH_F1=$(echo 1 | ipfsi 0 add -q --local) + HASH_F2=$(echo 2 | ipfsi 0 add -q) +' + +findprovs_empty '$HASH_F1' +findprovs_expect '$HASH_F2' '$PEERID_0' + +test_expect_success 'ipfs add directories' ' + HASH_D1=$(ipfsi 0 add -qr --local d1) + HASH_D2=$(ipfsi 0 add -qr d2) +' + +findprovs_empty '$HASH_D1' +findprovs_expect '$HASH_D2' '$PEERID_0' + +test_expect_success 'ipfs block put' ' + HASH_B1=$(echo 1 | ipfsi 0 block put) +' + +findprovs_expect '$HASH_B1' '$PEERID_0' + +test_expect_success 'ipfs dag put' ' + HASH_C1=$(echo 1 | ipfsi 0 dag put) +' + +findprovs_expect '$HASH_C1' '$PEERID_0' +findprovs_empty 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' + +test_expect_success 'ipfs object' ' + HASH_O1=$(ipfsi 0 object new) && + HASH_O2=$(echo "{\"data\":\"foo\"}" | ipfsi 0 object put -q) +' + +findprovs_expect '$HASH_O1' '$PEERID_0' +findprovs_expect '$HASH_O2' '$PEERID_0' + +test_done diff --git a/test/sharness/t0220-bitswap.sh b/test/sharness/t0220-bitswap.sh index fb1e3dbc72d..c43ed7586be 100755 --- a/test/sharness/t0220-bitswap.sh +++ b/test/sharness/t0220-bitswap.sh @@ -18,7 +18,6 @@ test_expect_success "'ipfs bitswap stat' succeeds" ' test_expect_success "'ipfs bitswap stat' output looks good" ' cat <expected && bitswap status - provides buffer: 0 / 256 blocks received: 0 blocks sent: 0 data received: 0 @@ -56,7 +55,6 @@ test_expect_success "'ipfs bitswap stat' succeeds" ' test_expect_success "'ipfs bitswap stat' output looks good" ' cat <expected && bitswap status - provides buffer: 0 / 256 blocks received: 0 blocks sent: 0 data received: 0 diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index c6648b9abb4..45e09473e2c 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -487,7 +487,7 @@ func (dm *DagModifier) Truncate(size int64) error { return dm.expandSparse(int64(size) - realSize) } - nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv) + nnode, err := dm.dagTruncate(dm.ctx, dm.curNode, uint64(size)) if err != nil { return err } @@ -502,7 +502,7 @@ func (dm *DagModifier) Truncate(size int64) error { } // dagTruncate truncates the given node to 'size' and returns the modified Node -func dagTruncate(ctx context.Context, n ipld.Node, size uint64, ds ipld.DAGService) (ipld.Node, error) { +func (dm *DagModifier) dagTruncate(ctx context.Context, n ipld.Node, size uint64) (ipld.Node, error) { if len(n.Links()) == 0 { switch nd := n.(type) { case *mdag.ProtoNode: @@ -528,7 +528,7 @@ func dagTruncate(ctx context.Context, n ipld.Node, size uint64, ds ipld.DAGServi var modified 
ipld.Node ndata := ft.NewFSNode(ft.TRaw) for i, lnk := range nd.Links() { - child, err := lnk.GetNode(ctx, ds) + child, err := lnk.GetNode(ctx, dm.dagserv) if err != nil { return nil, err } @@ -540,7 +540,7 @@ func dagTruncate(ctx context.Context, n ipld.Node, size uint64, ds ipld.DAGServi // found the child we want to cut if size < cur+childsize { - nchild, err := dagTruncate(ctx, child, size-cur, ds) + nchild, err := dm.dagTruncate(ctx, child, size-cur) if err != nil { return nil, err } @@ -555,7 +555,7 @@ func dagTruncate(ctx context.Context, n ipld.Node, size uint64, ds ipld.DAGServi ndata.AddBlockSize(childsize) } - err := ds.Add(ctx, modified) + err := dm.dagserv.Add(ctx, modified) if err != nil { return nil, err }
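
Usage note (reviewer sketch, not part of the patch): the snippet below shows how the new providers.Interface from providers/providers.go is meant to be driven once wired into a node. The function and variable names (provideAndDiscover, want, etc.) are illustrative assumptions, not code from this changeset; the import paths are the gx paths used in the diff above.

package example

import (
	"context"

	providers "github.com/ipfs/go-ipfs/providers"

	host "gx/ipfs/QmQQGtcp6nVUrQjNsnU53YWV1q8fK1Kd9S7FEkYbRZzxry/go-libp2p-host"
	routing "gx/ipfs/QmUV9hDAAyjeGbxbXkJ2sYqZ6dTd1DXJ2REhYEkRm178Tg/go-libp2p-routing"
	ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
	cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
)

// provideAndDiscover exercises the three main entry points of the new
// subsystem: a single-key announce, a recursive DAG announce, and a
// provider lookup for a wanted key.
func provideAndDiscover(ctx context.Context, rt routing.ContentRouting, h host.Host,
	root ipld.Node, dserv ipld.NodeGetter, want *cid.Cid) error {

	// host may be nil (offline mode, as in core/builder.go); in that case
	// FindProvidersAsync returns nil and only the provide workers run.
	p := providers.NewProviders(ctx, rt, h)

	// Queue a single announcement; this blocks only when the newBlocks
	// buffer (HasBlockBufferSize) is full.
	if err := p.Provide(root.Cid()); err != nil {
		return err
	}

	// Announce root and every node reachable from it; dserv should be an
	// offline NodeGetter so the walk never triggers network fetches.
	if err := p.ProvideRecursive(ctx, root, dserv); err != nil {
		return err
	}

	// Ask the providerQueryManager worker to find and dial providers for a
	// key we want; the call itself just enqueues a blockRequest.
	return p.FindProviders(ctx, want)
}

The design keeps the old bitswap semantics: Provide is fire-and-forget into newBlocks, provideCollector batches keys into provideKeys, and up to provideWorkerMax workers issue the actual routing.Provide calls, which is why the command handlers above can call Provide inline without stalling requests.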