diff --git a/integrations/event-handler/app.go b/integrations/event-handler/app.go index 6a44e76aa422..08fa5aabb881 100644 --- a/integrations/event-handler/app.go +++ b/integrations/event-handler/app.go @@ -35,12 +35,12 @@ import ( type App struct { // Fluentd represents the instance of Fluentd client Fluentd *FluentdClient - // EventWatcher represents the instance of TeleportEventWatcher - EventWatcher *TeleportEventsWatcher // State represents the instance of the persistent state State *State // cmd is start command CLI config Config *StartCmdConfig + // client is the teleport api client + client TeleportSearchEventsClient // eventsJob represents main audit log event consumer job eventsJob *EventsJob // sessionEventsJob represents session events consumer job @@ -82,9 +82,6 @@ func (a *App) Run(ctx context.Context) error { a.SpawnCritical(a.sessionEventsJob.processMissingRecordings) <-a.Process.Done() - lastWindow := a.EventWatcher.getWindowStartTime() - a.State.SetLastWindowTime(&lastWindow) - return a.Err() } @@ -122,7 +119,7 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error break } - log.Error("Error sending event to fluentd: ", err) + log.Debug("Error sending event to fluentd: ", err) bErr := backoff.Do(ctx) if bErr != nil { @@ -131,10 +128,14 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error backoffCount-- if backoffCount < 0 { - if !lib.IsCanceled(err) { - return trace.Wrap(err) + if lib.IsCanceled(err) { + return nil } - return nil + log.WithFields(logrus.Fields{ + "error": err.Error(), // omitting the stack trace (too verbose) + "attempts": sendBackoffNumTries, + }).Error("failed to send event to fluentd") + return trace.Wrap(err) } } } @@ -151,64 +152,29 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error // init initializes application state func (a *App) init(ctx context.Context) error { - log := logger.Get(ctx) - a.Config.Dump(ctx) - s, err := NewState(a.Config) - if err != nil { - return trace.Wrap(err) - } - - err = a.setStartTime(ctx, s) - if err != nil { - return trace.Wrap(err) - } - - f, err := NewFluentdClient(&a.Config.FluentdConfig) + var err error + a.client, err = newClient(ctx, a.Config) if err != nil { return trace.Wrap(err) } - latestCursor, err := s.GetCursor() + a.State, err = NewState(a.Config) if err != nil { return trace.Wrap(err) } - latestID, err := s.GetID() + err = a.setStartTime(ctx, a.State) if err != nil { return trace.Wrap(err) } - startTime, err := s.GetStartTime() + a.Fluentd, err = NewFluentdClient(&a.Config.FluentdConfig) if err != nil { return trace.Wrap(err) } - lastWindowTime, err := s.GetLastWindowTime() - if err != nil { - return trace.Wrap(err) - } - // if lastWindowTime is nil, set it to startTime - // lastWindowTime is used to track the last window of events ingested - // and is updated on exit - if lastWindowTime == nil { - lastWindowTime = startTime - } - - t, err := NewTeleportEventsWatcher(ctx, a.Config, *lastWindowTime, latestCursor, latestID) - if err != nil { - return trace.Wrap(err) - } - - a.State = s - a.Fluentd = f - a.EventWatcher = t - - log.WithField("cursor", latestCursor).Info("Using initial cursor value") - log.WithField("id", latestID).Info("Using initial ID value") - log.WithField("value", startTime).Info("Using start time from state") - return nil } diff --git a/integrations/event-handler/events_job.go b/integrations/event-handler/events_job.go index 54229b1daf56..645866ec3475 100644 --- 
a/integrations/event-handler/events_job.go +++ b/integrations/event-handler/events_job.go @@ -16,21 +16,29 @@ package main import ( "context" + "sync/atomic" "time" "github.com/gravitational/trace" limiter "github.com/sethvargo/go-limiter" "github.com/sethvargo/go-limiter/memorystore" + log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/timestamppb" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/integrations/lib" "github.com/gravitational/teleport/integrations/lib/logger" + "github.com/gravitational/teleport/lib/events/export" ) // EventsJob incapsulates audit log event consumption logic type EventsJob struct { lib.ServiceJob - app *App - rl limiter.Store + app *App + rl limiter.Store + eventsProcessed atomic.Uint64 + targetDate atomic.Pointer[time.Time] } // NewEventsJob creates new EventsJob structure @@ -51,6 +59,25 @@ func (j *EventsJob) run(ctx context.Context) error { return nil }) + // set up background logging of event processing rate + go func() { + logTicker := time.NewTicker(time.Minute) + defer logTicker.Stop() + + for { + select { + case <-logTicker.C: + ll := log.WithField("events_per_minute", j.eventsProcessed.Swap(0)) + if td := j.targetDate.Load(); td != nil { + ll = ll.WithField("date", td.Format(time.DateOnly)) + } + ll.Info("event processing") + case <-ctx.Done(): + return + } + } + }() + store, err := memorystore.New(&memorystore.Config{ Tokens: uint64(j.app.Config.LockFailedAttemptsCount), Interval: j.app.Config.LockPeriod, @@ -62,65 +89,198 @@ func (j *EventsJob) run(ctx context.Context) error { j.rl = store j.SetReady(true) + defer j.app.Terminate() for { err := j.runPolling(ctx) + if err == nil || ctx.Err() != nil { + log.Debug("watch loop exiting") + return trace.Wrap(err) + } - switch { - case trace.IsConnectionProblem(err): - log.WithError(err).Error("Failed to connect to Teleport Auth server. Reconnecting...") - case trace.IsEOF(err): - log.WithError(err).Error("Watcher stream closed. Reconnecting...") - case lib.IsCanceled(err): - log.Debug("Watcher context is canceled") - j.app.Terminate() + log.WithError(err).Error("unexpected error in watch loop. reconnecting in 5s...") + + select { + case <-time.After(time.Second * 5): + case <-ctx.Done(): return nil - default: - j.app.Terminate() - if err == nil { - return nil - } - log.WithError(err).Error("Watcher event loop failed") - return trace.Wrap(err) } } } // runPolling runs actual event queue polling func (j *EventsJob) runPolling(ctx context.Context) error { - log := logger.Get(ctx) + cursorV2State := j.app.State.GetCursorV2State() + if cursorV2State.IsEmpty() { + // we haven't started using the newer bulk event export API yet. check if the upstream implements it by + // performing a fake request. if it does not, we'll need to fallback to using the legacy API. + chunks := j.app.client.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{ + // target a date 2 days in the future to be confident that we're querying a valid but + // empty date range, even in the context of reasonable clock drift. + Date: timestamppb.New(time.Now().AddDate(0, 0, 2)), + }) + + if err := stream.Drain(chunks); err != nil { + if trace.IsNotImplemented(err) { + // fallback to legacy behavior + return j.runLegacyPolling(ctx) + } + return trace.Wrap(err) + } + + // the new API is implemented, check if there is preexisting legacy cursor state. 
+ legacyStates, err := j.app.State.GetLegacyCursorValues() + if err != nil { + return trace.Wrap(err) + } - evtCh, errCh := j.app.EventWatcher.Events(ctx) + if !legacyStates.IsEmpty() { + // cursorV2 state isn't totally compatible, but we can skip ahead to the same target date as + // was being tracked by the legacy cursor. + cursorV2State.Dates[normalizeDate(legacyStates.WindowStartTime)] = export.DateExporterState{} + } + } - logTicker := time.NewTicker(time.Minute) - defer logTicker.Stop() + startTime, err := j.app.State.GetStartTime() + if err != nil { + return trace.Wrap(err) + } - var eventsProcessed int + idleCh := make(chan struct{}, 1) + + // a minimum concurrency of 3 for the date exporter is enforced because 3 is the largest number of chunks we'd + // expect to show up simultaneously due to normal ongoing activity. as concurrency is scaled up we apply a quarter + // of the value of the global concurrency limit to the exporter. this has been shown in manual scale testing to be + // a fairly optimal ratio relative to session processing concurrency (which uses the concurrency limit directly). + // using the global concurrency limit directly results in a lot of stalled chunks because each chunk will typically + // contain a large number of sessions that need to be processed. + concurrency := max(j.app.Config.Concurrency/4, 3) + + exporter, err := export.NewExporter(export.ExporterConfig{ + Client: j.app.client, + StartDate: *startTime, + Export: j.handleEventV2, + OnIdle: func(_ context.Context) { + select { + case idleCh <- struct{}{}: + default: + } + }, + PreviousState: cursorV2State, + Concurrency: concurrency, + BacklogSize: 2, + MaxBackoff: time.Second * 90, + PollInterval: time.Second * 16, + }) + if err != nil { + return trace.Wrap(err) + } + + // the exporter manages retries internally once successfully created, so from here on out + // we just want to periodically sync state to disk until the event exporter closes. + cursorTicker := time.NewTicker(time.Millisecond * 666) + defer cursorTicker.Stop() for { select { - case err := <-errCh: - log.WithField("err", err).Error("Error ingesting Audit Log") - return trace.Wrap(err) - - case evt := <-evtCh: - if evt == nil { + case <-cursorTicker.C: + date := exporter.GetCurrentDate() + j.targetDate.Store(&date) + if err := j.app.State.SetCursorV2State(exporter.GetState()); err != nil { + log.WithError(err).Error("Failed to save cursor_v2 values, will retry") + } + case <-ctx.Done(): + exporter.Close() + <-exporter.Done() + // optimistically attempt one last save + j.app.State.SetCursorV2State(exporter.GetState()) + return nil + case <-idleCh: + // the exporter is idle, which means it has caught up to the present. + if j.app.Config.ExitOnLastEvent { + exporter.Close() + <-exporter.Done() + // optimistically attempt one last save + j.app.State.SetCursorV2State(exporter.GetState()) return nil } + } + } +} - err := j.handleEvent(ctx, evt) - if err != nil { - return trace.Wrap(err) - } +// runLegacyPolling handles event processing via the old API. 
+func (j *EventsJob) runLegacyPolling(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() - eventsProcessed++ - case <-logTicker.C: - log.WithField("events_per_minute", eventsProcessed).Info("event processing rate") - eventsProcessed = 0 - case <-ctx.Done(): - return ctx.Err() + log := logger.Get(ctx) + + lc, err := j.app.State.GetLegacyCursorValues() + if err != nil { + return trace.Wrap(err) + } + + if lc.IsEmpty() { + st, err := j.app.State.GetStartTime() + if err != nil { + return trace.Wrap(err) } + lc.WindowStartTime = *st } + + eventWatcher := NewLegacyEventsWatcher(j.app.Config, j.app.client, *lc, j.handleEvent) + + // periodically sync cursor values to disk + go func() { + ticker := time.NewTicker(time.Millisecond * 666) + defer ticker.Stop() + + lastCursorValues := *lc + for { + select { + case <-ticker.C: + currentCursorValues := eventWatcher.GetCursorValues() + date := normalizeDate(currentCursorValues.WindowStartTime) + j.targetDate.Store(&date) + if currentCursorValues.Equals(lastCursorValues) { + continue + } + if err := j.app.State.SetLegacyCursorValues(currentCursorValues); err != nil { + log.WithError(err).Error("Failed to save cursor values, will retry") + continue + } + lastCursorValues = currentCursorValues + case <-ctx.Done(): + // optimistically attempt one last save. this is good practice, but note that + // we don't promise not to emit duplicate events post-restart, so this we don't + // bother checking for errors here. + j.app.State.SetLegacyCursorValues(eventWatcher.GetCursorValues()) + return + } + } + }() + + return eventWatcher.ExportEvents(ctx) +} + +// handleEventV2 processes an event from the newer export API. +func (j *EventsJob) handleEventV2(ctx context.Context, evt *auditlogpb.ExportEventUnstructured) error { + + // filter out unwanted event types (in the v1 event export logic this was an internal behavior + // of the event processing helper since it would perform conversion prior to storing the event + // in its internal buffer). + if _, ok := j.app.Config.SkipEventTypes[evt.Event.Type]; ok { + return nil + } + + // convert the event to teleport-event-exporter's internal representation + event, err := NewTeleportEvent(evt.Event) + if err != nil { + return trace.Wrap(err) + } + + // remaining handling logic is common to v1 and v2 event export + return j.handleEvent(ctx, event) } // handleEvent processes an event @@ -144,13 +304,7 @@ func (j *EventsJob) handleEvent(ctx context.Context, evt *TeleportEvent) error { } } - // Save last event id and cursor to disk - if err := j.app.State.SetID(evt.ID); err != nil { - return trace.Wrap(err) - } - if err := j.app.State.SetCursor(evt.Cursor); err != nil { - return trace.Wrap(err) - } + j.eventsProcessed.Add(1) return nil } @@ -176,7 +330,7 @@ func (j *EventsJob) TryLockUser(ctx context.Context, evt *TeleportEvent) error { return nil } - err = j.app.EventWatcher.UpsertLock(ctx, evt.FailedLoginData.User, evt.FailedLoginData.Login, j.app.Config.LockFor) + err = upsertLock(ctx, j.app.client, evt.FailedLoginData.User, evt.FailedLoginData.Login, j.app.Config.LockFor) if err != nil { return trace.Wrap(err) } diff --git a/integrations/event-handler/helpers.go b/integrations/event-handler/helpers.go new file mode 100644 index 000000000000..793b4c00262e --- /dev/null +++ b/integrations/event-handler/helpers.go @@ -0,0 +1,126 @@ +/* +Copyright 2024 Gravitational, Inc. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "time" + + "github.com/gravitational/trace" + log "github.com/sirupsen/logrus" + + "github.com/gravitational/teleport/api/client" + "github.com/gravitational/teleport/api/client/proto" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/integrations/lib" + "github.com/gravitational/teleport/integrations/lib/credentials" + "github.com/gravitational/teleport/lib/events/export" +) + +// TeleportSearchEventsClient is an interface for client.Client, required for testing +type TeleportSearchEventsClient interface { + export.Client + // SearchEvents searches for events in the audit log and returns them using their protobuf representation. + SearchEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]events.AuditEvent, string, error) + // StreamSessionEvents returns session events stream for a given session ID using their protobuf representation. + StreamSessionEvents(ctx context.Context, sessionID string, startIndex int64) (chan events.AuditEvent, chan error) + // SearchUnstructuredEvents searches for events in the audit log and returns them using an unstructured representation (structpb.Struct). + SearchUnstructuredEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]*auditlogpb.EventUnstructured, string, error) + // StreamUnstructuredSessionEvents returns session events stream for a given session ID using an unstructured representation (structpb.Struct). + StreamUnstructuredSessionEvents(ctx context.Context, sessionID string, startIndex int64) (chan *auditlogpb.EventUnstructured, chan error) + UpsertLock(ctx context.Context, lock types.Lock) error + Ping(ctx context.Context) (proto.PingResponse, error) + Close() error +} + +// newClient performs teleport api client setup, including credentials loading, validation, and +// setup of credentials refresh if needed. 
+func newClient(ctx context.Context, c *StartCmdConfig) (*client.Client, error) { + var creds []client.Credentials + switch { + case c.TeleportIdentityFile != "" && !c.TeleportRefreshEnabled: + creds = []client.Credentials{client.LoadIdentityFile(c.TeleportIdentityFile)} + case c.TeleportIdentityFile != "" && c.TeleportRefreshEnabled: + cred, err := lib.NewIdentityFileWatcher(ctx, c.TeleportIdentityFile, c.TeleportRefreshInterval) + if err != nil { + return nil, trace.Wrap(err) + } + creds = []client.Credentials{cred} + case c.TeleportCert != "" && c.TeleportKey != "" && c.TeleportCA != "": + creds = []client.Credentials{client.LoadKeyPair(c.TeleportCert, c.TeleportKey, c.TeleportCA)} + default: + return nil, trace.BadParameter("no credentials configured") + } + + if validCred, err := credentials.CheckIfExpired(creds); err != nil { + log.Warn(err) + if !validCred { + return nil, trace.BadParameter( + "No valid credentials found, this likely means credentials are expired. In this case, please sign new credentials and increase their TTL if needed.", + ) + } + log.Info("At least one non-expired credential has been found, continuing startup") + } + + clientConfig := client.Config{ + Addrs: []string{c.TeleportAddr}, + Credentials: creds, + } + + teleportClient, err := client.New(ctx, clientConfig) + if err != nil { + return nil, trace.Wrap(err) + } + + return teleportClient, nil +} + +// upsertLock is a helper used to create or update the event handler's auto lock. +func upsertLock(ctx context.Context, clt TeleportSearchEventsClient, user string, login string, period time.Duration) error { + var expires *time.Time + + if period > 0 { + t := time.Now() + t = t.Add(period) + expires = &t + } + + lock := &types.LockV2{ + Metadata: types.Metadata{ + Name: fmt.Sprintf("event-handler-auto-lock-%v-%v", user, login), + }, + Spec: types.LockSpecV2{ + Target: types.LockTarget{ + Login: login, + User: user, + }, + Message: lockMessage, + Expires: expires, + }, + } + + return clt.UpsertLock(ctx, lock) +} + +// normalizeDate normalizes a timestamp to the beginning of the day in UTC. 
+func normalizeDate(t time.Time) time.Time { + t = t.UTC() + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) +} diff --git a/integrations/event-handler/teleport_events_watcher.go b/integrations/event-handler/legacy_events_watcher.go similarity index 57% rename from integrations/event-handler/teleport_events_watcher.go rename to integrations/event-handler/legacy_events_watcher.go index 0221b44aaefa..2279bd779d9b 100644 --- a/integrations/event-handler/teleport_events_watcher.go +++ b/integrations/event-handler/legacy_events_watcher.go @@ -18,21 +18,16 @@ package main import ( "context" - "fmt" "sync" + "sync/atomic" "time" "github.com/gravitational/trace" log "github.com/sirupsen/logrus" "golang.org/x/time/rate" - "github.com/gravitational/teleport/api/client" - "github.com/gravitational/teleport/api/client/proto" auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/api/types/events" - "github.com/gravitational/teleport/integrations/lib" - "github.com/gravitational/teleport/integrations/lib/credentials" "github.com/gravitational/teleport/integrations/lib/logger" ) @@ -41,23 +36,25 @@ const ( lockMessage = "User is locked due to too many failed login attempts" ) -// TeleportSearchEventsClient is an interface for client.Client, required for testing -type TeleportSearchEventsClient interface { - // SearchEvents searches for events in the audit log and returns them using their protobuf representation. - SearchEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]events.AuditEvent, string, error) - // StreamSessionEvents returns session events stream for a given session ID using their protobuf representation. - StreamSessionEvents(ctx context.Context, sessionID string, startIndex int64) (chan events.AuditEvent, chan error) - // SearchUnstructuredEvents searches for events in the audit log and returns them using an unstructured representation (structpb.Struct). - SearchUnstructuredEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]*auditlogpb.EventUnstructured, string, error) - // StreamUnstructuredSessionEvents returns session events stream for a given session ID using an unstructured representation (structpb.Struct). - StreamUnstructuredSessionEvents(ctx context.Context, sessionID string, startIndex int64) (chan *auditlogpb.EventUnstructured, chan error) - UpsertLock(ctx context.Context, lock types.Lock) error - Ping(ctx context.Context) (proto.PingResponse, error) - Close() error +// LegacyCursorValues represents the cursor data used by the LegacyEventsWatcher to +// resume from where it left off. +type LegacyCursorValues struct { + Cursor string + ID string + WindowStartTime time.Time } -// TeleportEventsWatcher represents wrapper around Teleport client to work with events -type TeleportEventsWatcher struct { +// IsEmpty returns true if all cursor values are empty.
+func (c *LegacyCursorValues) IsEmpty() bool { + return c.Cursor == "" && c.ID == "" && c.WindowStartTime.IsZero() +} + +func (c *LegacyCursorValues) Equals(other LegacyCursorValues) bool { + return c.Cursor == other.Cursor && c.ID == other.ID && c.WindowStartTime.Equal(other.WindowStartTime) +} + +// LegacyEventsWatcher represents wrapper around Teleport client to work with events +type LegacyEventsWatcher struct { // client is an instance of GRPC Teleport client client TeleportSearchEventsClient // cursor current page cursor value @@ -69,94 +66,74 @@ type TeleportEventsWatcher struct { // pos current virtual cursor position within a batch pos int // batch current events batch - batch []*TeleportEvent + batch []*LegacyTeleportEvent // config is teleport config config *StartCmdConfig + // export is the callback that is invoked for each event. it is retried indefinitely + // until it returns nil. + export func(context.Context, *TeleportEvent) error + + // exportedCursor is a pointer to the cursor values of the most recently + // exported event. the values mirror above fields, but those are only accessible + // to the main event processing goroutine. these values are meant to be read + // concurrently. + exportedCursor atomic.Pointer[LegacyCursorValues] // windowStartTime is event time frame start windowStartTime time.Time windowStartTimeMu sync.Mutex } -// NewTeleportEventsWatcher builds Teleport client instance -func NewTeleportEventsWatcher( - ctx context.Context, +// NewLegacyEventsWatcher builds Teleport client instance +func NewLegacyEventsWatcher( c *StartCmdConfig, - windowStartTime time.Time, - cursor string, - id string, -) (*TeleportEventsWatcher, error) { - var creds []client.Credentials - switch { - case c.TeleportIdentityFile != "" && !c.TeleportRefreshEnabled: - creds = []client.Credentials{client.LoadIdentityFile(c.TeleportIdentityFile)} - case c.TeleportIdentityFile != "" && c.TeleportRefreshEnabled: - cred, err := lib.NewIdentityFileWatcher(ctx, c.TeleportIdentityFile, c.TeleportRefreshInterval) - if err != nil { - return nil, trace.Wrap(err) - } - creds = []client.Credentials{cred} - case c.TeleportCert != "" && c.TeleportKey != "" && c.TeleportCA != "": - creds = []client.Credentials{client.LoadKeyPair(c.TeleportCert, c.TeleportKey, c.TeleportCA)} - default: - return nil, trace.BadParameter("no credentials configured") - } - - if validCred, err := credentials.CheckIfExpired(creds); err != nil { - log.Warn(err) - if !validCred { - return nil, trace.BadParameter( - "No valid credentials found, this likely means credentials are expired.
In this case, please sign new credentials and increase their TTL if needed.", - ) - } - log.Info("At least one non-expired credential has been found, continuing startup") - } - - config := client.Config{ - Addrs: []string{c.TeleportAddr}, - Credentials: creds, - } - - teleportClient, err := client.New(ctx, config) - if err != nil { - return nil, trace.Wrap(err) - } - - tc := TeleportEventsWatcher{ - client: teleportClient, + client TeleportSearchEventsClient, + cursorValues LegacyCursorValues, + export func(context.Context, *TeleportEvent) error, +) *LegacyEventsWatcher { + w := &LegacyEventsWatcher{ + client: client, pos: -1, - cursor: cursor, + cursor: cursorValues.Cursor, config: c, - id: id, - windowStartTime: windowStartTime, + export: export, + id: cursorValues.ID, + windowStartTime: cursorValues.WindowStartTime, } - return &tc, nil + w.exportedCursor.Store(&cursorValues) + + return w } // Close closes connection to Teleport -func (t *TeleportEventsWatcher) Close() { +func (t *LegacyEventsWatcher) Close() { t.client.Close() } +func (t *LegacyEventsWatcher) GetCursorValues() LegacyCursorValues { + // exported cursor values ptr is never nil + return *t.exportedCursor.Load() +} + // flipPage flips the current page -func (t *TeleportEventsWatcher) flipPage() bool { +func (t *LegacyEventsWatcher) flipPage() bool { if t.nextCursor == "" { return false } t.cursor = t.nextCursor t.pos = -1 - t.batch = make([]*TeleportEvent, 0) + t.batch = make([]*LegacyTeleportEvent, 0) return true } // fetch fetches the page and sets the position to the event after latest known -func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { +func (t *LegacyEventsWatcher) fetch(ctx context.Context) error { log := logger.Get(ctx) // Zero batch - t.batch = make([]*TeleportEvent, 0, t.config.BatchSize) + t.batch = make([]*LegacyTeleportEvent, 0, t.config.BatchSize) nextCursor, err := t.getEvents(ctx) if err != nil { return trace.Wrap(err) @@ -183,7 +160,6 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { for i, e := range t.batch { if e.ID == t.id { pos = i + 1 - t.id = e.ID } } } @@ -200,8 +176,9 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { // It returns a slice of events, a cursor for the next page and an error. // If the cursor is out of the range, it advances the windowStartTime to the next day. // It only advances the windowStartTime if no events are found until the last complete day. 
-func (t *TeleportEventsWatcher) getEvents(ctx context.Context) (string, error) { - rangeSplitByDay := splitRangeByDay(t.getWindowStartTime(), time.Now().UTC(), t.config.WindowSize) +func (t *LegacyEventsWatcher) getEvents(ctx context.Context) (string, error) { + wst := t.getWindowStartTime() + rangeSplitByDay := splitRangeByDay(wst, time.Now().UTC(), t.config.WindowSize) for i := 1; i < len(rangeSplitByDay); i++ { startTime := rangeSplitByDay[i-1] endTime := rangeSplitByDay[i] @@ -217,7 +194,7 @@ func (t *TeleportEventsWatcher) getEvents(ctx context.Context) (string, error) { log.WithField("event", e).Debug("Skipping event") continue } - evt, err := NewTeleportEvent(e, t.cursor) + evt, err := NewLegacyTeleportEvent(e, t.cursor, wst) if err != nil { return "", trace.Wrap(err) } @@ -238,7 +215,7 @@ func (t *TeleportEventsWatcher) getEvents(ctx context.Context) (string, error) { return t.cursor, nil } -func (t *TeleportEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []time.Time, cursor string) bool { +func (t *LegacyEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []time.Time, cursor string) bool { if cursor != "" { return false @@ -267,7 +244,7 @@ func (t *TeleportEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []tim // getEvents calls Teleport client and loads events from the audit log. // It returns a slice of events, a cursor for the next page and an error. -func (t *TeleportEventsWatcher) getEventsInWindow(ctx context.Context, from, to time.Time) ([]*auditlogpb.EventUnstructured, string, error) { +func (t *LegacyEventsWatcher) getEventsInWindow(ctx context.Context, from, to time.Time) ([]*auditlogpb.EventUnstructured, string, error) { evts, cursor, err := t.client.SearchUnstructuredEvents( ctx, from, @@ -291,7 +268,7 @@ func splitRangeByDay(from, to time.Time, windowSize time.Duration) []time.Time { } // pause sleeps for timeout seconds -func (t *TeleportEventsWatcher) pause(ctx context.Context) error { +func (t *LegacyEventsWatcher) pause(ctx context.Context) error { log := logger.Get(ctx) log.Debugf("No new events, pause for %v seconds", t.config.Timeout) @@ -303,13 +280,13 @@ func (t *TeleportEventsWatcher) pause(ctx context.Context) error { } } -// Next returns next event from a batch or requests next batch if it has been ended -func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent, chan error) { - ch := make(chan *TeleportEvent, t.config.BatchSize) +// ExportEvents exports events from Teleport to the export function provided on initialization. The atomic +// cursor value is updated after each successful export call, and failed export calls are retried indefinitely. 
+func (t *LegacyEventsWatcher) ExportEvents(ctx context.Context) error { + ch := make(chan *LegacyTeleportEvent, t.config.BatchSize) e := make(chan error, 1) go func() { - defer close(ch) defer close(e) logLimiter := rate.NewLimiter(rate.Every(time.Minute), 6) @@ -396,48 +373,46 @@ func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent } }() - return ch, e -} - -// StreamSessionEvents returns session event stream, that's the simple delegate to an API function -func (t *TeleportEventsWatcher) StreamUnstructuredSessionEvents(ctx context.Context, id string, index int64) (chan *auditlogpb.EventUnstructured, chan error) { - return t.client.StreamUnstructuredSessionEvents(ctx, id, index) -} - -// UpsertLock upserts user lock -func (t *TeleportEventsWatcher) UpsertLock(ctx context.Context, user string, login string, period time.Duration) error { - var expires *time.Time - - if period > 0 { - t := time.Now() - t = t.Add(period) - expires = &t - } + for { + select { + case evt := <-ch: + Export: + for { + // retry export of event indefinitely until event export either succeeds or + // exporter is closed. + err := t.export(ctx, evt.TeleportEvent) + if err == nil { + break Export + } - lock := &types.LockV2{ - Metadata: types.Metadata{ - Name: fmt.Sprintf("event-handler-auto-lock-%v-%v", user, login), - }, - Spec: types.LockSpecV2{ - Target: types.LockTarget{ - Login: login, - User: user, - }, - Message: lockMessage, - Expires: expires, - }, + log.WithError(err).Error("Failed to export event, retrying...") + select { + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + case <-time.After(5 * time.Second): // TODO(fspmarshall): add real backoff + } + } + // store updated cursor values after successful export + t.exportedCursor.Store(&LegacyCursorValues{ + Cursor: evt.Cursor, + ID: evt.ID, + WindowStartTime: evt.Window, + }) + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + case err := <-e: + return trace.Wrap(err) + } } - - return t.client.UpsertLock(ctx, lock) } -func (t *TeleportEventsWatcher) getWindowStartTime() time.Time { +func (t *LegacyEventsWatcher) getWindowStartTime() time.Time { t.windowStartTimeMu.Lock() defer t.windowStartTimeMu.Unlock() return t.windowStartTime } -func (t *TeleportEventsWatcher) setWindowStartTime(time time.Time) { +func (t *LegacyEventsWatcher) setWindowStartTime(time time.Time) { t.windowStartTimeMu.Lock() defer t.windowStartTimeMu.Unlock() t.windowStartTime = time diff --git a/integrations/event-handler/teleport_events_watcher_test.go b/integrations/event-handler/legacy_events_watcher_test.go similarity index 80% rename from integrations/event-handler/teleport_events_watcher_test.go rename to integrations/event-handler/legacy_events_watcher_test.go index 38b7e6e8f6a4..e106b0381523 100644 --- a/integrations/event-handler/teleport_events_watcher_test.go +++ b/integrations/event-handler/legacy_events_watcher_test.go @@ -31,10 +31,12 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" libevents "github.com/gravitational/teleport/lib/events" + "github.com/gravitational/teleport/lib/events/export" ) // mockTeleportEventWatcher is Teleport client mock type mockTeleportEventWatcher struct { + export.Client mu sync.Mutex // events is the mock list of events events []events.AuditEvent @@ -133,27 +135,25 @@ func (c *mockTeleportEventWatcher) Close() error { return nil } -func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient, startTime time.Time, 
skipEventTypesRaw []string) *TeleportEventsWatcher { +func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient, startTime time.Time, skipEventTypesRaw []string, exportFn func(context.Context, *TeleportEvent) error) *LegacyEventsWatcher { skipEventTypes := map[string]struct{}{} for _, eventType := range skipEventTypesRaw { skipEventTypes[eventType] = struct{}{} } - client := &TeleportEventsWatcher{ - client: eventsClient, - pos: -1, - config: &StartCmdConfig{ - IngestConfig: IngestConfig{ - BatchSize: 5, - ExitOnLastEvent: true, - SkipEventTypes: skipEventTypes, - SkipSessionTypesRaw: skipEventTypesRaw, - WindowSize: 24 * time.Hour, - }, - }, - windowStartTime: startTime, + + cursor := LegacyCursorValues{ + WindowStartTime: startTime, } - return client + return NewLegacyEventsWatcher(&StartCmdConfig{ + IngestConfig: IngestConfig{ + BatchSize: 5, + ExitOnLastEvent: true, + SkipEventTypes: skipEventTypes, + SkipSessionTypesRaw: skipEventTypesRaw, + WindowSize: 24 * time.Hour, + }, + }, eventsClient, cursor, exportFn) } func TestEvents(t *testing.T) { @@ -175,10 +175,19 @@ func TestEvents(t *testing.T) { // Add the 20 events to a mock event watcher. mockEventWatcher := &mockTeleportEventWatcher{events: testAuditEvents} - client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-48*time.Hour), nil) - // Start the events goroutine - chEvt, chErr := client.Events(ctx) + chEvt, chErr := make(chan *TeleportEvent, 128), make(chan error, 1) + client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-48*time.Hour), nil, func(ctx context.Context, evt *TeleportEvent) error { + select { + case chEvt <- evt: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) + go func() { + chErr <- client.ExportEvents(ctx) + }() // Collect all 20 events for i := 0; i < 20; i++ { @@ -197,34 +206,14 @@ func TestEvents(t *testing.T) { } } - // Both channels should be closed once the last event is reached. + // watcher should exit automatically select { - case _, ok := <-chEvt: - require.False(t, ok, "Events channel should be closed") + case evt := <-chEvt: + t.Fatalf("received unexpected event while waiting for watcher exit: %v", evt) + case err := <-chErr: + require.NoError(t, err) case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } - - select { - case _, ok := <-chErr: - require.False(t, ok, "Error channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } - - // Both channels should be closed - select { - case _, ok := <-chEvt: - require.False(t, ok, "Events channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } - - select { - case _, ok := <-chErr: - require.False(t, ok, "Error channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") + t.Fatalf("timeout waiting for watcher to exit") } } @@ -248,33 +237,28 @@ func TestEventsError(t *testing.T) { // Add the 20 events to a mock event watcher. 
mockErr := trace.Errorf("error") mockEventWatcher := &mockTeleportEventWatcher{events: testAuditEvents, mockSearchErr: mockErr} - client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-48*time.Hour), nil) - // Start the events goroutine - chEvt, chErr := client.Events(ctx) + chEvt, chErr := make(chan *TeleportEvent, 128), make(chan error, 1) + client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-48*time.Hour), nil, func(ctx context.Context, evt *TeleportEvent) error { + select { + case chEvt <- evt: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) + go func() { + chErr <- client.ExportEvents(ctx) + }() select { - case err, ok := <-chErr: - require.True(t, ok, "Channel unexpectedly close") + case evt := <-chEvt: + t.Fatalf("received unexpected event while waiting for watcher exit: %v", evt) + case err := <-chErr: require.ErrorIs(t, err, mockErr) case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } - - // Both channels should be closed - select { - case _, ok := <-chEvt: - require.False(t, ok, "Events channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } - - select { - case _, ok := <-chErr: - require.False(t, ok, "Error channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } } func TestUpdatePage(t *testing.T) { @@ -295,11 +279,21 @@ func TestUpdatePage(t *testing.T) { defer cancel() mockEventWatcher := &mockTeleportEventWatcher{} - client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-1*time.Hour), nil) + + chEvt, chErr := make(chan *TeleportEvent, 128), make(chan error, 1) + client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-1*time.Hour), nil, func(ctx context.Context, evt *TeleportEvent) error { + select { + case chEvt <- evt: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) client.config.ExitOnLastEvent = false - // Start the events goroutine - chEvt, chErr := client.Events(ctx) + go func() { + chErr <- client.ExportEvents(ctx) + }() // Add an incomplete page of 3 events and collect them. mockEventWatcher.setEvents(testAuditEvents[:3]) @@ -379,27 +373,13 @@ func TestUpdatePage(t *testing.T) { mockEventWatcher.setSearchEventsError(mockErr) select { - case err, ok := <-chErr: - require.True(t, ok, "Channel unexpectedly close") + case evt := <-chEvt: + t.Fatalf("received unexpected event while waiting for watcher exit: %v", evt) + case err := <-chErr: require.ErrorIs(t, err, mockErr) case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } - - // Both channels should be closed - select { - case _, ok := <-chEvt: - require.False(t, ok, "Events channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } - - select { - case _, ok := <-chErr: - require.False(t, ok, "Error channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } } func TestValidateConfig(t *testing.T) { @@ -576,10 +556,19 @@ func TestEventsWithWindowSkip(t *testing.T) { // Add the 20 events to a mock event watcher. 
mockEventWatcher := &mockTeleportEventWatcher{events: testAuditEvents} - client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-48*time.Hour), []string{libevents.UserCreateEvent}) - // Start the events goroutine - chEvt, chErr := client.Events(ctx) + chEvt, chErr := make(chan *TeleportEvent, 128), make(chan error, 1) + client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-48*time.Hour), []string{libevents.UserCreateEvent}, func(ctx context.Context, evt *TeleportEvent) error { + select { + case chEvt <- evt: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) + go func() { + chErr <- client.ExportEvents(ctx) + }() // Collect all 10 first events for i := 0; i < 10; i++ { @@ -614,33 +603,13 @@ func TestEventsWithWindowSkip(t *testing.T) { } } - // Both channels should be closed once the last event is reached. + // watcher should exit automatically select { - case _, ok := <-chEvt: - require.False(t, ok, "Events channel should be closed") + case evt := <-chEvt: + t.Fatalf("received unexpected event while waiting for watcher exit: %v", evt) + case err := <-chErr: + require.NoError(t, err) case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } - - select { - case _, ok := <-chErr: - require.False(t, ok, "Error channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } - - // Both channels should be closed - select { - case _, ok := <-chEvt: - require.False(t, ok, "Events channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") - } - - select { - case _, ok := <-chErr: - require.False(t, ok, "Error channel should be closed") - case <-time.After(2 * time.Second): - t.Fatalf("No events received within deadline") + t.Fatalf("timeout waiting for watcher to exit") } } diff --git a/integrations/event-handler/session_events_job.go b/integrations/event-handler/session_events_job.go index 4013f419183e..8a009333bc3d 100644 --- a/integrations/event-handler/session_events_job.go +++ b/integrations/event-handler/session_events_job.go @@ -16,11 +16,13 @@ package main import ( "context" + "sync/atomic" "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/sirupsen/logrus" + "golang.org/x/time/rate" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/integrations/lib" @@ -28,6 +30,14 @@ import ( "github.com/gravitational/teleport/integrations/lib/logger" ) +const ( + // sessionBacklogMultiplier is used to calculate the allowed "backlog" of sessions waiting to be processed + // before session processing starts to block primary event ingestion. a multiplier of 16x the concurrency + // setting was selected based on real-world testing and seems to be a decent middle ground, preventing explosive + // growth of session cursors on disk without unduly blocking in the event of minor perf hiccups.
+ sessionBacklogMultiplier = 16 +) + // session is the utility struct used for session ingestion type session struct { // ID current ID @@ -41,17 +51,22 @@ type session struct { // SessionEventsJob incapsulates session events consumption logic type SessionEventsJob struct { lib.ServiceJob - app *App - sessions chan session - semaphore chan struct{} + app *App + sessions chan session + semaphore chan struct{} + logLimiter *rate.Limiter + backpressureLogLimiter *rate.Limiter + sessionsProcessed atomic.Uint64 } // NewSessionEventsJob creates new EventsJob structure func NewSessionEventsJob(app *App) *SessionEventsJob { j := &SessionEventsJob{ - app: app, - semaphore: make(chan struct{}, app.Config.Concurrency), - sessions: make(chan session), + app: app, + semaphore: make(chan struct{}, app.Config.Concurrency), + sessions: make(chan session, app.Config.Concurrency*sessionBacklogMultiplier), + logLimiter: rate.NewLimiter(rate.Every(time.Second), 1), + backpressureLogLimiter: rate.NewLimiter(rate.Every(time.Minute), 1), } j.ServiceJob = lib.NewServiceJob(j.run) @@ -71,6 +86,21 @@ func (j *SessionEventsJob) run(ctx context.Context) error { return nil }) + // set up background logging of session processing rate + go func() { + logTicker := time.NewTicker(time.Minute) + defer logTicker.Stop() + + for { + select { + case <-logTicker.C: + log.WithField("sessions_per_minute", j.sessionsProcessed.Swap(0)).Info("session processing") + case <-ctx.Done(): + return + } + } + }() + if err := j.restartPausedSessions(); err != nil { log.WithError(err).Error("Restarting paused sessions") } @@ -82,7 +112,9 @@ func (j *SessionEventsJob) run(ctx context.Context) error { case s := <-j.sessions: logger := log.WithField("id", s.ID).WithField("index", s.Index) - logger.Info("Starting session ingest") + if j.logLimiter.Allow() { + logger.Debug("Starting session ingest") + } select { case j.semaphore <- struct{}{}: @@ -172,6 +204,8 @@ func (j *SessionEventsJob) processSession(ctx context.Context, s session, proces } return trace.Wrap(err) default: + // increment the number of sessions processed + j.sessionsProcessed.Add(1) // No errors, we've finished processing the session. return nil } @@ -241,10 +275,12 @@ func (j *SessionEventsJob) restartPausedSessions() error { return nil } + logrus.WithField("count", len(sessions)).Debug("Restarting paused sessions") + for id, idx := range sessions { func(id string, idx int64) { j.app.SpawnCritical(func(ctx context.Context) error { - logrus.WithField("id", id).WithField("index", idx).Info("Restarting session ingestion") + logrus.WithField("id", id).WithField("index", idx).Debug("Restarting session ingestion") s := session{ID: id, Index: idx} @@ -268,8 +304,10 @@ func (j *SessionEventsJob) restartPausedSessions() error { // consumeSession ingests session func (j *SessionEventsJob) consumeSession(ctx context.Context, s session) (bool, error) { url := j.app.Config.FluentdSessionURL + "." 
+ s.ID + ".log" - chEvt, chErr := j.app.EventWatcher.StreamUnstructuredSessionEvents(ctx, s.ID, s.Index) + chEvt, chErr := j.app.client.StreamUnstructuredSessionEvents(ctx, s.ID, s.Index) + cursorSyncLimiter := rate.NewLimiter(rate.Every(time.Second), 1) + cursorSyncLimiter.Allow() // start the limiter off in a drained state Loop: for { select { @@ -278,11 +316,13 @@ Loop: case evt, ok := <-chEvt: if !ok { - logrus.WithField("id", s.ID).Info("Finished session events ingest") + if j.logLimiter.Allow() { + logrus.WithField("id", s.ID).Debug("Finished session events ingest") + } break Loop // Break the main loop } - e, err := NewTeleportEvent(evt, "") + e, err := NewTeleportEvent(evt) if err != nil { return false, trace.Wrap(err) } @@ -299,10 +339,12 @@ Loop: } } - // Set session index - err = j.app.State.SetSessionIndex(s.ID, e.Index) - if err != nil { - return true, trace.Wrap(err) + if cursorSyncLimiter.Allow() { + // Set session index + err = j.app.State.SetSessionIndex(s.ID, e.Index) + if err != nil { + return true, trace.Wrap(err) + } } case <-ctx.Done(): if lib.IsCanceled(ctx.Err()) { @@ -327,17 +369,26 @@ func (j *SessionEventsJob) RegisterSession(ctx context.Context, e *TeleportEvent s := session{ID: e.SessionID, Index: 0, UploadTime: e.Time} - go func() { - select { - case j.sessions <- s: - return - case <-ctx.Done(): - if !lib.IsCanceled(ctx.Err()) { - logrus.Error(ctx.Err()) - } - return - } - }() + select { + case j.sessions <- s: + return nil + default: + } - return nil + if j.backpressureLogLimiter.Allow() { + logrus.Warn("backpressure in session processing, consider increasing concurrency if this issue persists") + } + + select { + case j.sessions <- s: + return nil + case <-ctx.Done(): + if !lib.IsCanceled(ctx.Err()) { + logrus.Error(ctx.Err()) + } + // from the caller's perspective this isn't really an error since we did + // successfully sync session index to disk... session will be ingested + // on a subsequent run. + return nil + } } diff --git a/integrations/event-handler/session_events_job_test.go b/integrations/event-handler/session_events_job_test.go index 3f8c7ef1b364..b0ae8caca8e4 100644 --- a/integrations/event-handler/session_events_job_test.go +++ b/integrations/event-handler/session_events_job_test.go @@ -29,19 +29,15 @@ import ( // if no events are found. 
func TestConsumeSessionNoEventsFound(t *testing.T) { sessionID := "test" - j := &SessionEventsJob{ - app: &App{ - Config: &StartCmdConfig{}, - EventWatcher: &TeleportEventsWatcher{ - client: &mockClient{}, - }, - State: &State{ - dv: diskv.New(diskv.Options{ - BasePath: t.TempDir(), - }), - }, + j := NewSessionEventsJob(&App{ + Config: &StartCmdConfig{}, + State: &State{ + dv: diskv.New(diskv.Options{ + BasePath: t.TempDir(), + }), }, - } + client: &mockClient{}, + }) _, err := j.consumeSession(context.Background(), session{ID: sessionID}) require.NoError(t, err) } diff --git a/integrations/event-handler/state.go b/integrations/event-handler/state.go index acb98b95ff03..73e6bb1baa1d 100644 --- a/integrations/event-handler/state.go +++ b/integrations/event-handler/state.go @@ -24,6 +24,7 @@ import ( "net" "os" "path" + "path/filepath" "strings" "syscall" "time" @@ -34,6 +35,7 @@ import ( "github.com/gravitational/teleport/integrations/event-handler/lib" "github.com/gravitational/teleport/integrations/lib/logger" + "github.com/gravitational/teleport/lib/events/export" ) const ( @@ -49,6 +51,9 @@ const ( // cursorName is the cursor variable name cursorName = "cursor" + // cursorV2Dir is the cursor v2 directory + cursorV2Dir = "cursor_v2" + // idName is the id variable name idName = "id" @@ -66,6 +71,12 @@ const ( type State struct { // dv is a diskv instance dv *diskv.Diskv + + // cursorV2 is an export cursor. if the event handler was started before + // introduction of the v2 cursor or is talking to an auth that does not + // implement the newer bulk export apis, the v1 cursor stored in the above + // dv may be the source of truth still. + cursorV2 *export.Cursor } // NewCursor creates new cursor instance @@ -84,7 +95,17 @@ func NewState(c *StartCmdConfig) (*State, error) { CacheSizeMax: cacheSizeMaxBytes, }) - s := State{dv} + cursorV2, err := export.NewCursor(export.CursorConfig{ + Dir: filepath.Join(dir, cursorV2Dir), + }) + if err != nil { + return nil, trace.Wrap(err) + } + + s := State{ + dv: dv, + cursorV2: cursorV2, + } return &s, nil } @@ -129,6 +150,56 @@ func createStorageDir(c *StartCmdConfig) (string, error) { return dir, nil } +func (s *State) GetCursorV2State() export.ExporterState { + return s.cursorV2.GetState() +} + +func (s *State) SetCursorV2State(state export.ExporterState) error { + return s.cursorV2.Sync(state) +} + +func (s *State) GetLegacyCursorValues() (*LegacyCursorValues, error) { + latestCursor, err := s.GetCursor() + if err != nil { + return nil, trace.Wrap(err) + } + + latestID, err := s.GetID() + if err != nil { + return nil, trace.Wrap(err) + } + + lastWindowTime, err := s.GetLastWindowTime() + if err != nil { + return nil, trace.Wrap(err) + } + + var windowStartTime time.Time + if lastWindowTime != nil { + windowStartTime = *lastWindowTime + } + + lcv := &LegacyCursorValues{ + Cursor: latestCursor, + ID: latestID, + WindowStartTime: windowStartTime, + } + + return lcv, nil +} + +func (s *State) SetLegacyCursorValues(v LegacyCursorValues) error { + if err := s.SetCursor(v.Cursor); err != nil { + return trace.Wrap(err) + } + + if err := s.SetID(v.ID); err != nil { + return trace.Wrap(err) + } + + return s.SetLastWindowTime(&v.WindowStartTime) +} + // GetStartTime gets current start time func (s *State) GetStartTime() (*time.Time, error) { return s.getTimeKey(startTimeName) diff --git a/integrations/event-handler/teleport_event.go b/integrations/event-handler/teleport_event.go index 36833e16971b..bb094eb0fb71 100644 --- 
a/integrations/event-handler/teleport_event.go +++ b/integrations/event-handler/teleport_event.go @@ -37,10 +37,8 @@ const ( type TeleportEvent struct { // event is the event Event []byte - // cursor is the event ID (real/generated when empty) + // ID is the event ID (real/generated when empty) ID string - // cursor is the current cursor value - Cursor string // Type is an event type Type string // Time is an event timestamp @@ -64,19 +62,26 @@ type TeleportEvent struct { } } +// LegacyTeleportEvent extends TeleportEvent with cursor and window values (used by the +// legacy event watcher to manage its cursor values). +type LegacyTeleportEvent struct { + *TeleportEvent + Cursor string + Window time.Time +} + // NewTeleportEvent creates TeleportEvent using AuditEvent as a source -func NewTeleportEvent(e *auditlogpb.EventUnstructured, cursor string) (*TeleportEvent, error) { +func NewTeleportEvent(e *auditlogpb.EventUnstructured) (*TeleportEvent, error) { payload, err := e.Unstructured.MarshalJSON() if err != nil { return nil, trace.Wrap(err) } evt := &TeleportEvent{ - Cursor: cursor, - Type: e.GetType(), - Time: e.Time.AsTime(), - Index: e.GetIndex(), - ID: e.Id, - Event: payload, + Type: e.GetType(), + Time: e.Time.AsTime(), + Index: e.GetIndex(), + ID: e.Id, + Event: payload, } switch e.GetType() { @@ -92,6 +97,19 @@ func NewTeleportEvent(e *auditlogpb.EventUnstructured, cursor string) (*Teleport return evt, nil } +func NewLegacyTeleportEvent(e *auditlogpb.EventUnstructured, cursor string, window time.Time) (*LegacyTeleportEvent, error) { + evt, err := NewTeleportEvent(e) + if err != nil { + return nil, trace.Wrap(err) + } + + return &LegacyTeleportEvent{ + TeleportEvent: evt, + Cursor: cursor, + Window: window, + }, nil +} + // setSessionID sets session id for session end event func (e *TeleportEvent) setSessionID(evt *auditlogpb.EventUnstructured) error { sessionUploadEvt := &events.SessionUpload{} diff --git a/integrations/event-handler/teleport_event_test.go b/integrations/event-handler/teleport_event_test.go index 0523ff686307..121985696741 100644 --- a/integrations/event-handler/teleport_event_test.go +++ b/integrations/event-handler/teleport_event_test.go @@ -43,11 +43,10 @@ func TestNew(t *testing.T) { protoEvent, err := eventToProto(events.AuditEvent(e)) require.NoError(t, err) - event, err := NewTeleportEvent(protoEvent, "cursor") + event, err := NewTeleportEvent(protoEvent) require.NoError(t, err) assert.Equal(t, "test", event.ID) assert.Equal(t, "mock", event.Type) - assert.Equal(t, "cursor", event.Cursor) } func TestGenID(t *testing.T) { @@ -56,7 +55,7 @@ func TestGenID(t *testing.T) { protoEvent, err := eventToProto(events.AuditEvent(e)) require.NoError(t, err) - event, err := NewTeleportEvent(protoEvent, "cursor") + event, err := NewTeleportEvent(protoEvent) require.NoError(t, err) assert.NotEmpty(t, event.ID) } @@ -74,7 +73,7 @@ func TestSessionEnd(t *testing.T) { protoEvent, err := eventToProto(events.AuditEvent(e)) require.NoError(t, err) - event, err := NewTeleportEvent(protoEvent, "cursor") + event, err := NewTeleportEvent(protoEvent) require.NoError(t, err) assert.NotEmpty(t, event.ID) assert.NotEmpty(t, event.SessionID) @@ -94,7 +93,7 @@ func TestFailedLogin(t *testing.T) { protoEvent, err := eventToProto(events.AuditEvent(e)) require.NoError(t, err) - event, err := NewTeleportEvent(protoEvent, "cursor") + event, err := NewTeleportEvent(protoEvent) require.NoError(t, err) assert.NotEmpty(t, event.ID) assert.True(t, event.IsFailedLogin) @@ -113,7 +112,7 @@ func 
TestSuccessLogin(t *testing.T) { protoEvent, err := eventToProto(events.AuditEvent(e)) require.NoError(t, err) - event, err := NewTeleportEvent(protoEvent, "cursor") + event, err := NewTeleportEvent(protoEvent) require.NoError(t, err) assert.NotEmpty(t, event.ID) assert.False(t, event.IsFailedLogin) diff --git a/lib/events/export/exporter.go b/lib/events/export/exporter.go index 9506a80b81ad..779e0231edce 100644 --- a/lib/events/export/exporter.go +++ b/lib/events/export/exporter.go @@ -195,6 +195,13 @@ func (e *Exporter) Done() <-chan struct{} { return e.done } +// GetCurrentDate returns the current target date. Note that earlier dates may also be being processed concurrently. +func (e *Exporter) GetCurrentDate() time.Time { + e.mu.Lock() + defer e.mu.Unlock() + return e.currentDate +} + // GetState loads the current state of the exporter. Note that there may be concurrent export operations // in progress, meaning that by the time state is observed it may already be outdated. func (e *Exporter) GetState() ExporterState {