diff --git a/arc_cache.go b/arc_cache.go index 32efdb9..8c22748 100644 --- a/arc_cache.go +++ b/arc_cache.go @@ -20,7 +20,7 @@ type arccache struct { arc *lru.TwoQueueCache arcLks map[cid.Cid]*sync.Mutex - arcLksMu sync.Mutex + arcLksMu sync.RWMutex blockstore Blockstore @@ -74,12 +74,12 @@ func (b *arccache) hasCachedSync(k cid.Cid) (has bool, size int, ok bool, releas } } - // lock the map of cid->locks. - b.arcLksMu.Lock() - defer b.arcLksMu.Unlock() - - // check if we have a lock for this content. + // read lock the map of cid->locks. + // This ensures other CID's can be locked when more than one lock/waiting is held on the same CID. + b.arcLksMu.RLock() lk, hasLk := b.arcLks[k] + b.arcLksMu.RUnlock() + // check if a lock exists for content `k`. if has && hasLk { // cache and lock hit. lk.Lock() @@ -89,13 +89,18 @@ func (b *arccache) hasCachedSync(k cid.Cid) (has bool, size int, ok bool, releas } else if has && !hasLk { // cache hit and lock miss, create the lock, lock it, and add it to the lockMap lk = new(sync.Mutex) + + b.arcLksMu.Lock() b.arcLks[k] = lk + b.arcLksMu.Unlock() lk.Lock() release = func() { lk.Unlock() } } else if !has && hasLk { // cache miss and lock hit, remove lock from map + b.arcLksMu.Lock() delete(b.arcLks, k) + b.arcLksMu.Unlock() } // else cache miss and lock miss, noop return @@ -211,17 +216,15 @@ func (b *arccache) Put(bl blocks.Block) error { } func (b *arccache) PutMany(bs []blocks.Block) error { - // all put, get, and delete operations block on this lock, take it here to avoid lock for each key in `bs`. - b.arcLksMu.Lock() - defer b.arcLksMu.Unlock() - var good []blocks.Block for _, block := range bs { // call put on block if result is inconclusive or we are sure that // the block isn't in storage - if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) { + has, _, ok, release := b.hasCachedSync(block.Cid()) + if !ok || (ok && !has) { good = append(good, block) } + defer release() } err := b.blockstore.PutMany(good) if err != nil {