Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Extract provider logic from BitSwap #4333

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
resolver "github.com/ipfs/go-ipfs/path/resolver"
pin "github.com/ipfs/go-ipfs/pin"
providers "github.com/ipfs/go-ipfs/providers"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
"github.com/ipfs/go-ipfs/thirdparty/verifbs"
Expand Down Expand Up @@ -237,7 +238,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}
} else {
n.SetupOfflineRouting()
n.Exchange = offline.Exchange(n.Blockstore)
n.Providers = providers.NewProviders(n.ctx, n.Routing, nil)
}

n.Blocks = bserv.New(n.Blockstore, n.Exchange)
Expand Down
8 changes: 7 additions & 1 deletion core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ You can now check what blocks have been created by:
}

// copy intermediary nodes from editor to our actual dagservice
_, err := fileAdder.Finalize()
nd, err := fileAdder.Finalize()
if err != nil {
return err
}
Expand All @@ -322,6 +322,12 @@ You can now check what blocks have been created by:
return nil
}

if !local {
if err := n.Providers.ProvideRecursive(req.Context, nd, dserv); err != nil {
return err
}
}

return fileAdder.PinRoot()
}

Expand Down
1 change: 0 additions & 1 deletion core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ var bitswapStatCmd = &cmds.Command{
}

fmt.Fprintln(w, "bitswap status")
fmt.Fprintf(w, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, bitswap.HasBlockBufferSize)
fmt.Fprintf(w, "\tblocks received: %d\n", out.BlocksReceived)
fmt.Fprintf(w, "\tblocks sent: %d\n", out.BlocksSent)
fmt.Fprintf(w, "\tdata received: %d\n", out.DataReceived)
Expand Down
5 changes: 5 additions & 0 deletions core/commands/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ than 'sha2-256' or format to anything other than 'v0' will result in CIDv1.
return
}

if err := n.Providers.Provide(b.Cid()); err != nil {
log.Error("BlockPut key: '%q'", err)
return
}

err = cmds.EmitOnce(res, &BlockStat{
Key: b.Cid().String(),
Size: len(data),
Expand Down
4 changes: 3 additions & 1 deletion core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ into an object of the specified format.
}
}

return nil
return cids.ForEach(func(c *cid.Cid) error {
return n.Providers.Provide(c)
})
}

go func() {
Expand Down
11 changes: 11 additions & 0 deletions core/commands/object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ Available templates:
res.SetError(err, cmdkit.ErrNormal)
return
}

if err = n.Providers.Provide(node.Cid()); err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

res.SetOutput(&Object{Hash: node.Cid().String()})
},
Marshalers: oldcmds.MarshalerMap{
Expand Down Expand Up @@ -610,6 +616,11 @@ func objectPut(ctx context.Context, n *core.IpfsNode, input io.Reader, encoding
return nil, err
}

err = n.Providers.Provide(dagnode.Cid())
if err != nil {
return nil, err
}

return dagnode.Cid(), nil
}

Expand Down
24 changes: 24 additions & 0 deletions core/commands/object/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ the limit will not be respected by the network.
return
}

err = nd.Providers.Provide(rtpb.Cid())
if err != nil {
re.SetError(err, cmdkit.ErrNormal)
return
}

cmds.EmitOnce(re, &Object{Hash: rtpb.Cid().String()})
},
Type: Object{},
Expand Down Expand Up @@ -189,6 +195,12 @@ Example:
return
}

err = nd.Providers.Provide(rtpb.Cid())
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

res.SetOutput(&Object{Hash: rtpb.Cid().String()})
},
Type: Object{},
Expand Down Expand Up @@ -251,6 +263,12 @@ Removes a link by the given name from root.

nc := nnode.Cid()

err = nd.Providers.Provide(nc)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

res.SetOutput(&Object{Hash: nc.String()})
},
Type: Object{},
Expand Down Expand Up @@ -348,6 +366,12 @@ to a file containing 'bar', and returns the hash of the new object.

nc := nnode.Cid()

err = nd.Providers.Provide(nc)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

res.SetOutput(&Object{Hash: nc.String()})
},
Type: Object{},
Expand Down
4 changes: 4 additions & 0 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ This command outputs data in the following encodings:
log.Error("pubsub discovery: ", err)
return
}
if err := n.Providers.Provide(blk.Cid()); err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(req.Context, n, blk.Cid())
}()
Expand Down
6 changes: 6 additions & 0 deletions core/commands/tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ represent it.

c := node.Cid()

err = nd.Providers.Provide(c)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

