Skip to content

Commit

Permalink
merkledag: switch to new dag interface
Browse files Browse the repository at this point in the history
Also:

* Update the blockstore/blockservice methods to match.
* Construct a new temporary offline dag instead of having a
  GetOfflineLinkService method.

License: MIT
Signed-off-by: Steven Allen <steven@stebalien.com>
  • Loading branch information
Stebalien committed Jan 25, 2018
1 parent 5202f55 commit 26a8652
Show file tree
Hide file tree
Showing 68 changed files with 493 additions and 654 deletions.
72 changes: 39 additions & 33 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"io"

"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
Expand All @@ -21,31 +22,42 @@ var log = logging.Logger("blockservice")

var ErrNotFound = errors.New("blockservice: key not found")

type BlockGetter interface {
// GetBlock gets the requested block.
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)

// GetBlocks does a batch request for the given cids, returning blocks as
// they are found, in no particular order.
//
// It may not be able to find all requested blocks (or the context may
// be canceled). In that case, it will close the channel early. It is up
// to the consumer to detect this situation and keep track which blocks
// it has received and which it hasn't.
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
}

// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService interface {
io.Closer
BlockGetter

// Blockstore returns a reference to the underlying blockstore
Blockstore() blockstore.Blockstore

// Exchange returns a reference to the underlying exchange (usually bitswap)
Exchange() exchange.Interface

// AddBlock puts a given block to the underlying datastore
AddBlock(o blocks.Block) (*cid.Cid, error)
AddBlock(o blocks.Block) error

// AddBlocks adds a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)

GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
DeleteBlock(o blocks.Block) error
AddBlocks(bs []blocks.Block) error

// GetBlocks does a batch request for the given cids, returning blocks as
// they are found, in no particular order.
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block

Close() error
// DeleteBlock deletes the given block from the blockservice.
DeleteBlock(o *cid.Cid) error
}

