Skip to content

Commit

Permalink
add support for deleting expired aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
eranra committed Apr 10, 2022
1 parent 9e8e6f5 commit 5cc400c
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 18 deletions.
42 changes: 38 additions & 4 deletions pkg/pipeline/extract/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package aggregate

import (
"container/list"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -43,7 +45,8 @@ type NormalizedValues string

type Aggregate struct {
Definition api.AggregateDefinition
Groups map[NormalizedValues]*GroupState
GroupsMap map[NormalizedValues]*GroupState
GroupsList *list.List
}

type GroupState struct {
Expand All @@ -53,6 +56,8 @@ type GroupState struct {
recentCount int
totalValue float64
totalCount int
lastUpdatedTime int64
listElement *list.Element
}

func (aggregate Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bool) {
Expand Down Expand Up @@ -116,7 +121,11 @@ func getInitValue(operation string) float64 {
}

func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error {
groupState, ok := aggregate.Groups[normalizedValues]

mutex.Lock()
defer mutex.Unlock()

groupState, ok := aggregate.GroupsMap[normalizedValues]
if !ok {
groupState = &GroupState{normalizedValues: normalizedValues}
initVal := getInitValue(string(aggregate.Definition.Operation))
Expand All @@ -125,7 +134,10 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu
if aggregate.Definition.Operation == OperationRawValues {
groupState.recentRawValues = make([]float64, 0)
}
aggregate.Groups[normalizedValues] = groupState
aggregate.GroupsMap[normalizedValues] = groupState
groupState.listElement = aggregate.GroupsList.PushBack(groupState)
} else {
aggregate.GroupsList.MoveToBack(groupState.listElement)
}

// update value
Expand Down Expand Up @@ -164,6 +176,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu
// update count
groupState.totalCount += 1
groupState.recentCount += 1
groupState.lastUpdatedTime = time.Now().Unix()

return nil
}
Expand All @@ -188,8 +201,11 @@ func (aggregate Aggregate) Evaluate(entries []config.GenericMap) error {
}

func (aggregate Aggregate) GetMetrics() []config.GenericMap {
mutex.Lock()
defer mutex.Unlock()

var metrics []config.GenericMap
for _, group := range aggregate.Groups {
for _, group := range aggregate.GroupsMap {
metrics = append(metrics, config.GenericMap{
"name": aggregate.Definition.Name,
"operation": aggregate.Definition.Operation,
Expand All @@ -213,3 +229,21 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap {

return metrics
}

func (aggregate Aggregate) cleanupExpiredEntries() {
nowInSecs := time.Now().Unix()
expireTime := nowInSecs - expiryTime

for {
listEntry := aggregate.GroupsList.Front()
if listEntry == nil {
return
}
pCacheInfo := listEntry.Value.(*GroupState)
if pCacheInfo.lastUpdatedTime > expireTime {
return
}
delete(aggregate.GroupsMap, pCacheInfo.normalizedValues)
aggregate.GroupsList.Remove(listEntry)
}
}
8 changes: 5 additions & 3 deletions pkg/pipeline/extract/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package aggregate

import (
"container/list"
"fmt"
"strconv"
"testing"
Expand All @@ -36,7 +37,8 @@ func GetMockAggregate() Aggregate {
Operation: "avg",
RecordKey: "value",
},
Groups: map[NormalizedValues]*GroupState{},
GroupsMap: map[NormalizedValues]*GroupState{},
GroupsList: list.New(),
}
return aggregate
}
Expand Down Expand Up @@ -116,8 +118,8 @@ func Test_Evaluate(t *testing.T) {
err := aggregate.Evaluate(entries)

require.Equal(t, err, nil)
require.Equal(t, aggregate.Groups[normalizedValues].totalCount, 2)
require.Equal(t, aggregate.Groups[normalizedValues].totalValue, float64(7))
require.Equal(t, aggregate.GroupsMap[normalizedValues].totalCount, 2)
require.Equal(t, aggregate.GroupsMap[normalizedValues].totalValue, float64(7))
}

func Test_GetMetrics(t *testing.T) {
Expand Down
46 changes: 38 additions & 8 deletions pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,26 @@
package aggregate

import (
"fmt"
"reflect"
"container/list"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
)

const defaultExpiryTime = 60 * 10 // 10 minutes

type Aggregates []Aggregate
type Definitions []api.AggregateDefinition

var (
mutex = &sync.Mutex{}
expiryTime = int64(defaultExpiryTime)
)

func (aggregates Aggregates) Evaluate(entries []config.GenericMap) error {
for _, aggregate := range aggregates {
err := aggregate.Evaluate(entries)
Expand All @@ -54,20 +63,39 @@ func (aggregates Aggregates) GetMetrics() []config.GenericMap {
func (aggregates Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) Aggregates {
aggregate := Aggregate{
Definition: aggregateDefinition,
Groups: map[NormalizedValues]*GroupState{},
GroupsMap: map[NormalizedValues]*GroupState{},
GroupsList: list.New(),
}

appendedAggregates := append(aggregates, aggregate)
return appendedAggregates
}

func (aggregates Aggregates) RemoveAggregate(by api.AggregateBy) (Aggregates, error) {
for i, other := range aggregates {
if reflect.DeepEqual(other.Definition.By, by) {
return append(aggregates[:i], aggregates[i+1:]...), nil
func (aggregates Aggregates) cleanupExpiredEntriesLoop() {

ticker := time.NewTicker(time.Duration(expiryTime) * time.Second)
done := make(chan bool)
utils.RegisterExitChannel(done)
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
aggregates.cleanupExpiredEntries()
}
}
}()
}

func (aggregates Aggregates) cleanupExpiredEntries() {
mutex.Lock()
defer mutex.Unlock()

for _, aggregate := range aggregates {
aggregate.cleanupExpiredEntries()
}
return aggregates, fmt.Errorf("can't find AggregateBy = %v", by)

}

func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates, error) {
Expand All @@ -77,5 +105,7 @@ func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates,
aggregates = aggregates.AddAggregate(aggregateDefinition)
}

aggregates.cleanupExpiredEntriesLoop()

return aggregates, nil
}
27 changes: 24 additions & 3 deletions pkg/pipeline/extract/aggregate/aggregates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package aggregate

import (
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

func Test_NewAggregatesFromConfig(t *testing.T) {
func initAggregates(t *testing.T) Aggregates {
var yamlConfig = `
log-level: debug
pipeline:
Expand All @@ -44,10 +45,30 @@ parameters:
`
v := test.InitConfig(t, yamlConfig)
require.NotNil(t, v)
aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates)
require.NoError(t, err)

return aggregates
}

func Test_NewAggregatesFromConfig(t *testing.T) {

aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates)

require.NoError(t, err)
require.Equal(t, aggregates[0].Definition, expectedAggregate.Definition)
}

func Test_CleanupExpiredEntriesLoop(t *testing.T) {

expiryTime = 1
aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
require.Equal(t, aggregates[0].Definition, expectedAggregate.Definition)

entry := test.GetIngestMockEntry(false)
err := aggregates.Evaluate([]config.GenericMap{entry})
require.NoError(t, err)
time.Sleep(2 * time.Second)
require.Equal(t, []config.GenericMap(nil), aggregates[0].GetMetrics())
}
53 changes: 53 additions & 0 deletions tatus
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
diff --git a/pkg/pipeline/extract/aggregate/aggregates_test.go b/pkg/pipeline/extract/aggregate/aggregates_test.go
index 74ecd5b..ed7a7fd 100644
--- a/pkg/pipeline/extract/aggregate/aggregates_test.go
+++ b/pkg/pipeline/extract/aggregate/aggregates_test.go
@@ -19,13 +19,14 @@ package aggregate

import (
"testing"
+ "time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

-func Test_NewAggregatesFromConfig(t *testing.T) {
+func initAggregates(t *testing.T) Aggregates {
var yamlConfig = `
log-level: debug
pipeline:
@@ -44,10 +45,30 @@ parameters:
`
v := test.InitConfig(t, yamlConfig)
require.NotNil(t, v)
+ aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates)
+ require.NoError(t, err)
+
+ return aggregates
+}
+
+func Test_NewAggregatesFromConfig(t *testing.T) {

+ aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
- aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates)

- require.NoError(t, err)
require.Equal(t, aggregates[0].Definition, expectedAggregate.Definition)
}
+
+func Test_CleanupExpiredEntriesLoop(t *testing.T) {
+
+ expiryTime = 1
+ aggregates := initAggregates(t)
+ expectedAggregate := GetMockAggregate()
+ require.Equal(t, aggregates[0].Definition, expectedAggregate.Definition)
+
+ entry := test.GetIngestMockEntry(false)
+ err := aggregates.Evaluate([]config.GenericMap{entry})
+ require.NoError(t, err)
+ time.Sleep(2 * time.Second)
+ require.Equal(t, []config.GenericMap(nil), aggregates[0].GetMetrics())
+}

0 comments on commit 5cc400c

Please sign in to comment.