Skip to content

Commit

Permalink
Implement xrate (extended rate), xincrease and xdelta.
Browse files — browse the repository at this point in the history
  • Loading branch information
alin-at-dfinity committed Jun 28, 2020
1 parent c448ada commit 9c3a6bf
Show file tree
Hide file tree
Showing 4 changed files with 408 additions and 38 deletions.
111 changes: 83 additions & 28 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,23 +627,30 @@ func (ng *Engine) cumulativeSubqueryOffset(path []parser.Node) time.Duration {
func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time {
var maxOffset time.Duration
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
var nodeOffset time.Duration
subqOffset := ng.cumulativeSubqueryOffset(path)
switch n := node.(type) {
case *parser.VectorSelector:
if maxOffset < ng.lookbackDelta+subqOffset {
maxOffset = ng.lookbackDelta + subqOffset
}
if n.Offset+ng.lookbackDelta+subqOffset > maxOffset {
maxOffset = n.Offset + ng.lookbackDelta + subqOffset
nodeOffset += ng.lookbackDelta + subqOffset
if n.Offset > 0 {
nodeOffset += n.Offset
}
case *parser.MatrixSelector:
if maxOffset < n.Range+subqOffset {
maxOffset = n.Range + subqOffset
nodeOffset += n.Range + subqOffset
if m := n.VectorSelector.(*parser.VectorSelector).Offset; m > 0 {
nodeOffset += m
}
if m := n.VectorSelector.(*parser.VectorSelector).Offset + n.Range + subqOffset; m > maxOffset {
maxOffset = m
// Include an extra lookbackDelta iff this is the argument to an
// extended range function. Extended ranges include one extra
// point, this is how far back we need to look for it.
f, ok := parser.Functions[extractFuncFromPath(path)]
if ok && f.ExtRange {
nodeOffset += ng.lookbackDelta
}
}
if maxOffset < nodeOffset {
maxOffset = nodeOffset
}
return nil
})
return s.Start.Add(-maxOffset)
Expand Down Expand Up @@ -678,18 +685,26 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s

switch n := node.(type) {
case *parser.VectorSelector:
hints.Func = extractFuncFromPath(path)
hints.By, hints.Grouping = extractGroupsFromPath(path)

if evalRange == 0 {
hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
} else {
hints.Range = durationMilliseconds(evalRange)
// For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time
hints.Start = hints.Start - durationMilliseconds(evalRange)
// Include an extra lookbackDelta iff this is the argument to an
// extended range function. Extended ranges include one extra
// point, this is how far back we need to look for it.
f, ok := parser.Functions[hints.Func]
if ok && f.ExtRange {
hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
}
evalRange = 0
}

hints.Func = extractFuncFromPath(path)
hints.By, hints.Grouping = extractGroupsFromPath(path)
if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset)
hints.Start = hints.Start - offsetMilliseconds
Expand Down Expand Up @@ -1108,7 +1123,15 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
mat := make(Matrix, 0, len(selVS.Series)) // Output matrix.
offset := durationMilliseconds(selVS.Offset)
selRange := durationMilliseconds(sel.Range)
bufferRange := selRange
stepRange := selRange
// Include an extra lookbackDelta iff this is an extended
// range function. Extended ranges include one extra point,
// this is how far back we need to look for it.
if e.Func.ExtRange {
bufferRange += durationMilliseconds(ev.lookbackDelta)
stepRange += durationMilliseconds(ev.lookbackDelta)
}
if stepRange > ev.interval {
stepRange = ev.interval
}
Expand All @@ -1118,7 +1141,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
inArgs[matrixArgIndex] = inMatrix
enh := &EvalNodeHelper{out: make(Vector, 0, 1)}
// Process all the calls for one time series at a time.
it := storage.NewBuffer(selRange)
it := storage.NewBuffer(bufferRange)
for i, s := range selVS.Series {
ev.currentSamples -= len(points)
points = points[:0]
Expand All @@ -1144,7 +1167,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
maxt := ts - offset
mint := maxt - selRange
// Evaluate the matrix selector for this series for this step.
points = ev.matrixIterSlice(it, mint, maxt, points)
points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points)
if len(points) == 0 {
continue
}
Expand Down Expand Up @@ -1453,7 +1476,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix {
Metric: series[i].Labels(),
}

ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16))

if len(ss.Points) > 0 {
matrix = append(matrix, ss)
Expand All @@ -1469,24 +1492,39 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix {
//
// As an optimization, the matrix vector may already contain points of the same
// time series from the evaluation of an earlier step (with lower mint and maxt
// values). Any such points falling before mint are discarded; points that fall
// into the [mint, maxt] range are retained; only points with later timestamps
// are populated from the iterator.
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
if len(out) > 0 && out[len(out)-1].T >= mint {
// values). Any such points falling before mint (except the last, when extRange
// is true) are discarded; points that fall into the [mint, maxt] range are
// retained; only points with later timestamps are populated from the iterator.
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point) []Point {
extMint := mint - durationMilliseconds(ev.lookbackDelta)
if len(out) > 0 && (out[len(out)-1].T >= mint || (extRange && out[len(out)-1].T >= extMint)) {
// There is an overlap between previous and current ranges, retain common
// points. In most such cases:
// (a) the overlap is significantly larger than the eval step; and/or
// (b) the number of samples is relatively small.
// so a linear search will be as fast as a binary search.
var drop int
for drop = 0; out[drop].T < mint; drop++ {
if !extRange {
for drop = 0; out[drop].T < mint; drop++ {
}
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
} else {
// This is an argument to an extended range function, first go past mint.
for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ {
}
// Then, go back one sample if within lookbackDelta of mint.
if drop > 0 && out[drop-1].T >= extMint {
drop--
}
if out[len(out)-1].T >= mint {
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
}
}
ev.currentSamples -= drop
copy(out, out[drop:])
out = out[:len(out)-drop]
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
} else {
ev.currentSamples -= len(out)
out = out[:0]
Expand All @@ -1500,18 +1538,35 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
}

buf := it.Buffer()
appendedPointBeforeMint := len(out) > 0
for buf.Next() {
t, v := buf.At()
if value.IsStaleNaN(v) {
continue
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
if !extRange {
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
}
} else {
// This is the argument to an extended range function: if any point
// exists at or before range start, add it and then keep replacing
// it with later points while not yet (strictly) inside the range.
if t > mint || !appendedPointBeforeMint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
appendedPointBeforeMint = true
} else {
out[len(out)-1] = Point{T: t, V: v}
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
}
}
// The seeked sample might also be in the range.
Expand Down
112 changes: 112 additions & 0 deletions promql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package promql

import (
"fmt"
"math"
"os"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -131,6 +133,73 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
})
}

// extendedRate implements the shared logic behind xrate, xincrease and
// xdelta. Unlike extrapolatedRate it operates on a window that may contain
// one extra sample from just before the range start, and it never
// extrapolates beyond the sampled interval.
//
// vals[0] must hold exactly one series (the matrix argument); args[0] is the
// corresponding matrix selector. With isCounter set, any decrease between
// consecutive samples is treated as a counter reset and compensated for.
// With isRate set, the change is divided by the selector range to yield a
// per-second value; otherwise the overall change is returned.
func extendedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector {
	matrixSel := args[0].(*parser.MatrixSelector)
	vectorSel := matrixSel.VectorSelector.(*parser.VectorSelector)

	series := vals[0].(Matrix)[0]
	pts := series.Points
	if len(pts) < 2 {
		// Not enough samples to compute a difference.
		return enh.out
	}

	rangeStart := enh.ts - durationMilliseconds(matrixSel.Range+vectorSel.Offset)
	rangeEnd := enh.ts - durationMilliseconds(vectorSel.Offset)

	last := len(pts) - 1
	span := float64(pts[last].T - pts[0].T)
	avgStep := span / float64(last)

	// The first point may lie before the range start (that is the point of
	// extended ranges). Drop it when it is further from rangeStart than one
	// average sampling interval.
	first := 0
	if float64(rangeStart-pts[0].T) > avgStep {
		if len(pts) < 3 {
			return enh.out
		}
		first = 1
		span = float64(pts[last].T - pts[1].T)
		avgStep = span / float64(last-1)
	}

	// For counters, add back the value lost at every reset (a sample smaller
	// than its predecessor).
	var correction, prev float64
	if isCounter {
		for _, p := range pts[first:] {
			if p.V < prev {
				correction += prev
			}
			prev = p.V
		}
	}
	result := pts[last].V - pts[first].V + correction

	// When the samples cover (nearly) the whole range — first retained point
	// at or before the range start, last point within one average interval of
	// the range end — scale the change from the sampled span up to the full
	// requested range.
	if pts[first].T <= rangeStart && float64(rangeEnd-pts[last].T) < avgStep {
		adjustToRange := float64(durationMilliseconds(matrixSel.Range))
		result = result * (adjustToRange / span)
	}

	if isRate {
		result = result / matrixSel.Range.Seconds()
	}

	return append(enh.out, Sample{
		Point: Point{V: result},
	})
}

// === delta(Matrix parser.ValueTypeMatrix) Vector ===
func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return extrapolatedRate(vals, args, enh, false, false)
Expand All @@ -146,6 +215,21 @@ func funcIncrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
return extrapolatedRate(vals, args, enh, true, false)
}

// === xdelta(Matrix parser.ValueTypeMatrix) Vector ===
//
// funcXdelta is the extended-range variant of delta: the overall change in
// value over the range, also taking into account the last sample before the
// range start. Gauge semantics: no counter-reset handling (isCounter=false)
// and no per-second conversion (isRate=false).
func funcXdelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	return extendedRate(vals, args, enh, false, false)
}

// === xrate(node parser.ValueTypeMatrix) Vector ===
//
// funcXrate is the extended-range variant of rate: the per-second increase of
// a counter over the range, also taking into account the last sample before
// the range start. Counter semantics with per-second conversion
// (isCounter=true, isRate=true).
func funcXrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	return extendedRate(vals, args, enh, true, true)
}

