Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] switch event exporter to bulk api #47632

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 15 additions & 49 deletions integrations/event-handler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (
type App struct {
// Fluentd represents the instance of Fluentd client
Fluentd *FluentdClient
// EventWatcher represents the instance of TeleportEventWatcher
EventWatcher *TeleportEventsWatcher
// State represents the instance of the persistent state
State *State
// cmd is start command CLI config
Config *StartCmdConfig
// client is the teleport api client
client TeleportSearchEventsClient
// eventsJob represents main audit log event consumer job
eventsJob *EventsJob
// sessionEventsJob represents session events consumer job
Expand Down Expand Up @@ -82,9 +82,6 @@ func (a *App) Run(ctx context.Context) error {
a.SpawnCritical(a.sessionEventsJob.processMissingRecordings)
<-a.Process.Done()

lastWindow := a.EventWatcher.getWindowStartTime()
a.State.SetLastWindowTime(&lastWindow)

return a.Err()
}

Expand Down Expand Up @@ -122,7 +119,7 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error
break
}

log.Error("Error sending event to fluentd: ", err)
log.Debug("Error sending event to fluentd: ", err)

bErr := backoff.Do(ctx)
if bErr != nil {
Expand All @@ -131,10 +128,14 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error

backoffCount--
if backoffCount < 0 {
if !lib.IsCanceled(err) {
return trace.Wrap(err)
if lib.IsCanceled(err) {
return nil
}
return nil
log.WithFields(logrus.Fields{
"error": err.Error(), // omitting the stack trace (too verbose)
"attempts": sendBackoffNumTries,
}).Error("failed to send event to fluentd")
return trace.Wrap(err)
}
}
}
Expand All @@ -151,64 +152,29 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error

// init initializes application state
func (a *App) init(ctx context.Context) error {
log := logger.Get(ctx)

a.Config.Dump(ctx)

s, err := NewState(a.Config)
if err != nil {
return trace.Wrap(err)
}

err = a.setStartTime(ctx, s)
if err != nil {
return trace.Wrap(err)
}

f, err := NewFluentdClient(&a.Config.FluentdConfig)
var err error
a.client, err = newClient(ctx, a.Config)
if err != nil {
return trace.Wrap(err)
}

latestCursor, err := s.GetCursor()
a.State, err = NewState(a.Config)
if err != nil {
return trace.Wrap(err)
}

latestID, err := s.GetID()
err = a.setStartTime(ctx, a.State)
if err != nil {
return trace.Wrap(err)
}

startTime, err := s.GetStartTime()
a.Fluentd, err = NewFluentdClient(&a.Config.FluentdConfig)
if err != nil {
return trace.Wrap(err)
}

lastWindowTime, err := s.GetLastWindowTime()
if err != nil {
return trace.Wrap(err)
}
// if lastWindowTime is nil, set it to startTime
// lastWindowTime is used to track the last window of events ingested
// and is updated on exit
if lastWindowTime == nil {
lastWindowTime = startTime
}

t, err := NewTeleportEventsWatcher(ctx, a.Config, *lastWindowTime, latestCursor, latestID)
if err != nil {
return trace.Wrap(err)
}

a.State = s
a.Fluentd = f
a.EventWatcher = t

log.WithField("cursor", latestCursor).Info("Using initial cursor value")
log.WithField("id", latestID).Info("Using initial ID value")
log.WithField("value", startTime).Info("Using start time from state")

return nil
}

Expand Down
Loading
Loading