type blockService struct {
Expand Down Expand Up @@ -110,38 +122,34 @@ func NewSession(ctx context.Context, bs BlockService) *Session {

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
func (s *blockService) AddBlock(o blocks.Block) error {
c := o.Cid()
if s.checkFirst {
has, err := s.blockstore.Has(c)
if err != nil {
return nil, err
}

if has {
return c, nil
if has, err := s.blockstore.Has(c); has || err != nil {
return err
}
}

err := s.blockstore.Put(o)
if err != nil {
return nil, err
if err := s.blockstore.Put(o); err != nil {
return err
}

if err := s.exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
// TODO(stebalien): really an error?
return errors.New("blockservice is closed")
}

return c, nil
return nil
}

func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) AddBlocks(bs []blocks.Block) error {
var toput []blocks.Block
if s.checkFirst {
toput = make([]blocks.Block, 0, len(bs))
for _, b := range bs {
has, err := s.blockstore.Has(b.Cid())
if err != nil {
return nil, err
return err
}
if !has {
toput = append(toput, b)
Expand All @@ -153,18 +161,16 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {

err := s.blockstore.PutMany(toput)
if err != nil {
return nil, err
return err
}

var ks []*cid.Cid
for _, o := range toput {
if err := s.exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
// TODO(stebalien): Should this really *return*?
return fmt.Errorf("blockservice is closed (%s)", err)
}

ks = append(ks, o.Cid())
}
return ks, nil
return nil
}

// GetBlock retrieves a particular block from the service,
Expand Down Expand Up @@ -256,8 +262,8 @@ func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f e
}

// DeleteBlock deletes a block in the blockservice from the datastore
func (s *blockService) DeleteBlock(o blocks.Block) error {
return s.blockstore.DeleteBlock(o.Cid())
func (s *blockService) DeleteBlock(c *cid.Cid) error {
return s.blockstore.DeleteBlock(c)
}

func (s *blockService) Close() error {
Expand Down
6 changes: 1 addition & 5 deletions blockservice/test/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,12 @@ func TestBlocks(t *testing.T) {
t.Error("Block key and data multihash key not equal")
}

k, err := bs.AddBlock(o)
err := bs.AddBlock(o)
if err != nil {
t.Error("failed to add block to BlockService", err)
return
}

if !k.Equals(o.Cid()) {
t.Error("returned key is not equal to block key", err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
b2, err := bs.GetBlock(ctx, o.Cid())
Expand Down
4 changes: 2 additions & 2 deletions core/commands/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,14 @@ It reads from stdin, and <key> is a base58 encoded multihash.
return
}

k, err := n.Blocks.AddBlock(b)
err = n.Blocks.AddBlock(b)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

err = cmds.EmitOnce(res, &BlockStat{
Key: k.String(),
Key: b.Cid().String(),
Size: len(data),
})
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
cmdkit "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
files "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit/files"
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

var log = logging.Logger("cmds/files")
Expand Down Expand Up @@ -102,7 +103,7 @@ into an object of the specified format.

addAllAndPin := func(f files.File) error {
cids := cid.NewSet()
b := n.DAG.Batch()
b := node.NewBatch(req.Context(), n.DAG)

for {
file, err := f.NextFile()
Expand All @@ -122,7 +123,7 @@ into an object of the specified format.
}

for _, nd := range nds {
_, err := b.Add(nd)
err := b.Add(nd)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
peer "gx/ipfs/Qma7H6RW8wRrfZpNSXwxYGcd1E149s42FpWNpDNieSVrnU/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
"gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
pstore "gx/ipfs/QmeZVQzUrXqaszo24DAoHfGzcmCptN9JyngLkGAiEfk2x7/go-libp2p-peerstore"
ipdht "gx/ipfs/QmfChjky1VNaHUQR9F2xqR1QEyX45pqU78nhsoq5GDYoKL/go-libp2p-kad-dht"
)
Expand Down Expand Up @@ -377,7 +378,7 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er
return nil
}

func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, cids []*cid.Cid) error {
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv node.DAGService, cids []*cid.Cid) error {
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()
Expand Down
2 changes: 1 addition & 1 deletion core/commands/files/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func statGetFormatOptions(req cmds.Request) (string, error) {
}
}

func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) {
func statNode(ds node.DAGService, fsn mfs.FSNode) (*Object, error) {
nd, err := fsn.GetNode()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ The JSON output contains type information.
t := unixfspb.Data_DataType(-1)

linkNode, err := link.GetNode(req.Context(), dserv)
if err == merkledag.ErrNotFound && !resolve {
if err == node.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions core/commands/object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package objectcmd

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"encoding/xml"
Expand Down Expand Up @@ -422,7 +423,7 @@ And then run:
defer n.Blockstore.PinLock().Unlock()
}

objectCid, err := objectPut(n, input, inputenc, datafieldenc)
objectCid, err := objectPut(req.Context(), n, input, inputenc, datafieldenc)
if err != nil {
errType := cmdkit.ErrNormal
if err == ErrUnknownObjectEnc {
Expand Down Expand Up @@ -504,12 +505,12 @@ Available templates:
}
}

k, err := n.DAG.Add(node)
err = n.DAG.Add(req.Context(), node)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
res.SetOutput(&Object{Hash: k.String()})
res.SetOutput(&Object{Hash: node.Cid().String()})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
Expand Down Expand Up @@ -542,7 +543,7 @@ func nodeFromTemplate(template string) (*dag.ProtoNode, error) {
var ErrEmptyNode = errors.New("no data or links in this node")

// objectPut takes a format option, serializes bytes from stdin and updates the dag with that data
func objectPut(n *core.IpfsNode, input io.Reader, encoding string, dataFieldEncoding string) (*cid.Cid, error) {
func objectPut(ctx context.Context, n *core.IpfsNode, input io.Reader, encoding string, dataFieldEncoding string) (*cid.Cid, error) {

data, err := ioutil.ReadAll(io.LimitReader(input, inputLimit+10))
if err != nil {
Expand Down Expand Up @@ -602,7 +603,7 @@ func objectPut(n *core.IpfsNode, input io.Reader, encoding string, dataFieldEnco
return nil, err
}

_, err = n.DAG.Add(dagnode)
err = n.DAG.Add(ctx, dagnode)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions core/commands/object/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ the limit will not be respected by the network.

rtpb.SetData(append(rtpb.Data(), data...))

newkey, err := nd.DAG.Add(rtpb)
err = nd.DAG.Add(req.Context(), rtpb)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

res.SetOutput(&Object{Hash: newkey.String()})
res.SetOutput(&Object{Hash: rtpb.Cid().String()})
},
Type: Object{},
Marshalers: cmds.MarshalerMap{
Expand Down Expand Up @@ -177,13 +177,13 @@ Example:

rtpb.SetData(data)

newkey, err := nd.DAG.Add(rtpb)
err = nd.DAG.Add(req.Context(), rtpb)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

res.SetOutput(&Object{Hash: newkey.String()})
res.SetOutput(&Object{Hash: rtpb.Cid().String()})
},
Type: Object{},
Marshalers: cmds.MarshalerMap{
Expand Down Expand Up @@ -237,7 +237,7 @@ Removes a link by the given name from root.
return
}

nnode, err := e.Finalize(nd.DAG)
nnode, err := e.Finalize(req.Context(), nd.DAG)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -334,7 +334,7 @@ to a file containing 'bar', and returns the hash of the new object.
return
}

nnode, err := e.Finalize(nd.DAG)
nnode, err := e.Finalize(req.Context(), nd.DAG)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down
9 changes: 7 additions & 2 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"io"
"time"

bserv "github.com/ipfs/go-ipfs/blockservice"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
e "github.com/ipfs/go-ipfs/core/commands/e"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
Expand Down Expand Up @@ -555,7 +557,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(n.Context(), n.DAG.GetLinks, k, set.Visit)
err := dag.EnumerateChildren(n.Context(), dag.GetLinksWithDAG(n.DAG), k, set.Visit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -594,7 +596,10 @@ type pinVerifyOpts struct {

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan interface{} {
visited := make(map[string]PinStatus)
getLinks := n.DAG.GetOfflineLinkService().GetLinks

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins := n.Pinning.RecursiveKeys()

var checkPin func(root *cid.Cid) PinStatus
Expand Down
4 changes: 2 additions & 2 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ This command outputs data in the following encodings:
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
cid, err := n.Blocks.AddBlock(blk)
err := n.Blocks.AddBlock(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(req.Context, n, cid)
connectToPubSubPeers(req.Context, n, blk.Cid())
}()
}

Expand Down
Loading

0 comments on commit 26a8652

Please sign in to comment.