diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go index d54e2abe1..ad2d0c5e5 100644 --- a/pkg/pipeline/extract/conntrack/conntrack_test.go +++ b/pkg/pipeline/extract/conntrack/conntrack_test.go @@ -27,6 +27,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" ) @@ -168,6 +169,7 @@ func TestTrack(t *testing.T) { require.NoError(t, err) actual := ct.Extract(testt.inputFlowLogs) require.Equal(t, testt.expected, actual) + assertStoreConsistency(t, ct) }) } } @@ -253,6 +255,7 @@ func TestEndConn_Bidirectional(t *testing.T) { clk.Set(tt.time) actual := ct.Extract(tt.inputFlowLogs) require.Equal(t, tt.expected, actual) + assertStoreConsistency(t, ct) }) } } @@ -354,6 +357,7 @@ func TestEndConn_Unidirectional(t *testing.T) { clk.Set(tt.time) actual := ct.Extract(tt.inputFlowLogs) require.Equal(t, tt.expected, actual) + assertStoreConsistency(t, ct) }) } } @@ -504,6 +508,7 @@ func TestUpdateConn_Unidirectional(t *testing.T) { clk.Set(tt.time) actual := ct.Extract(tt.inputFlowLogs) require.Equal(t, tt.expected, actual) + assertStoreConsistency(t, ct) }) } } @@ -591,6 +596,7 @@ func TestIsFirst_LongConnection(t *testing.T) { clk.Set(tt.time) actual := ct.Extract(tt.inputFlowLogs) require.Equal(t, tt.expected, actual) + assertStoreConsistency(t, ct) }) } } @@ -651,6 +657,7 @@ func TestIsFirst_ShortConnection(t *testing.T) { clk.Set(tt.time) actual := ct.Extract(tt.inputFlowLogs) require.Equal(t, tt.expected, actual) + assertStoreConsistency(t, ct) }) } } @@ -718,7 +725,6 @@ func TestPrepareUpdateConnectionRecords(t *testing.T) { // 30s: O updateConn ICMP // 35s: O endConn ICMP func TestScheduling(t *testing.T) { - test.ResetPromRegistry() clk := clock.NewMock() defaultUpdateConnectionInterval := 20 * time.Second @@ -842,10 +848,23 @@ func TestScheduling(t *testing.T) { clk.Set(tt.time) actual := ct.Extract(tt.inputFlowLogs) require.Equal(t, tt.expected, actual) + assertStoreConsistency(t, ct) }) } } +func assertStoreConsistency(t *testing.T, extractor extract.Extractor) { + store := extractor.(*conntrackImpl).connStore + hashLen := len(store.hashId2groupIdx) + groupsLenSlice := make([]int, 0) + sumGroupsLen := 0 + for _, g := range store.groups { + sumGroupsLen += g.mom.Len() + groupsLenSlice = append(groupsLenSlice, g.mom.Len()) + } + require.Equal(t, hashLen, sumGroupsLen, "hashLen(=%v) != sum(%v)", hashLen, groupsLenSlice) +} + func assertHashOrder(t *testing.T, expected []uint64, actualRecords []config.GenericMap) { t.Helper() var actual []uint64 diff --git a/pkg/pipeline/extract/conntrack/store.go b/pkg/pipeline/extract/conntrack/store.go index 8162d39fe..1ec6b9329 100644 --- a/pkg/pipeline/extract/conntrack/store.go +++ b/pkg/pipeline/extract/conntrack/store.go @@ -139,6 +139,7 @@ func (cs *connectionStore) popEndConnections() []connection { // The connection has expired. We want to pop it. poppedConnections = append(poppedConnections, conn) shouldDelete, shouldStop = true, false + delete(cs.hashId2groupIdx, conn.getHash().hashTotal) } else { // No more expired connections shouldDelete, shouldStop = false, true