Skip to content

Commit

Permalink
add support for deleting expired aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
eranra committed Apr 14, 2022
1 parent a1d72e8 commit 77ea806
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 28 deletions.
45 changes: 41 additions & 4 deletions pkg/pipeline/extract/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package aggregate

import (
"container/list"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -43,7 +46,10 @@ type NormalizedValues string

type Aggregate struct {
Definition api.AggregateDefinition
Groups map[NormalizedValues]*GroupState
GroupsMap map[NormalizedValues]*GroupState
GroupsList *list.List
mutex *sync.Mutex
expiryTime int64
}

type GroupState struct {
Expand All @@ -53,6 +59,8 @@ type GroupState struct {
recentCount int
totalValue float64
totalCount int
lastUpdatedTime int64
listElement *list.Element
}

func (aggregate Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bool) {
Expand Down Expand Up @@ -116,7 +124,11 @@ func getInitValue(operation string) float64 {
}

func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error {
groupState, ok := aggregate.Groups[normalizedValues]

aggregate.mutex.Lock()
defer aggregate.mutex.Unlock()

groupState, ok := aggregate.GroupsMap[normalizedValues]
if !ok {
groupState = &GroupState{normalizedValues: normalizedValues}
initVal := getInitValue(string(aggregate.Definition.Operation))
Expand All @@ -125,7 +137,10 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu
if aggregate.Definition.Operation == OperationRawValues {
groupState.recentRawValues = make([]float64, 0)
}
aggregate.Groups[normalizedValues] = groupState
aggregate.GroupsMap[normalizedValues] = groupState
groupState.listElement = aggregate.GroupsList.PushBack(groupState)
} else {
aggregate.GroupsList.MoveToBack(groupState.listElement)
}

// update value
Expand Down Expand Up @@ -164,6 +179,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu
// update count
groupState.totalCount += 1
groupState.recentCount += 1
groupState.lastUpdatedTime = time.Now().Unix()

return nil
}
Expand All @@ -188,8 +204,11 @@ func (aggregate Aggregate) Evaluate(entries []config.GenericMap) error {
}

func (aggregate Aggregate) GetMetrics() []config.GenericMap {
aggregate.mutex.Lock()
defer aggregate.mutex.Unlock()

var metrics []config.GenericMap
for _, group := range aggregate.Groups {
for _, group := range aggregate.GroupsMap {
metrics = append(metrics, config.GenericMap{
"name": aggregate.Definition.Name,
"operation": aggregate.Definition.Operation,
Expand All @@ -213,3 +232,21 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap {

return metrics
}

// cleanupExpiredEntries removes, from both GroupsList and GroupsMap, every
// group whose lastUpdatedTime is at least aggregate.expiryTime seconds in the
// past.
//
// The scan starts at the front of GroupsList and stops at the first group that
// has not expired yet; this relies on the list being kept ordered from least
// to most recently updated. The method takes no lock itself, so the caller is
// expected to hold aggregate.mutex while it runs.
func (aggregate Aggregate) cleanupExpiredEntries() {
	// Groups last updated at or before this Unix timestamp are expired.
	cutoff := time.Now().Unix() - aggregate.expiryTime

	for front := aggregate.GroupsList.Front(); front != nil; front = aggregate.GroupsList.Front() {
		group := front.Value.(*GroupState)
		if group.lastUpdatedTime > cutoff {
			// The oldest remaining group is still fresh, so all later ones are too.
			return
		}
		aggregate.GroupsList.Remove(front)
		delete(aggregate.GroupsMap, group.normalizedValues)
	}
}
14 changes: 10 additions & 4 deletions pkg/pipeline/extract/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package aggregate

import (
"container/list"
"fmt"
"strconv"
"sync"
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -36,7 +38,10 @@ func GetMockAggregate() Aggregate {
Operation: "avg",
RecordKey: "value",
},
Groups: map[NormalizedValues]*GroupState{},
GroupsMap: map[NormalizedValues]*GroupState{},
GroupsList: list.New(),
mutex: &sync.Mutex{},
expiryTime: 30,
}
return aggregate
}
Expand Down Expand Up @@ -115,9 +120,10 @@ func Test_Evaluate(t *testing.T) {

err := aggregate.Evaluate(entries)

require.Equal(t, err, nil)
require.Equal(t, aggregate.Groups[normalizedValues].totalCount, 2)
require.Equal(t, aggregate.Groups[normalizedValues].totalValue, float64(7))
require.Equal(t, nil, err)
require.Equal(t, 1, aggregate.GroupsList.Len())
require.Equal(t, 2, aggregate.GroupsMap[normalizedValues].totalCount)
require.Equal(t, float64(7), aggregate.GroupsMap[normalizedValues].totalValue)
}

func Test_GetMetrics(t *testing.T) {
Expand Down
67 changes: 50 additions & 17 deletions pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,27 @@
package aggregate

import (
"fmt"
"reflect"
"container/list"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
)

type Aggregates []Aggregate
// defaultExpiryTime is the expiry period, in seconds, assigned to the
// Aggregates built by NewAggregatesFromConfig. It is a variable rather than a
// constant so that tests can shorten it.
var defaultExpiryTime = 60 * 10 // 10 minutes

// Aggregates groups a list of Aggregate instances with the expiry period they
// share.
type Aggregates struct {
// Aggregates is the list of aggregates that are evaluated and reported.
Aggregates []Aggregate
// expiryTime is the expiry period in seconds. It is copied into each
// Aggregate created by AddAggregate and is also used as the interval of the
// periodic cleanup ticker.
expiryTime int64
}

// Definitions is a list of aggregate definitions.
type Definitions []api.AggregateDefinition

func (aggregates Aggregates) Evaluate(entries []config.GenericMap) error {
for _, aggregate := range aggregates {
func (aggregates *Aggregates) Evaluate(entries []config.GenericMap) error {
for _, aggregate := range aggregates.Aggregates {
err := aggregate.Evaluate(entries)
if err != nil {
log.Debugf("Evaluate error %v", err)
Expand All @@ -41,41 +49,66 @@ func (aggregates Aggregates) Evaluate(entries []config.GenericMap) error {
return nil
}

func (aggregates Aggregates) GetMetrics() []config.GenericMap {
func (aggregates *Aggregates) GetMetrics() []config.GenericMap {
var metrics []config.GenericMap
for _, aggregate := range aggregates {
for _, aggregate := range aggregates.Aggregates {
aggregateMetrics := aggregate.GetMetrics()
metrics = append(metrics, aggregateMetrics...)
}

return metrics
}

func (aggregates Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) Aggregates {
func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) []Aggregate {
aggregate := Aggregate{
Definition: aggregateDefinition,
Groups: map[NormalizedValues]*GroupState{},
GroupsMap: map[NormalizedValues]*GroupState{},
GroupsList: list.New(),
mutex: &sync.Mutex{},
expiryTime: aggregates.expiryTime,
}

appendedAggregates := append(aggregates, aggregate)
appendedAggregates := append(aggregates.Aggregates, aggregate)
return appendedAggregates
}

func (aggregates Aggregates) RemoveAggregate(by api.AggregateBy) (Aggregates, error) {
for i, other := range aggregates {
if reflect.DeepEqual(other.Definition.By, by) {
return append(aggregates[:i], aggregates[i+1:]...), nil
// cleanupExpiredEntriesLoop starts a background goroutine that calls
// aggregates.cleanupExpiredEntries once every aggregates.expiryTime seconds.
//
// The goroutine returns as soon as the channel handed to
// utils.RegisterExitChannel yields a value or is closed (presumably on process
// shutdown; confirm against the utils package). aggregates.expiryTime must be
// positive, because time.NewTicker panics on a non-positive interval.
func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {
	ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
	done := make(chan bool)
	utils.RegisterExitChannel(done)

	go func() {
		// Release the ticker's resources once the loop exits; without this the
		// ticker keeps running after the goroutine is gone.
		defer ticker.Stop()

		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				aggregates.cleanupExpiredEntries()
			}
		}
	}()
}

func (aggregates *Aggregates) cleanupExpiredEntries() {

for _, aggregate := range aggregates.Aggregates {
aggregate.mutex.Lock()
aggregate.cleanupExpiredEntries()
aggregate.mutex.Unlock()
}
return aggregates, fmt.Errorf("can't find AggregateBy = %v", by)

}

func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates, error) {
aggregates := Aggregates{}
aggregates := Aggregates{
expiryTime: int64(defaultExpiryTime),
}

for _, aggregateDefinition := range definitions {
aggregates = aggregates.AddAggregate(aggregateDefinition)
aggregates.Aggregates = aggregates.AddAggregate(aggregateDefinition)
}

aggregates.cleanupExpiredEntriesLoop()

return aggregates, nil
}
29 changes: 26 additions & 3 deletions pkg/pipeline/extract/aggregate/aggregates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package aggregate

import (
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

func Test_NewAggregatesFromConfig(t *testing.T) {
func initAggregates(t *testing.T) Aggregates {
var yamlConfig = `
log-level: debug
pipeline:
Expand All @@ -44,10 +45,32 @@ parameters:
`
v := test.InitConfig(t, yamlConfig)
require.NotNil(t, v)
aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates)
require.NoError(t, err)

return aggregates
}

func Test_NewAggregatesFromConfig(t *testing.T) {

aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates)

require.Equal(t, aggregates.Aggregates[0].Definition, expectedAggregate.Definition)
}

func Test_CleanupExpiredEntriesLoop(t *testing.T) {

defaultExpiryTime = 4 // expiration after 4 seconds
aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
require.Equal(t, expectedAggregate.Definition, aggregates.Aggregates[0].Definition)

entry := test.GetIngestMockEntry(false)
err := aggregates.Evaluate([]config.GenericMap{entry})
require.NoError(t, err)
require.Equal(t, aggregates[0].Definition, expectedAggregate.Definition)
time.Sleep(2 * time.Second) // still exists after 2 seconds
require.Equal(t, 1, len(aggregates.Aggregates[0].GetMetrics()))
time.Sleep(3 * time.Second) // expires after 3 more seconds (5 seconds in total)
require.Equal(t, 0, len(aggregates.Aggregates[0].GetMetrics()))
}

0 comments on commit 77ea806

Please sign in to comment.