// === xincrease(node parser.ValueTypeMatrix) Vector ===
//
// funcXincrease is the extended-range variant of increase: the total increase
// of a counter over the range, also taking into account the last sample
// before the range start. Counter semantics without per-second conversion
// (isCounter=true, isRate=false).
func funcXincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	return extendedRate(vals, args, enh, true, false)
}

// === irate(node parser.ValueTypeMatrix) Vector ===
func funcIrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return instantValue(vals, enh.out, true)
Expand Down Expand Up @@ -918,9 +1002,37 @@ var FunctionCalls = map[string]FunctionCall{
"time": funcTime,
"timestamp": funcTimestamp,
"vector": funcVector,
"xdelta": funcXdelta,
"xincrease": funcXincrease,
"xrate": funcXrate,
"year": funcYear,
}

// init optionally replaces the standard rate extrapolation functions with
// their extended-range ("x") counterparts.
//
// When the REPLACE_RATE_FUNCS environment variable is set to "1", delta,
// increase and rate are remapped to xdelta, xincrease and xrate respectively,
// and the x-prefixed keys are removed from both FunctionCalls and
// parser.Functions. This makes the extended implementations a drop-in
// replacement, so Grafana auto-completion, Prometheus tooling, Thanos, etc.
// keep working unchanged.
func init() {
	if os.Getenv("REPLACE_RATE_FUNCS") != "1" {
		return
	}
	// Remap each standard name to its extended implementation and drop the
	// x-prefixed alias so only one spelling of each function remains.
	for std, ext := range map[string]string{
		"delta":    "xdelta",
		"increase": "xincrease",
		"rate":     "xrate",
	} {
		FunctionCalls[std] = FunctionCalls[ext]
		delete(FunctionCalls, ext)

		parser.Functions[std] = parser.Functions[ext]
		// Both keys briefly share one *Function; rename it so parser errors
		// and introspection report the standard name, then drop the alias.
		parser.Functions[std].Name = std
		delete(parser.Functions, ext)
	}
	// Diagnostic output, not program output: write it to stderr.
	fmt.Fprintln(os.Stderr, "Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
}

type vectorByValueHeap Vector

func (s vectorByValueHeap) Len() int {
Expand Down
19 changes: 19 additions & 0 deletions promql/parser/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Function struct {
ArgTypes []ValueType
Variadic int
ReturnType ValueType
ExtRange bool
}

// Functions is a list of all functions supported by PromQL, including their types.
Expand Down Expand Up @@ -262,6 +263,24 @@ var Functions = map[string]*Function{
ArgTypes: []ValueType{ValueTypeScalar},
ReturnType: ValueTypeVector,
},
"xdelta": {
Name: "xdelta",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
},
"xincrease": {
Name: "xincrease",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
},
"xrate": {
Name: "xrate",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
},
"year": {
Name: "year",
ArgTypes: []ValueType{ValueTypeVector},
Expand Down
Loading

0 comments on commit 9c3a6bf

Please sign in to comment.