Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datastore: blockstore should retry when it encounters temp errors #3091

Merged
merged 1 commit into from
Aug 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"crypto/rand"
"encoding/base64"
"errors"
"os"
"syscall"
"time"

bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
Expand All @@ -14,12 +17,13 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
dsync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"

pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore"
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
dsync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
ci "gx/ipfs/QmUWER4r4qMvaCnX5zREcfyiWN7cXN9g3a7fkRqNz8qWPP/go-libp2p-crypto"
retry "gx/ipfs/QmY6UVhgS2ZxhbM5qU23Fnz3daJwfyAuNErd3StmVofnAU/retry-datastore"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

Expand Down Expand Up @@ -127,14 +131,30 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
return n, nil
}

func isTooManyFDError(err error) bool {
perr, ok := err.(*os.PathError)
if ok && perr.Err == syscall.EMFILE {
return true
}

return false
}

func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
// setup local peer ID (private key is loaded in online setup)
if err := n.loadID(); err != nil {
return err
}

rds := &retry.Datastore{
Batching: n.Repo.Datastore(),
Delay: time.Millisecond * 200,
Retries: 6,
TempErrFunc: isTooManyFDError,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take that, disk! 💿


var err error
bs := bstore.NewBlockstore(n.Repo.Datastore())
bs := bstore.NewBlockstore(rds)
opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
Expand Down
14 changes: 1 addition & 13 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
default:
}

err := bs.tryPutBlock(blk, 4) // attempt to store block up to four times
err := bs.blockstore.Put(blk)
if err != nil {
log.Errorf("Error writing block to datastore: %s", err)
return err
Expand All @@ -284,18 +284,6 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return nil
}

func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error {
var err error
for i := 0; i < attempts; i++ {
if err = bs.blockstore.Put(blk); err == nil {
break
}

time.Sleep(time.Millisecond * time.Duration(400*(i+1)))
}
return err
}

func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
Expand Down
6 changes: 5 additions & 1 deletion exchange/bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ import (
// well under varying conditions
const kNetworkDelay = 0 * time.Millisecond

func getVirtualNetwork() tn.Network {
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

func TestClose(t *testing.T) {
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
Expand Down
12 changes: 12 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@
"hash": "QmaeHSCBd9XjXxmgHEiKkHtLcMCb2eZsPLKT7bHgBfBkqw",
"name": "go-is-domain",
"version": "1.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmY6UVhgS2ZxhbM5qU23Fnz3daJwfyAuNErd3StmVofnAU",
"name": "retry-datastore",
"version": "1.1.0"
},
{
"author": "whyrusleeping",
"hash": "QmdjfJJFxgqqR9skVZDmgiGrbKomSqxpaw12rjLNim5NYR",
"name": "failstore",
"version": "1.0.0"
}
],
"gxVersion": "0.4.0",
Expand Down