Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pin: add context and error return to most of the Pinner functions #6715

Merged
merged 3 commits into from
Nov 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions blocks/blockstoreutil/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package blockstoreutil

import (
"context"
"fmt"
"io"

Expand Down Expand Up @@ -33,7 +34,7 @@ type RmBlocksOpts struct {
// It returns a channel where objects of type RemovedBlock are placed, when
// not using the Quiet option. Block removal is asynchronous and will
// skip any pinned blocks.
func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
func RmBlocks(ctx context.Context, blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
// make the channel large enough to hold any result to avoid
// blocking while holding the GCLock
out := make(chan interface{}, len(cids))
Expand All @@ -43,7 +44,7 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBl
unlocker := blocks.GCLock()
defer unlocker.Unlock()

stillOkay := FilterPinned(pins, out, cids)
stillOkay := FilterPinned(ctx, pins, out, cids)

for _, c := range stillOkay {
// Kept for backwards compatibility. We may want to
Expand Down Expand Up @@ -74,9 +75,9 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBl
// out channel, with an error which indicates that the Cid is pinned.
// This function is used in RmBlocks to filter out any blocks which are not
// to be removed (because they are pinned).
func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []cid.Cid) []cid.Cid {
func FilterPinned(ctx context.Context, pins pin.Pinner, out chan<- interface{}, cids []cid.Cid) []cid.Cid {
stillOkay := make([]cid.Cid, 0, len(cids))
res, err := pins.CheckIfPinned(cids...)
res, err := pins.CheckIfPinned(ctx, cids...)
if err != nil {
out <- &RemovedBlock{Error: fmt.Sprintf("pin check failed: %s", err)}
return nil
Expand Down
35 changes: 26 additions & 9 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac
return err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
pinType, pinned, err := n.Pinning.IsPinnedWithType(req.Context, c.Cid(), mode)
if err != nil {
return err
}
Expand Down Expand Up @@ -501,19 +501,31 @@ func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(val
}

if typeStr == "direct" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
dkeys, err := n.Pinning.DirectKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(dkeys, "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(rkeys, "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
for _, k := range n.Pinning.RecursiveKeys() {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
for _, k := range rkeys {
var visitErr error
err := dag.Walk(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
Expand Down Expand Up @@ -642,8 +654,10 @@ var verifyPinCmd = &cmds.Command{
explain: !quiet,
includeOk: verbose,
}
out := pinVerify(req.Context, n, opts, enc)

out, err := pinVerify(req.Context, n, opts, enc)
if err != nil {
return err
}
return res.Emit(out)
},
Type: PinVerifyRes{},
Expand Down Expand Up @@ -685,13 +699,16 @@ type pinVerifyOpts struct {
includeOk bool
}

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) <-chan interface{} {
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan interface{}, error) {
visited := make(map[cid.Cid]PinStatus)

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins := n.Pinning.RecursiveKeys()
recPins, err := n.Pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) PinStatus
checkPin = func(root cid.Cid) PinStatus {
Expand Down Expand Up @@ -747,7 +764,7 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
}
}()

return out
return out, nil
}

// Format formats PinVerifyRes
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc

if settings.Pin {
api.pinning.PinWithMode(b.Cid(), pin.Recursive)
if err := api.pinning.Flush(); err != nil {
if err := api.pinning.Flush(ctx); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func (api *BlockAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.BlockRm
cids := []cid.Cid{rp.Cid()}
o := util.RmBlocksOpts{Force: settings.Force}

out, err := util.RmBlocks(api.blockstore, api.pinning, cids, o)
out, err := util.RmBlocks(ctx, api.blockstore, api.pinning, cids, o)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error {

adder.pinning.PinWithMode(nd.Cid(), pin.Recursive)

return adder.pinning.Flush()
return adder.pinning.Flush(ctx)
}

func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
Expand All @@ -45,7 +45,7 @@ func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
}
}

return adder.pinning.Flush()
return adder.pinning.Flush(ctx)
}

func (api *dagAPI) Pinning() ipld.NodeAdder {
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj

if options.Pin {
api.pinning.PinWithMode(dagnode.Cid(), pin.Recursive)
err = api.pinning.Flush()
err = api.pinning.Flush(ctx)
if err != nil {
return nil, err
}
Expand Down
29 changes: 22 additions & 7 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
return err
}

return api.pinning.Flush()
return api.pinning.Flush(ctx)
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) {
Expand Down Expand Up @@ -75,7 +75,7 @@ func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOpti
return err
}

return api.pinning.Flush()
return api.pinning.Flush(ctx)
}

func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opts ...caopts.PinUpdateOption) error {
Expand All @@ -101,7 +101,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
return err
}

return api.pinning.Flush()
return api.pinning.Flush(ctx)
}

type pinStatus struct {
Expand Down Expand Up @@ -137,7 +137,10 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG)
recPins := api.pinning.RecursiveKeys()
recPins, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) *pinStatus
checkPin = func(root cid.Cid) *pinStatus {
Expand Down Expand Up @@ -204,11 +207,19 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
}

if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(api.pinning.DirectKeys(), "direct")
dkeys, err := api.pinning.DirectKeys(ctx)
if err != nil {
return nil, err
}
AddToResultKeys(dkeys, "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range api.pinning.RecursiveKeys() {
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range rkeys {
err := merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
Expand All @@ -221,7 +232,11 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(api.pinning.RecursiveKeys(), "recursive")
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
AddToResultKeys(rkeys, "recursive")
}

out := make([]coreiface.Pin, 0, len(keys))
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (adder *Adder) PinRoot(root ipld.Node) error {
}

adder.pinning.PinWithMode(rnk, pin.Recursive)
return adder.pinning.Flush()
return adder.pinning.Flush(adder.ctx)
}

func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
Expand Down
2 changes: 1 addition & 1 deletion fuse/ipns/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func InitializeKeyspace(n *core.IpfsNode, key ci.PrivKey) error {
return err
}

err = n.Pinning.Flush()
err = n.Pinning.Flush(ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.2.2
github.com/ipfs/go-ipfs-provider v0.3.0
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipld-cbor v0.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-provider v0.2.2 h1:/YcpqQtg27JhgOig9jbGxhDXmSKOZ0pvqrCW1+f8Q+U=
github.com/ipfs/go-ipfs-provider v0.2.2/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4=
github.com/ipfs/go-ipfs-provider v0.3.0 h1:W3AO8YQVPK/9NFu1HRJxuMZftw6K+rWv1j3y5K5e06A=
github.com/ipfs/go-ipfs-provider v0.3.0/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4=
github.com/ipfs/go-ipfs-routing v0.0.1/go.mod h1:k76lf20iKFxQTjcJokbPM9iBXVXVZhcOwc360N4nuKs=
github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ=
github.com/ipfs/go-ipfs-routing v0.1.0/go.mod h1:hYoUkJLyAUKhF58tysKpids8RNDPO42BVMgK5dNsoqY=
Expand Down
2 changes: 1 addition & 1 deletion namesys/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func InitializeKeyspace(ctx context.Context, pub Publisher, pins pin.Pinner, key
return err
}

err = pins.Flush()
err = pins.Flush(ctx)
if err != nil {
return err
}
Expand Down
18 changes: 15 additions & 3 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
return links, nil
}
err := Descendants(ctx, getLinks, gcs, pn.RecursiveKeys())
rkeys, err := pn.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
err = Descendants(ctx, getLinks, gcs, rkeys)
if err != nil {
errors = true
select {
Expand Down Expand Up @@ -233,11 +237,19 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
}

for _, k := range pn.DirectKeys() {
dkeys, err := pn.DirectKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range dkeys {
gcs.Add(k)
}

err = Descendants(ctx, getLinks, gcs, pn.InternalPins())
ikeys, err := pn.InternalPins(ctx)
if err != nil {
return nil, err
}
err = Descendants(ctx, getLinks, gcs, ikeys)
if err != nil {
errors = true
select {
Expand Down
Loading