diff --git a/arc_cache.go b/arc_cache.go index 1e497ab..83f397b 100644 --- a/arc_cache.go +++ b/arc_cache.go @@ -2,6 +2,7 @@ package blockstore import ( "context" + "sync" lru "github.com/hashicorp/golang-lru" blocks "github.com/ipfs/go-block-format" @@ -17,7 +18,9 @@ type cacheSize int // size. This provides block access-time improvements, allowing // to short-cut many searches without querying the underlying datastore. type arccache struct { - cache *lru.TwoQueueCache + cache *lru.TwoQueueCache + lks [256]sync.Mutex + blockstore Blockstore viewer Viewer @@ -33,7 +36,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, if err != nil { return nil, err } - c := &arccache{cache: cache, blockstore: bs} + c := &arccache{cache: cache, lks: [256]sync.Mutex{}, blockstore: bs} c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter() c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter() if v, ok := bs.(Viewer); ok { @@ -47,6 +50,9 @@ func (b *arccache) DeleteBlock(k cid.Cid) error { return nil } + b.lks[k.Bytes()[len(k.Bytes())-1]].Lock() + defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock() + b.cache.Remove(k) // Invalidate cache before deleting. err := b.blockstore.DeleteBlock(k) if err == nil { @@ -59,6 +65,7 @@ func (b *arccache) Has(k cid.Cid) (bool, error) { if has, _, ok := b.queryCache(k); ok { return has, nil } + has, err := b.blockstore.Has(k) if err != nil { return false, err @@ -79,6 +86,10 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) { } // we have it but don't know the size, ask the datastore. } + + b.lks[k.Bytes()[len(k.Bytes())-1]].Lock() + defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock() + blockSize, err := b.blockstore.GetSize(k) if err == ErrNotFound { b.cacheHave(k, false) @@ -123,6 +134,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { return nil, ErrNotFound } + b.lks[k.Bytes()[len(k.Bytes())-1]].Lock() + defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock() + bl, err := b.blockstore.Get(k) if bl == nil && err == ErrNotFound { b.cacheHave(k, false) @@ -137,6 +151,9 @@ func (b *arccache) Put(bl blocks.Block) error { return nil } + b.lks[bl.Cid().Bytes()[len(bl.Cid().Bytes())-1]].Lock() + defer b.lks[bl.Cid().Bytes()[len(bl.Cid().Bytes())-1]].Unlock() + err := b.blockstore.Put(bl) if err == nil { b.cacheSize(bl.Cid(), len(bl.RawData())) @@ -151,6 +168,8 @@ func (b *arccache) PutMany(bs []blocks.Block) error { // the block isn't in storage if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) { good = append(good, block) + b.lks[block.Cid().Bytes()[len(block.Cid().Bytes())-1]].Lock() + defer b.lks[block.Cid().Bytes()[len(block.Cid().Bytes())-1]].Unlock() } } err := b.blockstore.PutMany(good)