diff --git a/arc_cache.go b/arc_cache.go index 1e497ab..61d019b 100644 --- a/arc_cache.go +++ b/arc_cache.go @@ -3,6 +3,7 @@ package blockstore import ( "context" + "github.com/gammazero/kmutex" lru "github.com/hashicorp/golang-lru" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" @@ -17,7 +18,9 @@ type cacheSize int // size. This provides block access-time improvements, allowing // to short-cut many searches without querying the underlying datastore. type arccache struct { - cache *lru.TwoQueueCache + cache *lru.TwoQueueCache + arcKMutex *kmutex.Kmutex + blockstore Blockstore viewer Viewer @@ -33,7 +36,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, if err != nil { return nil, err } - c := &arccache{cache: cache, blockstore: bs} + c := &arccache{cache: cache, arcKMutex: kmutex.New(), blockstore: bs} c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter() c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter() if v, ok := bs.(Viewer); ok { @@ -47,6 +50,9 @@ func (b *arccache) DeleteBlock(k cid.Cid) error { return nil } + b.arcKMutex.Lock(k) + defer b.arcKMutex.Unlock(k) + b.cache.Remove(k) // Invalidate cache before deleting. err := b.blockstore.DeleteBlock(k) if err == nil { @@ -79,6 +85,10 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) { } // we have it but don't know the size, ask the datastore. } + + b.arcKMutex.Lock(k) + defer b.arcKMutex.Unlock(k) + blockSize, err := b.blockstore.GetSize(k) if err == ErrNotFound { b.cacheHave(k, false) @@ -123,6 +133,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { return nil, ErrNotFound } + b.arcKMutex.Lock(k) + defer b.arcKMutex.Unlock(k) + bl, err := b.blockstore.Get(k) if bl == nil && err == ErrNotFound { b.cacheHave(k, false) @@ -137,6 +150,9 @@ func (b *arccache) Put(bl blocks.Block) error { return nil } + b.arcKMutex.Lock(bl.Cid()) + defer b.arcKMutex.Unlock(bl.Cid()) + err := b.blockstore.Put(bl) if err == nil { b.cacheSize(bl.Cid(), len(bl.RawData())) @@ -151,8 +167,16 @@ func (b *arccache) PutMany(bs []blocks.Block) error { // the block isn't in storage if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) { good = append(good, block) + b.arcKMutex.Lock(block.Cid()) } } + + defer func() { + for _, block := range good { + b.arcKMutex.Unlock(block.Cid()) + } + }() + err := b.blockstore.PutMany(good) if err != nil { return err diff --git a/go.mod b/go.mod index 64675b4..5f02329 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/ipfs/go-ipfs-blockstore require ( + github.com/gammazero/kmutex v1.0.1 github.com/hashicorp/golang-lru v0.5.4 github.com/ipfs/bbloom v0.0.4 github.com/ipfs/go-block-format v0.0.3 diff --git a/go.sum b/go.sum index cc30010..6731f38 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gammazero/kmutex v1.0.1 h1:69XUr2F8TRTnGrg5absRHwfspEJcW7ETro6inQJ21Yo= +github.com/gammazero/kmutex v1.0.1/go.mod h1:Qxgwjh7K2RZFEcVRyjXBOUuHgR1hLskRnns+nzftpVQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=