Skip to content

Commit

Permalink
convert sync.Mutex to sync.RWMutex in deduper to allow concurrent che…
Browse files Browse the repository at this point in the history
…cks for isSeen
  • Loading branch information
sandeepsukhani committed Feb 15, 2021
1 parent 426b150 commit ca8cbef
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions pkg/storage/stores/shipper/util/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries [
type IndexDeduper struct {
callback chunk_util.Callback
seenRangeValues map[string]map[string]struct{}
mtx sync.Mutex
mtx sync.RWMutex
}

func NewIndexDeduper(callback chunk_util.Callback) *IndexDeduper {
Expand All @@ -77,19 +77,32 @@ func (i *IndexDeduper) Callback(query chunk.IndexQuery, batch chunk.ReadBatch) b
}

func (i *IndexDeduper) isSeen(hashValue string, rangeValue []byte) bool {
i.mtx.Lock()
defer i.mtx.Unlock()
i.mtx.RLock()

// index entries are never modified during query processing so it should be safe to reference a byte slice as a string.
rangeValueStr := yoloString(rangeValue)
if _, ok := i.seenRangeValues[hashValue]; !ok {
i.seenRangeValues[hashValue] = map[string]struct{}{}

if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok {
i.mtx.RUnlock()
return true
}

i.mtx.RUnlock()

i.mtx.Lock()
defer i.mtx.Unlock()

// re-check if another concurrent call added the values already, if so do not add it again and return true
if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok {
return true
}

// add the hashValue first if missing
if _, ok := i.seenRangeValues[hashValue]; !ok {
i.seenRangeValues[hashValue] = map[string]struct{}{}
}

// add the rangeValue
i.seenRangeValues[hashValue][rangeValueStr] = struct{}{}
return false
}
Expand Down

0 comments on commit ca8cbef

Please sign in to comment.