Skip to content

Commit

Permalink
Merge pull request #4641 from ipfs/feat/cat-sessions
Browse files Browse the repository at this point in the history
Use a bitswap session for 'Cat'
  • Loading branch information
whyrusleeping authored Feb 3, 2018
2 parents 3f2c774 + a703e2d commit 79072dc
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 14 deletions.
18 changes: 14 additions & 4 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (

core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
namesys "github.com/ipfs/go-ipfs/namesys"
ipfspath "github.com/ipfs/go-ipfs/path"
uio "github.com/ipfs/go-ipfs/unixfs/io"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

type CoreAPI struct {
Expand Down Expand Up @@ -49,12 +51,16 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI {
// ResolveNode resolves the path `p` using Unixfx resolver, gets and returns the
// resolved Node.
func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreiface.Node, error) {
p, err := api.ResolvePath(ctx, p)
return resolveNode(ctx, api.node.DAG, api.node.Namesys, p)
}

func resolveNode(ctx context.Context, ng ipld.NodeGetter, nsys namesys.NameSystem, p coreiface.Path) (coreiface.Node, error) {
p, err := resolvePath(ctx, ng, nsys, p)
if err != nil {
return nil, err
}

node, err := api.node.DAG.Get(ctx, p.Cid())
node, err := ng.Get(ctx, p.Cid())
if err != nil {
return nil, err
}
Expand All @@ -65,17 +71,21 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreifac
// resolved path.
// TODO: store all of ipfspath.Resolver.ResolvePathComponents() in Path
func (api *CoreAPI) ResolvePath(ctx context.Context, p coreiface.Path) (coreiface.Path, error) {
return resolvePath(ctx, api.node.DAG, api.node.Namesys, p)
}

func resolvePath(ctx context.Context, ng ipld.NodeGetter, nsys namesys.NameSystem, p coreiface.Path) (coreiface.Path, error) {
if p.Resolved() {
return p, nil
}

r := &ipfspath.Resolver{
DAG: api.node.DAG,
DAG: ng,
ResolveOnce: uio.ResolveUnixfsOnce,
}

p2 := ipfspath.FromString(p.String())
node, err := core.Resolve(ctx, api.node.Namesys, r, p2)
node, err := core.Resolve(ctx, nsys, r, p2)
if err == core.ErrNoNamesys {
return nil, coreiface.ErrOffline
} else if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
dag "github.com/ipfs/go-ipfs/merkledag"
uio "github.com/ipfs/go-ipfs/unixfs/io"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
Expand All @@ -30,12 +31,14 @@ func (api *UnixfsAPI) Add(ctx context.Context, r io.Reader) (coreiface.Path, err

// Cat returns the data contained by an IPFS or IPNS object(s) at path `p`.
func (api *UnixfsAPI) Cat(ctx context.Context, p coreiface.Path) (coreiface.Reader, error) {
dagnode, err := api.core().ResolveNode(ctx, p)
ses := dag.NewSession(ctx, api.node.DAG)

dagnode, err := resolveNode(ctx, ses, api.node.Namesys, p)
if err != nil {
return nil, err
}

r, err := uio.NewDagReader(ctx, dagnode, api.node.DAG)
r, err := uio.NewDagReader(ctx, dagnode, ses)
if err == uio.ErrIsDir {
return nil, coreiface.ErrIsDir
} else if err != nil {
Expand Down
41 changes: 41 additions & 0 deletions merkledag/errservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package merkledag

import (
"context"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// ErrorService implements ipld.DAGService, returning 'Err' for every call.
type ErrorService struct {
Err error
}

var _ ipld.DAGService = (*ErrorService)(nil)

func (cs *ErrorService) Add(ctx context.Context, nd ipld.Node) error {
return cs.Err
}

func (cs *ErrorService) AddMany(ctx context.Context, nds []ipld.Node) error {
return cs.Err
}

func (cs *ErrorService) Get(ctx context.Context, c *cid.Cid) (ipld.Node, error) {
return nil, cs.Err
}

func (cs *ErrorService) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *ipld.NodeOption {
ch := make(chan *ipld.NodeOption)
close(ch)
return ch
}

func (cs *ErrorService) Remove(ctx context.Context, c *cid.Cid) error {
return cs.Err
}

func (cs *ErrorService) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
return cs.Err
}
5 changes: 5 additions & 0 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func (sg *sesGetter) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.
return getNodesFromBG(ctx, sg.bs, keys)
}

// Session returns a NodeGetter using a new session for block fetches.
func (ds *dagService) Session(ctx context.Context) ipld.NodeGetter {
return &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error {
var ng ipld.NodeGetter = serv
Expand Down
20 changes: 20 additions & 0 deletions merkledag/readonly.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package merkledag

import (
"fmt"

ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// ErrReadOnly is used when a read-only datastructure is written to.
var ErrReadOnly = fmt.Errorf("cannot write to readonly DAGService")

// NewReadOnlyDagService takes a NodeGetter, and returns a full DAGService
// implementation that returns ErrReadOnly when its 'write' methods are
// invoked.
func NewReadOnlyDagService(ng ipld.NodeGetter) ipld.DAGService {
return &ComboService{
Read: ng,
Write: &ErrorService{ErrReadOnly},
}
}
64 changes: 64 additions & 0 deletions merkledag/readonly_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package merkledag_test

import (
"context"
"testing"

. "github.com/ipfs/go-ipfs/merkledag"
dstest "github.com/ipfs/go-ipfs/merkledag/test"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

func TestReadonlyProperties(t *testing.T) {
ds := dstest.Mock()
ro := NewReadOnlyDagService(ds)

ctx := context.Background()
nds := []ipld.Node{
NewRawNode([]byte("foo1")),
NewRawNode([]byte("foo2")),
NewRawNode([]byte("foo3")),
NewRawNode([]byte("foo4")),
}
cids := []*cid.Cid{
nds[0].Cid(),
nds[1].Cid(),
nds[2].Cid(),
nds[3].Cid(),
}

// add to the actual underlying datastore
if err := ds.Add(ctx, nds[2]); err != nil {
t.Fatal(err)
}
if err := ds.Add(ctx, nds[3]); err != nil {
t.Fatal(err)
}

if err := ro.Add(ctx, nds[0]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}
if err := ro.Add(ctx, nds[2]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}

if err := ro.AddMany(ctx, nds[0:1]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}

if err := ro.Remove(ctx, cids[3]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}
if err := ro.RemoveMany(ctx, cids[1:2]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}

if _, err := ro.Get(ctx, cids[0]); err != ipld.ErrNotFound {
t.Fatal("expected ErrNotFound")
}
if _, err := ro.Get(ctx, cids[3]); err != nil {
t.Fatal(err)
}
}
41 changes: 41 additions & 0 deletions merkledag/rwservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package merkledag

import (
"context"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// ComboService implements ipld.DAGService, using 'Read' for all fetch methods,
// and 'Write' for all methods that add new objects.
type ComboService struct {
Read ipld.NodeGetter
Write ipld.DAGService
}

var _ ipld.DAGService = (*ComboService)(nil)

func (cs *ComboService) Add(ctx context.Context, nd ipld.Node) error {
return cs.Write.Add(ctx, nd)
}

func (cs *ComboService) AddMany(ctx context.Context, nds []ipld.Node) error {
return cs.Write.AddMany(ctx, nds)
}

func (cs *ComboService) Get(ctx context.Context, c *cid.Cid) (ipld.Node, error) {
return cs.Read.Get(ctx, c)
}

func (cs *ComboService) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *ipld.NodeOption {
return cs.Read.GetMany(ctx, cids)
}

func (cs *ComboService) Remove(ctx context.Context, c *cid.Cid) error {
return cs.Write.Remove(ctx, c)
}

func (cs *ComboService) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
return cs.Write.RemoveMany(ctx, cids)
}
21 changes: 21 additions & 0 deletions merkledag/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package merkledag

import (
"context"

ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// SessionMaker is an object that can generate a new fetching session.
type SessionMaker interface {
Session(context.Context) ipld.NodeGetter
}

// NewSession returns a session backed NodeGetter if the given NodeGetter
// implements SessionMaker.
func NewSession(ctx context.Context, g ipld.NodeGetter) ipld.NodeGetter {
if sm, ok := g.(SessionMaker); ok {
return sm.Session(ctx)
}
return g
}
6 changes: 3 additions & 3 deletions path/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (e ErrNoLink) Error() string {
// TODO: now that this is more modular, try to unify this code with the
// the resolvers in namesys
type Resolver struct {
DAG ipld.DAGService
DAG ipld.NodeGetter

ResolveOnce func(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error)
ResolveOnce func(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error)
}

// NewBasicResolver constructs a new basic resolver.
Expand Down Expand Up @@ -124,7 +124,7 @@ func (s *Resolver) ResolvePath(ctx context.Context, fpath Path) (ipld.Node, erro

// ResolveSingle simply resolves one hop of a path through a graph with no
// extra context (does not opaquely resolve through sharded nodes)
func ResolveSingle(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
func ResolveSingle(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
return nd.ResolveLink(names)
}

Expand Down
2 changes: 1 addition & 1 deletion unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ReadSeekCloser interface {

// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retreival
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.DAGService) (DagReader, error) {
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagReader, error) {
switch n := n.(type) {
case *mdag.RawNode:
return NewBufDagReader(n.RawData()), nil
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// DagReader provides a way to easily read the data contained in a dag.
type pbDagReader struct {
serv ipld.DAGService
serv ipld.NodeGetter

// the node being read
node *mdag.ProtoNode
Expand Down Expand Up @@ -51,7 +51,7 @@ type pbDagReader struct {
var _ DagReader = (*pbDagReader)(nil)

// NewPBFileReader constructs a new PBFileReader.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.DAGService) *pbDagReader {
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.NodeGetter) *pbDagReader {
fctx, cancel := context.WithCancel(ctx)
curLinks := getLinkCids(n)
return &pbDagReader{
Expand Down
5 changes: 3 additions & 2 deletions unixfs/io/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// ResolveUnixfsOnce resolves a single hop of a path through a graph in a
// unixfs context. This includes handling traversing sharded directories.
func ResolveUnixfsOnce(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
func ResolveUnixfsOnce(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
switch nd := nd.(type) {
case *dag.ProtoNode:
upb, err := ft.FromBytes(nd.Data())
Expand All @@ -28,7 +28,8 @@ func ResolveUnixfsOnce(ctx context.Context, ds ipld.DAGService, nd ipld.Node, na

switch upb.GetType() {
case ft.THAMTShard:
s, err := hamt.NewHamtFromDag(ds, nd)
rods := dag.NewReadOnlyDagService(ds)
s, err := hamt.NewHamtFromDag(rods, nd)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit 79072dc

Please sign in to comment.