Skip to content

Commit

Permalink
skip duplicates (#421)
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Apr 11, 2023
1 parent 41fe8ff commit cfadce6
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions pkg/pipeline/extract/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,42 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM
ct.metrics.inputRecords.WithLabelValues("rejected").Inc()
continue
}
conn, exists := ct.connStore.getConnection(computedHash.hashTotal)
if !exists {
if (ct.config.MaxConnectionsTracked > 0) && (ct.connStore.len() >= ct.config.MaxConnectionsTracked) {
log.Warningf("too many connections; skipping flow log %v: ", fl)
ct.metrics.inputRecords.WithLabelValues("discarded").Inc()

if fl.IsDuplicate() {
log.Debugf("skipping duplicated flow log %v", fl)
ct.metrics.inputRecords.WithLabelValues("duplicate").Inc()
} else {
conn, exists := ct.connStore.getConnection(computedHash.hashTotal)
if !exists {
if (ct.config.MaxConnectionsTracked > 0) && (ct.connStore.len() >= ct.config.MaxConnectionsTracked) {
log.Warningf("too many connections; skipping flow log %v: ", fl)
ct.metrics.inputRecords.WithLabelValues("discarded").Inc()
} else {
builder := NewConnBuilder(ct.metrics)
conn = builder.
ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.shouldSwapAB(fl)).
KeysFrom(fl, ct.config.KeyDefinition, ct.endpointAFields, ct.endpointBFields).
Aggregators(ct.aggregators).
Hash(computedHash).
Build()
ct.connStore.addConnection(computedHash.hashTotal, conn)
ct.connStore.updateNextHeartbeatTime(computedHash.hashTotal)
ct.updateConnection(conn, fl, computedHash)
ct.metrics.inputRecords.WithLabelValues("newConnection").Inc()
if ct.shouldOutputNewConnection {
record := conn.toGenericMap()
addHashField(record, computedHash.hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection"))
isFirst := conn.markReported()
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
ct.metrics.outputRecords.WithLabelValues("newConnection").Inc()
}
}
} else {
builder := NewConnBuilder(ct.metrics)
conn = builder.
ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.shouldSwapAB(fl)).
KeysFrom(fl, ct.config.KeyDefinition, ct.endpointAFields, ct.endpointBFields).
Aggregators(ct.aggregators).
Hash(computedHash).
Build()
ct.connStore.addConnection(computedHash.hashTotal, conn)
ct.connStore.updateNextHeartbeatTime(computedHash.hashTotal)
ct.updateConnection(conn, fl, computedHash)
ct.metrics.inputRecords.WithLabelValues("newConnection").Inc()
if ct.shouldOutputNewConnection {
record := conn.toGenericMap()
addHashField(record, computedHash.hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection"))
isFirst := conn.markReported()
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
ct.metrics.outputRecords.WithLabelValues("newConnection").Inc()
}
ct.metrics.inputRecords.WithLabelValues("update").Inc()
}
} else {
ct.updateConnection(conn, fl, computedHash)
ct.metrics.inputRecords.WithLabelValues("update").Inc()
}

if ct.shouldOutputFlowLogs {
Expand Down Expand Up @@ -162,11 +168,9 @@ func (ct *conntrackImpl) prepareHeartbeatRecords() []config.GenericMap {
}

func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.GenericMap, flowLogHash totalHashType) {
if !flowLog.IsDuplicate() {
d := ct.getFlowLogDirection(conn, flowLogHash)
for _, agg := range ct.aggregators {
agg.update(conn, flowLog, d)
}
d := ct.getFlowLogDirection(conn, flowLogHash)
for _, agg := range ct.aggregators {
agg.update(conn, flowLog, d)
}

if ct.config.TCPFlags.DetectEndConnection && ct.isLastFlowLogOfConnection(flowLog) {
Expand Down

0 comments on commit cfadce6

Please sign in to comment.