Skip to content

Commit

Permalink
Merge pull request #6715 from MichaelMure/pinner-with-context
Browse files Browse the repository at this point in the history
pin: add context and error return to most of the Pinner functions
  • Loading branch information
Stebalien authored Nov 18, 2019
2 parents 8c35862 + c529410 commit fbc284a
Show file tree
Hide file tree
Showing 14 changed files with 112 additions and 75 deletions.
9 changes: 5 additions & 4 deletions blocks/blockstoreutil/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package blockstoreutil

import (
"context"
"fmt"
"io"

Expand Down Expand Up @@ -33,7 +34,7 @@ type RmBlocksOpts struct {
// It returns a channel where objects of type RemovedBlock are placed, when
// not using the Quiet option. Block removal is asynchronous and will
// skip any pinned blocks.
func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
func RmBlocks(ctx context.Context, blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
// make the channel large enough to hold any result to avoid
// blocking while holding the GCLock
out := make(chan interface{}, len(cids))
Expand All @@ -43,7 +44,7 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBl
unlocker := blocks.GCLock()
defer unlocker.Unlock()

stillOkay := FilterPinned(pins, out, cids)
stillOkay := FilterPinned(ctx, pins, out, cids)

for _, c := range stillOkay {
// Kept for backwards compatibility. We may want to
Expand Down Expand Up @@ -74,9 +75,9 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBl
// out channel, with an error which indicates that the Cid is pinned.
// This function is used in RmBlocks to filter out any blocks which are not
// to be removed (because they are pinned).
func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []cid.Cid) []cid.Cid {
func FilterPinned(ctx context.Context, pins pin.Pinner, out chan<- interface{}, cids []cid.Cid) []cid.Cid {
stillOkay := make([]cid.Cid, 0, len(cids))
res, err := pins.CheckIfPinned(cids...)
res, err := pins.CheckIfPinned(ctx, cids...)
if err != nil {
out <- &RemovedBlock{Error: fmt.Sprintf("pin check failed: %s", err)}
return nil
Expand Down
35 changes: 26 additions & 9 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac
return err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
pinType, pinned, err := n.Pinning.IsPinnedWithType(req.Context, c.Cid(), mode)
if err != nil {
return err
}
Expand Down Expand Up @@ -501,19 +501,31 @@ func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(val
}

if typeStr == "direct" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
dkeys, err := n.Pinning.DirectKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(dkeys, "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(rkeys, "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
for _, k := range n.Pinning.RecursiveKeys() {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
for _, k := range rkeys {
var visitErr error
err := dag.Walk(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
Expand Down Expand Up @@ -642,8 +654,10 @@ var verifyPinCmd = &cmds.Command{
explain: !quiet,
includeOk: verbose,
}
out := pinVerify(req.Context, n, opts, enc)

out, err := pinVerify(req.Context, n, opts, enc)
if err != nil {
return err
}
return res.Emit(out)
},
Type: PinVerifyRes{},
Expand Down Expand Up @@ -685,13 +699,16 @@ type pinVerifyOpts struct {
includeOk bool
}

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) <-chan interface{} {
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan interface{}, error) {
visited := make(map[cid.Cid]PinStatus)

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins := n.Pinning.RecursiveKeys()
recPins, err := n.Pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) PinStatus
checkPin = func(root cid.Cid) PinStatus {
Expand Down Expand Up @@ -747,7 +764,7 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
}
}()

return out
return out, nil
}

// Format formats PinVerifyRes
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc

if settings.Pin {
api.pinning.PinWithMode(b.Cid(), pin.Recursive)
if err := api.pinning.Flush(); err != nil {
if err := api.pinning.Flush(ctx); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func (api *BlockAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.BlockRm
cids := []cid.Cid{rp.Cid()}
o := util.RmBlocksOpts{Force: settings.Force}

out, err := util.RmBlocks(api.blockstore, api.pinning, cids, o)
out, err := util.RmBlocks(ctx, api.blockstore, api.pinning, cids, o)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error {

adder.pinning.PinWithMode(nd.Cid(), pin.Recursive)

return adder.pinning.Flush()
return adder.pinning.Flush(ctx)
}

func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
Expand All @@ -45,7 +45,7 @@ func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
}
}

return adder.pinning.Flush()
return adder.pinning.Flush(ctx)
}

func (api *dagAPI) Pinning() ipld.NodeAdder {
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj

if options.Pin {
api.pinning.PinWithMode(dagnode.Cid(), pin.Recursive)
err = api.pinning.Flush()
err = api.pinning.Flush(ctx)
if err != nil {
return nil, err
}
Expand Down
29 changes: 22 additions & 7 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
return err
}

return api.pinning.Flush()
return api.pinning.Flush(ctx)
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) {
Expand Down Expand Up @@ -75,7 +75,7 @@ func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOpti
return err
}

return api.pinning.Flush()
return api.pinning.Flush(ctx)
}

func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opts ...caopts.PinUpdateOption) error {
Expand All @@ -101,7 +101,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
return err
}

return api.pinning.Flush()
return api.pinning.Flush(ctx)
}

type pinStatus struct {
Expand Down Expand Up @@ -137,7 +137,10 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG)
recPins := api.pinning.RecursiveKeys()
recPins, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) *pinStatus
checkPin = func(root cid.Cid) *pinStatus {
Expand Down Expand Up @@ -204,11 +207,19 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
}

if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(api.pinning.DirectKeys(), "direct")
dkeys, err := api.pinning.DirectKeys(ctx)
if err != nil {
return nil, err
}
AddToResultKeys(dkeys, "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range api.pinning.RecursiveKeys() {
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range rkeys {
err := merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
Expand All @@ -221,7 +232,11 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(api.pinning.RecursiveKeys(), "recursive")
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
AddToResultKeys(rkeys, "recursive")
}

out := make([]coreiface.Pin, 0, len(keys))
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (adder *Adder) PinRoot(root ipld.Node) error {
}

adder.pinning.PinWithMode(rnk, pin.Recursive)
return adder.pinning.Flush()
return adder.pinning.Flush(adder.ctx)
}

func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
Expand Down
2 changes: 1 addition & 1 deletion fuse/ipns/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func InitializeKeyspace(n *core.IpfsNode, key ci.PrivKey) error {
return err
}

err = n.Pinning.Flush()
err = n.Pinning.Flush(ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.2.2
github.com/ipfs/go-ipfs-provider v0.3.0
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipld-cbor v0.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-provider v0.2.2 h1:/YcpqQtg27JhgOig9jbGxhDXmSKOZ0pvqrCW1+f8Q+U=
github.com/ipfs/go-ipfs-provider v0.2.2/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4=
github.com/ipfs/go-ipfs-provider v0.3.0 h1:W3AO8YQVPK/9NFu1HRJxuMZftw6K+rWv1j3y5K5e06A=
github.com/ipfs/go-ipfs-provider v0.3.0/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4=
github.com/ipfs/go-ipfs-routing v0.0.1/go.mod h1:k76lf20iKFxQTjcJokbPM9iBXVXVZhcOwc360N4nuKs=
github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ=
github.com/ipfs/go-ipfs-routing v0.1.0/go.mod h1:hYoUkJLyAUKhF58tysKpids8RNDPO42BVMgK5dNsoqY=
Expand Down
2 changes: 1 addition & 1 deletion namesys/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func InitializeKeyspace(ctx context.Context, pub Publisher, pins pin.Pinner, key
return err
}

err = pins.Flush()
err = pins.Flush(ctx)
if err != nil {
return err
}
Expand Down
18 changes: 15 additions & 3 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
return links, nil
}
err := Descendants(ctx, getLinks, gcs, pn.RecursiveKeys())
rkeys, err := pn.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
err = Descendants(ctx, getLinks, gcs, rkeys)
if err != nil {
errors = true
select {
Expand Down Expand Up @@ -233,11 +237,19 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
}

for _, k := range pn.DirectKeys() {
dkeys, err := pn.DirectKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range dkeys {
gcs.Add(k)
}

err = Descendants(ctx, getLinks, gcs, pn.InternalPins())
ikeys, err := pn.InternalPins(ctx)
if err != nil {
return nil, err
}
err = Descendants(ctx, getLinks, gcs, ikeys)
if err != nil {
errors = true
select {
Expand Down
Loading

0 comments on commit fbc284a

Please sign in to comment.