fi.FileName()
res.SetOutput(&coreunix.AddedObject{
Name: fi.FileName(),
Expand Down
7 changes: 6 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
p2p "github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/path/resolver"
pin "github.com/ipfs/go-ipfs/pin"
provider "github.com/ipfs/go-ipfs/providers"
repo "github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
ft "github.com/ipfs/go-ipfs/unixfs"
Expand Down Expand Up @@ -129,6 +130,7 @@ type IpfsNode struct {
PeerHost p2phost.Host // the network host (server+client)
Bootstrapper io.Closer // the periodic bootstrapper
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Providers provider.Interface // the content routing abstraction layer
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Ping *ping.PingService
Expand Down Expand Up @@ -493,8 +495,11 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
// Wrap standard peer host with routing system to allow unknown peer lookups
n.PeerHost = rhost.Wrap(host, n.Routing)

// Wrap content routing with a buffering layer
n.Providers = provider.NewProviders(ctx, n.Routing, n.PeerHost)

// setup exchange service
bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Providers)
n.Exchange = bitswap.New(ctx, bitswapNetwork, n.Blockstore)

size, err := n.getCacheSize()
Expand Down
15 changes: 15 additions & 0 deletions core/corehttp/gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,11 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
return
}

if err := i.node.Providers.Provide(newcid); err != nil {
webError(w, "putHandler: Could not provide newnode: ", err, http.StatusInternalServerError)
return
}

i.addUserHeaders(w) // ok, _now_ write user's headers.
w.Header().Set("IPFS-Hash", newcid.String())
http.Redirect(w, r, gopath.Join(ipfsPathPrefix, newcid.String(), newPath), http.StatusCreated)
Expand Down Expand Up @@ -560,6 +565,11 @@ func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
return
}

if err := i.node.Providers.Provide(c); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@magik6k I have a question: why c here? It seems like this will try to provide the same Cid for each iteration of the loop. Is that intentional? If so, I'm curious why. I'm using this PR to inform a similar change I'm making and am caught up on this detail. Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is wrong. This should be either newnode.Cid() or more likely not at all here (Provide below should handle this, not 100% sure)

webError(w, "Could not provide node", err, http.StatusInternalServerError)
return
}

pathpb, ok := pathNodes[j].(*dag.ProtoNode)
if !ok {
webError(w, "Cannot read non protobuf nodes through gateway", dag.ErrNotProtobuf, http.StatusBadRequest)
Expand All @@ -581,6 +591,11 @@ func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
// Redirect to new path
ncid := newnode.Cid()

if err := i.node.Providers.Provide(ncid); err != nil {
webError(w, "Could not provide node", err, http.StatusInternalServerError)
return
}

i.addUserHeaders(w) // ok, _now_ write user's headers.
w.Header().Set("IPFS-Hash", ncid.String())
http.Redirect(w, r, gopath.Join(ipfsPathPrefix+ncid.String(), path.Join(components[:len(components)-1])), http.StatusCreated)
Expand Down
68 changes: 10 additions & 58 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"

delay "gx/ipfs/QmRJVNatYJwTAHgdSM1Xef9QVQ1Ch3XHdmcrykjP5Y4soL/go-ipfs-delay"
flags "gx/ipfs/QmRMGdC6HKdLsPDABL9aXPDidrpmEHzJqFWSvshkbn9Hj8/go-ipfs-flags"
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Expand All @@ -31,35 +30,15 @@ import (
var log = logging.Logger("bitswap")

const (
// maxProvidersPerRequest specifies the maximum number of providers desired
// from the network. This value is specified because the network streams
// results.
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
maxProvidersPerRequest = 3
providerRequestTimeout = time.Second * 10
provideTimeout = time.Second * 15
sizeBatchRequestChan = 32
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32
)

var (
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 512

// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

func init() {
if flags.LowMemMode {
HasBlockBufferSize = 64
provideKeysBufferSize = 512
provideWorkerMax = 16
}
}

var rebroadcastDelay = delay.Fixed(time.Minute)

// New initializes a BitSwap instance that communicates over the provided
Expand Down Expand Up @@ -94,10 +73,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
counters: new(counters),

Expand Down Expand Up @@ -141,15 +117,6 @@ type Bitswap struct {
// appropriate user requests
notifications notifications.PubSub

// findKeys sends keys to a worker to find and connect to providers for them
findKeys chan *blockRequest
// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan *cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan *cid.Cid

process process.Process

// Counters for various statistics
Expand Down Expand Up @@ -178,11 +145,6 @@ type counters struct {
messagesRecvd uint64
}

type blockRequest struct {
Cid *cid.Cid
Ctx context.Context
}

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
Expand Down Expand Up @@ -230,14 +192,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block

bs.wm.WantBlocks(ctx, keys, nil, mses)

// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
req := &blockRequest{
Cid: keys[0],
Ctx: ctx,
}

remaining := cid.NewSet()
for _, k := range keys {
remaining.Add(k)
Expand Down Expand Up @@ -272,12 +226,13 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
}
}()

select {
case bs.findKeys <- req:
return out, nil
case <-ctx.Done():
return nil, ctx.Err()
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
if err := bs.network.FindProviders(ctx, keys[0]); err != nil {
return nil, err
}
return out, nil
}

func (bs *Bitswap) getNextSessionID() uint64 {
Expand All @@ -298,6 +253,7 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
//TODO: call provide here?
return bs.receiveBlockFrom(blk, "")
}

Expand Down Expand Up @@ -333,13 +289,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
}

bs.engine.AddBlock(blk)

select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
return nil
}

Expand Down Expand Up @@ -385,6 +334,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
if err := bs.receiveBlockFrom(b, p); err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
}
if err := bs.network.Provide(ctx, b.Cid()); err != nil {
log.Warningf("ReceiveMessage Provide error: %s", err)
}
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}(block)
}
Expand Down
3 changes: 3 additions & 0 deletions exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, *cid.Cid, int) <-chan peer.ID

// FindProvidersfinds providers for the given key and connects to them
FindProviders(ctx context.Context, c *cid.Cid) error

// Provide provides the key to the network
Provide(context.Context, *cid.Cid) error
}
Loading