Skip to content

Commit

Permalink
Merge pull request grafana#1699 from gouthamve/caching-fix-1
Browse files Browse the repository at this point in the history
Simplify long-term caching
  • Loading branch information
bboreham authored Oct 17, 2019
2 parents a37dac2 + 57998df commit c11709c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 62 deletions.
93 changes: 32 additions & 61 deletions schema_caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,81 +14,52 @@ type schemaCaching struct {
}

func (s *schemaCaching) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
})
queries, err := s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}

func (s *schemaCaching) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) {
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
})
queries, err := s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}

func (s *schemaCaching) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) {
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
})
queries, err := s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}

// If the query resulted in series IDs, use this method to find chunks.
func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetChunksForSeries(from, through, userID, seriesID)
})
}

func (s *schemaCaching) splitTimesByCacheability(from, through model.Time, f func(from, through model.Time) ([]IndexQuery, error)) ([]IndexQuery, error) {
var (
cacheableQueries []IndexQuery
activeQueries []IndexQuery
err error
cacheBefore = model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
)

if from.After(cacheBefore) {
activeQueries, err = f(from, through)
if err != nil {
return nil, err
}
} else if through.Before(cacheBefore) {
cacheableQueries, err = f(from, through)
if err != nil {
return nil, err
}
} else {
cacheableQueries, err = f(from, cacheBefore)
if err != nil {
return nil, err
}

activeQueries, err = f(cacheBefore, through)
if err != nil {
return nil, err
}
queries, err := s.Schema.GetChunksForSeries(from, through, userID, seriesID)
if err != nil {
return nil, err
}

return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
return s.setImmutability(from, through, queries), nil
}

func mergeCacheableAndActiveQueries(cacheableQueries []IndexQuery, activeQueries []IndexQuery) []IndexQuery {
finalQueries := make([]IndexQuery, 0, len(cacheableQueries)+len(activeQueries))

Outer:
for _, cq := range cacheableQueries {
for _, aq := range activeQueries {
// When deduping, the bucket values only influence TableName and HashValue
// and just checking those is enough.
if cq.TableName == aq.TableName && cq.HashValue == aq.HashValue {
continue Outer
}
func (s *schemaCaching) setImmutability(from, through model.Time, queries []IndexQuery) []IndexQuery {
cacheBefore := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())

// If the entire query is cacheable then cache it.
// While not super effective stand-alone, when combined with query-frontend and splitting,
// old queries will mostly be all behind boundary.
// To cleanly split cacheable and non-cacheable ranges, we'd need bucket start and end times
// which we don't know.
// See: https://github.com/cortexproject/cortex/issues/1698
if through.Before(cacheBefore) {
for i := range queries {
queries[i].Immutable = true
}

cq.Immutable = true
finalQueries = append(finalQueries, cq)
}

finalQueries = append(finalQueries, activeQueries...)

return finalQueries
return queries
}
2 changes: 1 addition & 1 deletion schema_caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestCachingSchema(t *testing.T) {
// Mix of both.
baseTime.Add(-50 * time.Hour),
baseTime.Add(-2 * time.Hour),
0,
-1,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
Expand Down

0 comments on commit c11709c

Please sign in to comment.