Skip to content

Commit

Permalink
Merge pull request #3294 from ipfs/kevina/dup-prov-cache-fix
Browse files Browse the repository at this point in the history
Create a write-through block service
  • Loading branch information
whyrusleeping authored Oct 12, 2016
2 parents edf2b52 + affb06d commit 711cd8f
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 65 deletions.
6 changes: 3 additions & 3 deletions blocks/blocksutil/block_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type BlockGenerator struct {
seq int
}

func (bg *BlockGenerator) Next() blocks.Block {
func (bg *BlockGenerator) Next() *blocks.BasicBlock {
bg.seq++
return blocks.NewBlock([]byte(string(bg.seq)))
}

func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
blocks := make([]blocks.Block, 0)
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
blocks := make([]*blocks.BasicBlock, 0)
for i := 0; i < n; i++ {
b := bg.Next()
blocks = append(blocks, b)
Expand Down
129 changes: 82 additions & 47 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
package blockservice

import (
"context"
"errors"
"fmt"

blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"

context "context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
Expand All @@ -23,77 +23,112 @@ var ErrNotFound = errors.New("blockservice: key not found")
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct {
// TODO don't expose underlying impl details
Blockstore blockstore.Blockstore
Exchange exchange.Interface
type BlockService interface {
Blockstore() blockstore.Blockstore
Exchange() exchange.Interface
AddBlock(o blocks.Block) (*cid.Cid, error)
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
DeleteBlock(o blocks.Block) error
Close() error
}

type blockService struct {
blockstore blockstore.Blockstore
exchange exchange.Interface
// If checkFirst is true then first check that a block doesn't
// already exist to avoid republishing the block on the exchange.
checkFirst bool
}

// NewBlockService creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}

return &blockService{
blockstore: bs,
exchange: rem,
checkFirst: true,
}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}

return &BlockService{
Blockstore: bs,
Exchange: rem,
return &blockService{
blockstore: bs,
exchange: rem,
checkFirst: false,
}
}

func (bs *blockService) Blockstore() blockstore.Blockstore {
return bs.blockstore
}

func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
c := o.Cid()
has, err := s.Blockstore.Has(c)
if err != nil {
return nil, err
}
if s.checkFirst {
has, err := s.blockstore.Has(c)
if err != nil {
return nil, err
}

if has {
return c, nil
if has {
return c, nil
}
}

err = s.Blockstore.Put(o)
err := s.blockstore.Put(o)
if err != nil {
return nil, err
}

if err := s.Exchange.HasBlock(o); err != nil {
if err := s.exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
}

return c, nil
}

func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
var toput []blocks.Block
for _, b := range bs {
has, err := s.Blockstore.Has(b.Cid())
if err != nil {
return nil, err
}

if has {
continue
if s.checkFirst {
for _, b := range bs {
has, err := s.blockstore.Has(b.Cid())
if err != nil {
return nil, err
}
if has {
continue
}
toput = append(toput, b)
}

toput = append(toput, b)
} else {
toput = bs
}

err := s.Blockstore.PutMany(toput)
err := s.blockstore.PutMany(toput)
if err != nil {
return nil, err
}

var ks []*cid.Cid
for _, o := range toput {
if err := s.Exchange.HasBlock(o); err != nil {
if err := s.exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
}

Expand All @@ -104,19 +139,19 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)

block, err := s.Blockstore.Get(c)
block, err := s.blockstore.Get(c)
if err == nil {
return block, nil
}

if err == blockstore.ErrNotFound && s.Exchange != nil {
if err == blockstore.ErrNotFound && s.exchange != nil {
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.Exchange.GetBlock(ctx, c)
blk, err := s.exchange.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
Expand All @@ -137,13 +172,13 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []*cid.Cid
for _, c := range ks {
hit, err := s.Blockstore.Get(c)
hit, err := s.blockstore.Get(c)
if err != nil {
misses = append(misses, c)
continue
Expand All @@ -160,7 +195,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return
}

rblocks, err := s.Exchange.GetBlocks(ctx, misses)
rblocks, err := s.exchange.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
return
Expand All @@ -178,11 +213,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
}

// DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteBlock(o blocks.Block) error {
return s.Blockstore.DeleteBlock(o.Cid())
func (s *blockService) DeleteBlock(o blocks.Block) error {
return s.blockstore.DeleteBlock(o.Cid())
}

func (s *BlockService) Close() error {
func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.Exchange.Close()
return s.exchange.Close()
}
49 changes: 49 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package blockservice

import (
"testing"

"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
butil "github.com/ipfs/go-ipfs/blocks/blocksutil"
offline "github.com/ipfs/go-ipfs/exchange/offline"

ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
dssync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)

func TestWriteThroughWorks(t *testing.T) {
bstore := &PutCountingBlockstore{
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())),
0,
}
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(bstore2)
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

block := bgen.Next()

t.Logf("PutCounter: %d", bstore.PutCounter)
bserv.AddBlock(block)
if bstore.PutCounter != 1 {
t.Fatalf("expected just one Put call, have: %d", bstore.PutCounter)
}

bserv.AddBlock(block)
if bstore.PutCounter != 2 {
t.Fatal("Put should have called again, should be 2 is: %d", bstore.PutCounter)
}
}

var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil)

type PutCountingBlockstore struct {
blockstore.GCBlockstore
PutCounter int
}

func (bs *PutCountingBlockstore) Put(block blocks.Block) error {
bs.PutCounter++
return bs.GCBlockstore.Put(block)
}
4 changes: 2 additions & 2 deletions blockservice/test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
)

// Mocks returns |n| connected mock Blockservices
func Mocks(n int) []*BlockService {
func Mocks(n int) []BlockService {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
sg := bitswap.NewTestSessionGenerator(net)

instances := sg.Instances(n)

var servs []*BlockService
var servs []BlockService
for _, i := range instances {
servs = append(servs, New(i.Blockstore(), i.Exchange))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"os"
"path"

context "context"
assets "github.com/ipfs/go-ipfs/assets"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
namesys "github.com/ipfs/go-ipfs/namesys"
config "github.com/ipfs/go-ipfs/repo/config"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
context "context"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type IpfsNode struct {
// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Expand Down
2 changes: 1 addition & 1 deletion core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package core
import (
"testing"

context "context"
"github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
"github.com/ipfs/go-ipfs/thirdparty/testutil"
context "context"
)

func TestInitialization(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion core/corerepo/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package corerepo
import (
"fmt"

context "context"
"github.com/ipfs/go-ipfs/core"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
context "context"
)

type Stat struct {
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/cat.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package coreunix

import (
context "context"
core "github.com/ipfs/go-ipfs/core"
path "github.com/ipfs/go-ipfs/path"
uio "github.com/ipfs/go-ipfs/unixfs/io"
context "context"
)

func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (*uio.DagReader, error) {
Expand Down
2 changes: 1 addition & 1 deletion fuse/ipns/link_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ package ipns
import (
"os"

"context"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
"context"
)

type Link struct {
Expand Down
2 changes: 1 addition & 1 deletion fuse/node/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"testing"
"time"

context "context"
core "github.com/ipfs/go-ipfs/core"
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
mount "github.com/ipfs/go-ipfs/fuse/mount"
namesys "github.com/ipfs/go-ipfs/namesys"
offroute "github.com/ipfs/go-ipfs/routing/offline"
ci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci"
context "context"
)

func maybeSkipFuseTests(t *testing.T) {
Expand Down
Loading

0 comments on commit 711cd8f

Please sign in to comment.