Skip to content

Commit

Permalink
switch event exporter to bulk api (#47633)
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall authored Oct 16, 2024
1 parent 2908d2a commit 72f26aa
Show file tree
Hide file tree
Showing 11 changed files with 716 additions and 384 deletions.
64 changes: 15 additions & 49 deletions integrations/event-handler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (
type App struct {
// Fluentd represents the instance of Fluentd client
Fluentd *FluentdClient
// EventWatcher represents the instance of TeleportEventWatcher
EventWatcher *TeleportEventsWatcher
// State represents the instance of the persistent state
State *State
// cmd is start command CLI config
Config *StartCmdConfig
// client is the teleport api client
client TeleportSearchEventsClient
// eventsJob represents main audit log event consumer job
eventsJob *EventsJob
// sessionEventsJob represents session events consumer job
Expand Down Expand Up @@ -82,9 +82,6 @@ func (a *App) Run(ctx context.Context) error {
a.SpawnCritical(a.sessionEventsJob.processMissingRecordings)
<-a.Process.Done()

lastWindow := a.EventWatcher.getWindowStartTime()
a.State.SetLastWindowTime(&lastWindow)

return a.Err()
}

Expand Down Expand Up @@ -122,7 +119,7 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error
break
}

log.Error("Error sending event to fluentd: ", err)
log.Debug("Error sending event to fluentd: ", err)

bErr := backoff.Do(ctx)
if bErr != nil {
Expand All @@ -131,10 +128,14 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error

backoffCount--
if backoffCount < 0 {
if !lib.IsCanceled(err) {
return trace.Wrap(err)
if lib.IsCanceled(err) {
return nil
}
return nil
log.WithFields(logrus.Fields{
"error": err.Error(), // omitting the stack trace (too verbose)
"attempts": sendBackoffNumTries,
}).Error("failed to send event to fluentd")
return trace.Wrap(err)
}
}
}
Expand All @@ -151,64 +152,29 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error

// init initializes application state
func (a *App) init(ctx context.Context) error {
log := logger.Get(ctx)

a.Config.Dump(ctx)

s, err := NewState(a.Config)
if err != nil {
return trace.Wrap(err)
}

err = a.setStartTime(ctx, s)
if err != nil {
return trace.Wrap(err)
}

f, err := NewFluentdClient(&a.Config.FluentdConfig)
var err error
a.client, err = newClient(ctx, a.Config)
if err != nil {
return trace.Wrap(err)
}

latestCursor, err := s.GetCursor()
a.State, err = NewState(a.Config)
if err != nil {
return trace.Wrap(err)
}

latestID, err := s.GetID()
err = a.setStartTime(ctx, a.State)
if err != nil {
return trace.Wrap(err)
}

startTime, err := s.GetStartTime()
a.Fluentd, err = NewFluentdClient(&a.Config.FluentdConfig)
if err != nil {
return trace.Wrap(err)
}

lastWindowTime, err := s.GetLastWindowTime()
if err != nil {
return trace.Wrap(err)
}
// if lastWindowTime is nil, set it to startTime
// lastWindowTime is used to track the last window of events ingested
// and is updated on exit
if lastWindowTime == nil {
lastWindowTime = startTime
}

t, err := NewTeleportEventsWatcher(ctx, a.Config, *lastWindowTime, latestCursor, latestID)
if err != nil {
return trace.Wrap(err)
}

a.State = s
a.Fluentd = f
a.EventWatcher = t

log.WithField("cursor", latestCursor).Info("Using initial cursor value")
log.WithField("id", latestID).Info("Using initial ID value")
log.WithField("value", startTime).Info("Using start time from state")

return nil
}

Expand Down
Loading

0 comments on commit 72f26aa

Please sign in to comment.