Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for deleting expired aggregates #178

Merged
merged 1 commit into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions pkg/pipeline/extract/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package aggregate

import (
"container/list"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -43,7 +46,10 @@ type NormalizedValues string

type Aggregate struct {
Definition api.AggregateDefinition
Groups map[NormalizedValues]*GroupState
GroupsMap map[NormalizedValues]*GroupState
GroupsList *list.List
eranra marked this conversation as resolved.
Show resolved Hide resolved
mutex *sync.Mutex
expiryTime int64
}

type GroupState struct {
Expand All @@ -53,6 +59,8 @@ type GroupState struct {
recentCount int
totalValue float64
totalCount int
lastUpdatedTime int64
listElement *list.Element
}

func (aggregate Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bool) {
Expand Down Expand Up @@ -116,7 +124,11 @@ func getInitValue(operation string) float64 {
}

func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error {
groupState, ok := aggregate.Groups[normalizedValues]

aggregate.mutex.Lock()
defer aggregate.mutex.Unlock()

groupState, ok := aggregate.GroupsMap[normalizedValues]
if !ok {
groupState = &GroupState{normalizedValues: normalizedValues}
initVal := getInitValue(string(aggregate.Definition.Operation))
Expand All @@ -125,7 +137,10 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu
if aggregate.Definition.Operation == OperationRawValues {
groupState.recentRawValues = make([]float64, 0)
}
aggregate.Groups[normalizedValues] = groupState
aggregate.GroupsMap[normalizedValues] = groupState
groupState.listElement = aggregate.GroupsList.PushBack(groupState)
} else {
aggregate.GroupsList.MoveToBack(groupState.listElement)
}

// update value
Expand Down Expand Up @@ -164,6 +179,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu
// update count
groupState.totalCount += 1
groupState.recentCount += 1
groupState.lastUpdatedTime = time.Now().Unix()

return nil
}
Expand All @@ -188,8 +204,11 @@ func (aggregate Aggregate) Evaluate(entries []config.GenericMap) error {
}

func (aggregate Aggregate) GetMetrics() []config.GenericMap {
aggregate.mutex.Lock()
defer aggregate.mutex.Unlock()

var metrics []config.GenericMap
for _, group := range aggregate.Groups {
for _, group := range aggregate.GroupsMap {
metrics = append(metrics, config.GenericMap{
"name": aggregate.Definition.Name,
"operation": aggregate.Definition.Operation,
Expand All @@ -213,3 +232,21 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap {

return metrics
}

func (aggregate Aggregate) cleanupExpiredEntries() {
nowInSecs := time.Now().Unix()
eranra marked this conversation as resolved.
Show resolved Hide resolved
expireTime := nowInSecs - aggregate.expiryTime

for {
listEntry := aggregate.GroupsList.Front()
if listEntry == nil {
return
}
pCacheInfo := listEntry.Value.(*GroupState)
if pCacheInfo.lastUpdatedTime > expireTime {
return
}
delete(aggregate.GroupsMap, pCacheInfo.normalizedValues)
aggregate.GroupsList.Remove(listEntry)
}
}
14 changes: 10 additions & 4 deletions pkg/pipeline/extract/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package aggregate

import (
"container/list"
"fmt"
"strconv"
"sync"
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -36,7 +38,10 @@ func GetMockAggregate() Aggregate {
Operation: "avg",
RecordKey: "value",
},
Groups: map[NormalizedValues]*GroupState{},
GroupsMap: map[NormalizedValues]*GroupState{},
GroupsList: list.New(),
mutex: &sync.Mutex{},
expiryTime: 30,
}
return aggregate
}
Expand Down Expand Up @@ -115,9 +120,10 @@ func Test_Evaluate(t *testing.T) {

err := aggregate.Evaluate(entries)

require.Equal(t, err, nil)
require.Equal(t, aggregate.Groups[normalizedValues].totalCount, 2)
require.Equal(t, aggregate.Groups[normalizedValues].totalValue, float64(7))
require.Equal(t, nil, err)
require.Equal(t, 1, aggregate.GroupsList.Len())
require.Equal(t, 2, aggregate.GroupsMap[normalizedValues].totalCount)
require.Equal(t, float64(7), aggregate.GroupsMap[normalizedValues].totalValue)
}

func Test_GetMetrics(t *testing.T) {
Expand Down
67 changes: 50 additions & 17 deletions pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,27 @@
package aggregate

import (
"fmt"
"reflect"
"container/list"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
)

type Aggregates []Aggregate
var defaultExpiryTime = 60 * 10 // 10 minutes

type Aggregates struct {
Aggregates []Aggregate
expiryTime int64
}

type Definitions []api.AggregateDefinition

func (aggregates Aggregates) Evaluate(entries []config.GenericMap) error {
for _, aggregate := range aggregates {
func (aggregates *Aggregates) Evaluate(entries []config.GenericMap) error {
for _, aggregate := range aggregates.Aggregates {
err := aggregate.Evaluate(entries)
if err != nil {
log.Debugf("Evaluate error %v", err)
Expand All @@ -41,41 +49,66 @@ func (aggregates Aggregates) Evaluate(entries []config.GenericMap) error {
return nil
}

func (aggregates Aggregates) GetMetrics() []config.GenericMap {
func (aggregates *Aggregates) GetMetrics() []config.GenericMap {
var metrics []config.GenericMap
for _, aggregate := range aggregates {
for _, aggregate := range aggregates.Aggregates {
aggregateMetrics := aggregate.GetMetrics()
metrics = append(metrics, aggregateMetrics...)
}

return metrics
}

func (aggregates Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) Aggregates {
func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) []Aggregate {
aggregate := Aggregate{
Definition: aggregateDefinition,
Groups: map[NormalizedValues]*GroupState{},
GroupsMap: map[NormalizedValues]*GroupState{},
GroupsList: list.New(),
mutex: &sync.Mutex{},
expiryTime: aggregates.expiryTime,
}

appendedAggregates := append(aggregates, aggregate)
appendedAggregates := append(aggregates.Aggregates, aggregate)
return appendedAggregates
}

func (aggregates Aggregates) RemoveAggregate(by api.AggregateBy) (Aggregates, error) {
eranra marked this conversation as resolved.
Show resolved Hide resolved
for i, other := range aggregates {
if reflect.DeepEqual(other.Definition.By, by) {
return append(aggregates[:i], aggregates[i+1:]...), nil
func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {

ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
done := make(chan bool)
utils.RegisterExitChannel(done)
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
aggregates.cleanupExpiredEntries()
}
}
}()
}

func (aggregates *Aggregates) cleanupExpiredEntries() {

for _, aggregate := range aggregates.Aggregates {
aggregate.mutex.Lock()
aggregate.cleanupExpiredEntries()
eranra marked this conversation as resolved.
Show resolved Hide resolved
aggregate.mutex.Unlock()
Comment on lines +95 to +97
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to move the Lock()/Unlock() inside aggregate.cleanupExpiredEntries() as done in GetMetrics() and UpdateByEntry()

}
return aggregates, fmt.Errorf("can't find AggregateBy = %v", by)

}

func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates, error) {
aggregates := Aggregates{}
aggregates := Aggregates{
expiryTime: int64(defaultExpiryTime),
}

for _, aggregateDefinition := range definitions {
aggregates = aggregates.AddAggregate(aggregateDefinition)
aggregates.Aggregates = aggregates.AddAggregate(aggregateDefinition)
}

aggregates.cleanupExpiredEntriesLoop()

return aggregates, nil
}
29 changes: 26 additions & 3 deletions pkg/pipeline/extract/aggregate/aggregates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package aggregate

import (
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

func Test_NewAggregatesFromConfig(t *testing.T) {
func initAggregates(t *testing.T) Aggregates {
var yamlConfig = `
log-level: debug
pipeline:
Expand All @@ -44,10 +45,32 @@ parameters:
`
v := test.InitConfig(t, yamlConfig)
require.NotNil(t, v)
aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates)
require.NoError(t, err)

return aggregates
}

func Test_NewAggregatesFromConfig(t *testing.T) {

aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates)

require.Equal(t, aggregates.Aggregates[0].Definition, expectedAggregate.Definition)
}

func Test_CleanupExpiredEntriesLoop(t *testing.T) {

defaultExpiryTime = 4 // expiration after 4 seconds
aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
require.Equal(t, expectedAggregate.Definition, aggregates.Aggregates[0].Definition)

entry := test.GetIngestMockEntry(false)
err := aggregates.Evaluate([]config.GenericMap{entry})
require.NoError(t, err)
require.Equal(t, aggregates[0].Definition, expectedAggregate.Definition)
time.Sleep(2 * time.Second) // still exists after 2 seconds
require.Equal(t, 1, len(aggregates.Aggregates[0].GetMetrics()))
time.Sleep(3 * time.Second) // expires after 3 more seconds (5 seconds in total)
require.Equal(t, 0, len(aggregates.Aggregates[0].GetMetrics()))
Comment on lines +72 to +75
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comments

}