Skip to content

Commit

Permalink
Remove entries from hashId2groupIdx map on connection removal
Browse files Browse the repository at this point in the history
  • Loading branch information
ronensc committed Jan 16, 2023
1 parent 219e71f commit a462f5d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
21 changes: 20 additions & 1 deletion pkg/pipeline/extract/conntrack/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -168,6 +169,7 @@ func TestTrack(t *testing.T) {
require.NoError(t, err)
actual := ct.Extract(testt.inputFlowLogs)
require.Equal(t, testt.expected, actual)
assertStoreConsistency(t, ct)
})
}
}
Expand Down Expand Up @@ -253,6 +255,7 @@ func TestEndConn_Bidirectional(t *testing.T) {
clk.Set(tt.time)
actual := ct.Extract(tt.inputFlowLogs)
require.Equal(t, tt.expected, actual)
assertStoreConsistency(t, ct)
})
}
}
Expand Down Expand Up @@ -354,6 +357,7 @@ func TestEndConn_Unidirectional(t *testing.T) {
clk.Set(tt.time)
actual := ct.Extract(tt.inputFlowLogs)
require.Equal(t, tt.expected, actual)
assertStoreConsistency(t, ct)
})
}
}
Expand Down Expand Up @@ -504,6 +508,7 @@ func TestUpdateConn_Unidirectional(t *testing.T) {
clk.Set(tt.time)
actual := ct.Extract(tt.inputFlowLogs)
require.Equal(t, tt.expected, actual)
assertStoreConsistency(t, ct)
})
}
}
Expand Down Expand Up @@ -591,6 +596,7 @@ func TestIsFirst_LongConnection(t *testing.T) {
clk.Set(tt.time)
actual := ct.Extract(tt.inputFlowLogs)
require.Equal(t, tt.expected, actual)
assertStoreConsistency(t, ct)
})
}
}
Expand Down Expand Up @@ -651,6 +657,7 @@ func TestIsFirst_ShortConnection(t *testing.T) {
clk.Set(tt.time)
actual := ct.Extract(tt.inputFlowLogs)
require.Equal(t, tt.expected, actual)
assertStoreConsistency(t, ct)
})
}
}
Expand Down Expand Up @@ -718,7 +725,6 @@ func TestPrepareUpdateConnectionRecords(t *testing.T) {
// 30s: O updateConn ICMP
// 35s: O endConn ICMP
func TestScheduling(t *testing.T) {

test.ResetPromRegistry()
clk := clock.NewMock()
defaultUpdateConnectionInterval := 20 * time.Second
Expand Down Expand Up @@ -842,10 +848,23 @@ func TestScheduling(t *testing.T) {
clk.Set(tt.time)
actual := ct.Extract(tt.inputFlowLogs)
require.Equal(t, tt.expected, actual)
assertStoreConsistency(t, ct)
})
}
}

func assertStoreConsistency(t *testing.T, extractor extract.Extractor) {
store := extractor.(*conntrackImpl).connStore
hashLen := len(store.hashId2groupIdx)
groupsLenSlice := make([]int, 0)
sumGroupsLen := 0
for _, g := range store.groups {
sumGroupsLen += g.mom.Len()
groupsLenSlice = append(groupsLenSlice, g.mom.Len())
}
require.Equal(t, hashLen, sumGroupsLen, "hashLen(=%v) != sum(%v)", hashLen, groupsLenSlice)
}

func assertHashOrder(t *testing.T, expected []uint64, actualRecords []config.GenericMap) {
t.Helper()
var actual []uint64
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/extract/conntrack/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (cs *connectionStore) popEndConnections() []connection {
// The connection has expired. We want to pop it.
poppedConnections = append(poppedConnections, conn)
shouldDelete, shouldStop = true, false
delete(cs.hashId2groupIdx, conn.getHash().hashTotal)
} else {
// No more expired connections
shouldDelete, shouldStop = false, true
Expand Down

0 comments on commit a462f5d

Please sign in to comment.