diff --git a/lib/backend/buffer.go b/lib/backend/buffer.go index 9509a4b03813..43724b289152 100644 --- a/lib/backend/buffer.go +++ b/lib/backend/buffer.go @@ -22,6 +22,7 @@ import ( "bytes" "context" "fmt" + "log/slog" "sort" "sync" "time" @@ -29,10 +30,10 @@ import ( radix "github.com/armon/go-radix" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - log "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" + logutils "github.com/gravitational/teleport/lib/utils/log" ) type bufferConfig struct { @@ -85,7 +86,7 @@ func BufferClock(c clockwork.Clock) BufferOption { // of predefined size, that is capable of fan-out of the backend events. type CircularBuffer struct { sync.Mutex - *log.Entry + logger *slog.Logger cfg bufferConfig init, closed bool watchers *watcherTree @@ -103,9 +104,7 @@ func NewCircularBuffer(opts ...BufferOption) *CircularBuffer { opt(&cfg) } return &CircularBuffer{ - Entry: log.WithFields(log.Fields{ - teleport.ComponentKey: teleport.ComponentBuffer, - }), + logger: slog.With(teleport.ComponentKey, teleport.ComponentBuffer), cfg: cfg, watchers: newWatcherTree(), } @@ -157,7 +156,7 @@ func (c *CircularBuffer) SetInit() { }) for _, watcher := range watchersToDelete { - c.Warningf("Closing %v, failed to send init event.", watcher) + c.logger.WarnContext(context.Background(), "Closing watcher, failed to send init event.", "watcher", logutils.StringerAttr(watcher)) watcher.closeWatcher() c.watchers.rm(watcher) } @@ -213,7 +212,7 @@ func (c *CircularBuffer) fanOutEvent(r Event) { }) for _, watcher := range watchersToDelete { - c.Warningf("Closing %v, buffer overflow at %v (backlog=%v).", watcher, len(watcher.eventsC), watcher.backlogLen()) + c.logger.WarnContext(context.Background(), "Closing watcher, buffer overflow", "watcher", logutils.StringerAttr(watcher), "events", len(watcher.eventsC), "backlog", watcher.backlogLen()) watcher.closeWatcher() c.watchers.rm(watcher) } @@ -275,10 +274,10 @@ func (c *CircularBuffer) NewWatcher(ctx context.Context, watch Watch) (Watcher, cancel: cancel, capacity: watch.QueueSize, } - c.Debugf("Add %v.", w) + c.logger.DebugContext(ctx, "Adding watcher", "watcher", logutils.StringerAttr(w)) if c.init { if ok := w.init(); !ok { - c.Warningf("Closing %v, failed to send init event.", w) + c.logger.WarnContext(ctx, "Closing watcher, failed to send init event.", "watcher", logutils.StringerAttr(w)) return nil, trace.BadParameter("failed to send init event") } } @@ -287,16 +286,17 @@ func (c *CircularBuffer) NewWatcher(ctx context.Context, watch Watch) (Watcher, } func (c *CircularBuffer) removeWatcherWithLock(watcher *BufferWatcher) { + ctx := context.Background() c.Lock() defer c.Unlock() if watcher == nil { - c.Warningf("Internal logic error: %v.", trace.DebugReport(trace.BadParameter("empty watcher"))) + c.logger.WarnContext(ctx, "Internal logic error, empty watcher") return } - c.Debugf("Removing watcher %v (%p) via external close.", watcher.Name, watcher) + c.logger.DebugContext(ctx, "Removing watcher via external close.", "watcher", logutils.StringerAttr(watcher)) found := c.watchers.rm(watcher) if !found { - c.Debugf("Could not find watcher %v.", watcher.Name) + c.logger.DebugContext(ctx, "Could not find watcher", "watcher", watcher.Name) } } diff --git a/lib/backend/dynamo/atomicwrite.go b/lib/backend/dynamo/atomicwrite.go index 5fda59441c2b..9c61fb824886 100644 --- a/lib/backend/dynamo/atomicwrite.go +++ b/lib/backend/dynamo/atomicwrite.go @@ -201,7 +201,7 @@ TxnLoop: 
var txnErr *types.TransactionCanceledException if !errors.As(err, &txnErr) { if s := err.Error(); strings.Contains(s, "AccessDenied") && strings.Contains(s, "dynamodb:ConditionCheckItem") { - b.Warnf("AtomicWrite failed with error that may indicate dynamodb is missing the required dynamodb:ConditionCheckItem permission (this permission is now required for teleport v16 and later). Consider updating your IAM policy to include this permission. Original error: %v", err) + b.logger.WarnContext(ctx, "AtomicWrite failed with error that may indicate dynamodb is missing the required dynamodb:ConditionCheckItem permission (this permission is now required for teleport v16 and later). Consider updating your IAM policy to include this permission.", "error", err) return "", trace.Errorf("teleport is missing required AWS permission dynamodb:ConditionCheckItem, please contact your administrator to update permissions") } return "", trace.Errorf("unexpected error during atomic write: %v", err) @@ -258,7 +258,7 @@ TxnLoop: if n := i + 1; n > 2 { // if we retried more than once, txn experienced non-trivial conflict and we should warn about it. Infrequent warnings of this kind // are nothing to be concerned about, but high volumes may indicate that an automatic process is creating excessive conflicts. - b.Warnf("AtomicWrite retried %d times due to dynamodb transaction conflicts. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", n) + b.logger.WarnContext(ctx, "AtomicWrite retried due to dynamodb transaction conflicts. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", "retry_attempts", n) } if !includesPut { @@ -274,7 +274,7 @@ TxnLoop: keys = append(keys, ca.Key) } - b.Errorf("AtomicWrite failed, dynamodb transaction experienced too many conflicts. keys=%s", bytes.Join(keys, []byte(","))) + b.logger.ErrorContext(ctx, "AtomicWrite failed, dynamodb transaction experienced too many conflicts", "keys", bytes.Join(keys, []byte(","))) return "", trace.Errorf("dynamodb transaction experienced too many conflicts") } diff --git a/lib/backend/dynamo/dynamodbbk.go b/lib/backend/dynamo/dynamodbbk.go index 8a21cce32c25..6fd79759cfee 100644 --- a/lib/backend/dynamo/dynamodbbk.go +++ b/lib/backend/dynamo/dynamodbbk.go @@ -21,6 +21,7 @@ package dynamo import ( "context" "errors" + "log/slog" "net/http" "sort" "strconv" @@ -40,7 +41,6 @@ import ( streamtypes "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - log "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws" "github.com/gravitational/teleport" @@ -164,9 +164,9 @@ type dynamoClient interface { // Backend is a DynamoDB-backed key value backend implementation. type Backend struct { - svc dynamoClient - clock clockwork.Clock - *log.Entry + svc dynamoClient + clock clockwork.Clock + logger *slog.Logger streams *dynamodbstreams.Client buf *backend.CircularBuffer Config @@ -232,20 +232,20 @@ var _ backend.Backend = &Backend{} // New returns new instance of DynamoDB backend. 
// It's an implementation of backend API's NewFunc func New(ctx context.Context, params backend.Params) (*Backend, error) { - l := log.WithFields(log.Fields{teleport.ComponentKey: BackendName}) + l := slog.With(teleport.ComponentKey, BackendName) var cfg *Config if err := utils.ObjectToStruct(params, &cfg); err != nil { return nil, trace.BadParameter("DynamoDB configuration is invalid: %v", err) } - defer l.Debug("AWS session is created.") + defer l.DebugContext(ctx, "AWS session is created.") if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - l.Infof("Initializing backend. Table: %q, poll streams every %v.", cfg.TableName, cfg.PollStreamPeriod) + l.InfoContext(ctx, "Initializing backend", "table", cfg.TableName, "poll_stream_period", cfg.PollStreamPeriod) opts := []func(*config.LoadOptions) error{ config.WithRegion(cfg.Region), @@ -283,7 +283,7 @@ func New(ctx context.Context, params backend.Params) (*Backend, error) { otelaws.AppendMiddlewares(&awsConfig.APIOptions, otelaws.WithAttributeSetter(otelaws.DynamoDBAttributeSetter)) b := &Backend{ - Entry: l, + logger: l, Config: *cfg, clock: clockwork.NewRealClock(), buf: backend.NewCircularBuffer(backend.BufferCapacity(cfg.BufferSize)), @@ -297,7 +297,7 @@ func New(ctx context.Context, params backend.Params) (*Backend, error) { go func() { if err := b.asyncPollStreams(ctx); err != nil { - b.Errorf("Stream polling loop exited: %v", err) + b.logger.ErrorContext(ctx, "Stream polling loop exited", "error", err) } }() @@ -316,12 +316,12 @@ func (b *Backend) configureTable(ctx context.Context, svc *applicationautoscalin case tableStatusOK: if tableBillingMode == types.BillingModePayPerRequest { b.Config.EnableAutoScaling = false - b.Logger.Info("Ignoring auto_scaling setting as table is in on-demand mode.") + b.logger.InfoContext(ctx, "Ignoring auto_scaling setting as table is in on-demand mode.") } case tableStatusMissing: if b.Config.BillingMode == billingModePayPerRequest { b.Config.EnableAutoScaling = false - b.Logger.Info("Ignoring auto_scaling setting as table is being created in on-demand mode.") + b.logger.InfoContext(ctx, "Ignoring auto_scaling setting as table is being created in on-demand mode.") } err = b.createTable(ctx, tableName, fullPathKey) case tableStatusNeedsMigration: @@ -544,7 +544,7 @@ func (b *Backend) getAllRecords(ctx context.Context, startKey, endKey backend.Ke // otherwise updated lastEvaluatedKey and proceed with obtaining new records. if (limit != 0 && len(result.records) >= limit) || len(re.lastEvaluatedKey) == 0 { if len(result.records) == backend.DefaultRangeLimit { - b.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit) + b.logger.WarnContext(ctx, "Range query hit backend limit. 
(this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit) } result.lastEvaluatedKey = nil return &result, nil @@ -894,7 +894,7 @@ func (b *Backend) createTable(ctx context.Context, tableName *string, rangeKey s if err != nil { return trace.Wrap(err) } - b.Infof("Waiting until table %q is created.", aws.ToString(tableName)) + b.logger.InfoContext(ctx, "Waiting until table is created.", "table", aws.ToString(tableName)) waiter := dynamodb.NewTableExistsWaiter(b.svc) err = waiter.Wait(ctx, @@ -902,7 +902,7 @@ func (b *Backend) createTable(ctx context.Context, tableName *string, rangeKey s 10*time.Minute, ) if err == nil { - b.Infof("Table %q has been created.", aws.ToString(tableName)) + b.logger.InfoContext(ctx, "Table has been created.", "table", aws.ToString(tableName)) } return trace.Wrap(err) @@ -1123,7 +1123,7 @@ func (b *Backend) getKey(ctx context.Context, key backend.Key) (*record, error) // Check if key expired, if expired delete it if r.isExpired(b.clock.Now()) { if err := b.deleteKeyIfExpired(ctx, key); err != nil { - b.Warnf("Failed deleting expired key %q: %v", key, err) + b.logger.WarnContext(ctx, "Failed deleting expired key", "key", key, "error", err) } return nil, trace.NotFound("%q is not found", key) } diff --git a/lib/backend/dynamo/dynamodbbk_test.go b/lib/backend/dynamo/dynamodbbk_test.go index 213d2c957fb1..cd1f633e2cf9 100644 --- a/lib/backend/dynamo/dynamodbbk_test.go +++ b/lib/backend/dynamo/dynamodbbk_test.go @@ -21,6 +21,7 @@ package dynamo import ( "context" "fmt" + "log/slog" "os" "testing" "time" @@ -35,10 +36,10 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/gravitational/teleport" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/test" "github.com/gravitational/teleport/lib/utils" @@ -218,7 +219,7 @@ func TestCreateTable(t *testing.T) { expectedProvisionedthroughput: tc.expectedProvisionedThroughput, } b := &Backend{ - Entry: log.NewEntry(log.New()), + logger: slog.With(teleport.ComponentKey, BackendName), Config: Config{ BillingMode: tc.billingMode, ReadCapacityUnits: int64(tc.readCapacityUnits), diff --git a/lib/backend/dynamo/shards.go b/lib/backend/dynamo/shards.go index 8ee52ba2f1d9..741501101971 100644 --- a/lib/backend/dynamo/shards.go +++ b/lib/backend/dynamo/shards.go @@ -34,6 +34,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" + logutils "github.com/gravitational/teleport/lib/utils/log" ) type shardEvent struct { @@ -48,7 +49,7 @@ func (b *Backend) asyncPollStreams(ctx context.Context) error { Max: b.RetryPeriod, }) if err != nil { - b.Errorf("Bad retry parameters: %v", err) + b.logger.ErrorContext(ctx, "Bad retry parameters", "error", err) return trace.Wrap(err) } @@ -61,14 +62,14 @@ func (b *Backend) asyncPollStreams(ctx context.Context) error { if b.isClosed() { return trace.Wrap(err) } - b.Errorf("Poll streams returned with error: %v.", err) + b.logger.ErrorContext(ctx, "Poll streams returned with error", "error", err) } - b.Debugf("Reloading %v.", retry) + b.logger.DebugContext(ctx, "Reloading", "retry_duration", retry.Duration()) select { case <-retry.After(): retry.Inc() case <-ctx.Done(): - b.Debugf("Closed, returning from asyncPollStreams loop.") + b.logger.DebugContext(ctx, "Closed, 
returning from asyncPollStreams loop.") return nil } } @@ -82,7 +83,7 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { if err != nil { return trace.Wrap(err) } - b.Debugf("Found latest event stream %v.", aws.ToString(streamArn)) + b.logger.DebugContext(ctx, "Found latest event stream", "stream_arn", aws.ToString(streamArn)) set := make(map[string]struct{}) eventsC := make(chan shardEvent) @@ -94,7 +95,7 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { return false } if _, ok := set[aws.ToString(shard.ParentShardId)]; ok { - b.Tracef("Skipping child shard: %s, still polling parent %s", sid, aws.ToString(shard.ParentShardId)) + b.logger.Log(ctx, logutils.TraceLevel, "Skipping child shard, still polling parent", "child_shard_id", sid, "parent_shard_id", aws.ToString(shard.ParentShardId)) // still processing parent return false } @@ -120,7 +121,7 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { continue } shardID := aws.ToString(shards[i].ShardId) - b.Tracef("Adding active shard %v.", shardID) + b.logger.Log(ctx, logutils.TraceLevel, "Adding active shard", "shard_id", shardID) set[shardID] = struct{}{} go b.asyncPollShard(ctx, streamArn, shards[i], eventsC, initC) started++ @@ -161,15 +162,15 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { if event.shardID == "" { // empty shard IDs in err-variant events are programming bugs and will lead to // invalid state. - b.WithError(err).Warnf("Forcing watch system reset due to empty shard ID on error (this is a bug)") + b.logger.WarnContext(ctx, "Forcing watch system reset due to empty shard ID on error (this is a bug)", "error", err) return trace.BadParameter("empty shard ID") } delete(set, event.shardID) if !errors.Is(event.err, io.EOF) { - b.Debugf("Shard ID %v closed with error: %v, resetting buffers.", event.shardID, event.err) + b.logger.DebugContext(ctx, "Shard closed with error, resetting buffers.", "shard_id", event.shardID, "error", event.err) return trace.Wrap(event.err) } - b.Tracef("Shard ID %v exited gracefully.", event.shardID) + b.logger.Log(ctx, logutils.TraceLevel, "Shard exited gracefully.", "shard_id", event.shardID) } else { b.buf.Emit(event.events...) 
} @@ -178,7 +179,7 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { return trace.Wrap(err) } case <-ctx.Done(): - b.Tracef("Context is closing, returning.") + b.logger.Log(ctx, logutils.TraceLevel, "Context is closing, returning.") return nil } } @@ -231,18 +232,18 @@ func (b *Backend) pollShard(ctx context.Context, streamArn *string, shard stream return convertError(err) } if len(out.Records) > 0 { - b.Tracef("Got %v new stream shard records.", len(out.Records)) + b.logger.Log(ctx, logutils.TraceLevel, "Got new stream shard records.", "num_records", len(out.Records)) } if len(out.Records) == 0 { if out.NextShardIterator == nil { - b.Tracef("Shard is closed: %v.", aws.ToString(shard.ShardId)) + b.logger.Log(ctx, logutils.TraceLevel, "Shard is closed", "shard_id", aws.ToString(shard.ShardId)) return io.EOF } iterator = out.NextShardIterator continue } if out.NextShardIterator == nil { - b.Tracef("Shard is closed: %v.", aws.ToString(shard.ShardId)) + b.logger.Log(ctx, logutils.TraceLevel, "Shard is closed", "shard_id", aws.ToString(shard.ShardId)) return io.EOF } events := make([]backend.Event, 0, len(out.Records)) @@ -354,7 +355,7 @@ func (b *Backend) asyncPollShard(ctx context.Context, streamArn *string, shard s select { case eventsC <- shardEvent{err: err, shardID: shardID}: case <-ctx.Done(): - b.Debugf("Context is closing, returning") + b.logger.DebugContext(ctx, "Context is closing, returning") return } }() diff --git a/lib/backend/etcdbk/etcd.go b/lib/backend/etcdbk/etcd.go index 0da60f497b6f..7e71aa1c68a6 100644 --- a/lib/backend/etcdbk/etcd.go +++ b/lib/backend/etcdbk/etcd.go @@ -25,6 +25,7 @@ import ( "crypto/x509" "encoding/base64" "errors" + "log/slog" "os" "sort" "strconv" @@ -36,7 +37,6 @@ import ( "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" @@ -149,8 +149,8 @@ var ( ) type EtcdBackend struct { - nodes []string - *log.Entry + nodes []string + logger *slog.Logger cfg *Config clients *utils.RoundRobin[*clientv3.Client] cancelC chan bool @@ -278,7 +278,7 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke } b := &EtcdBackend{ - Entry: log.WithFields(log.Fields{teleport.ComponentKey: GetName()}), + logger: slog.With(teleport.ComponentKey, GetName()), cfg: cfg, nodes: cfg.Nodes, cancelC: make(chan bool, 1), @@ -433,7 +433,7 @@ func (b *EtcdBackend) reconnect(ctx context.Context) error { if b.clients != nil { b.clients.ForEach(func(clt *clientv3.Client) { if err := clt.Close(); err != nil { - b.Entry.WithError(err).Warning("Failed closing existing etcd client on reconnect.") + b.logger.WarnContext(ctx, "Failed closing existing etcd client on reconnect.", "error", err) } }) @@ -509,7 +509,7 @@ WatchEvents: for b.ctx.Err() == nil { err = b.watchEvents(b.ctx) - b.Debugf("Watch exited: %v", err) + b.logger.DebugContext(b.ctx, "Watch exited", "error", err) // pause briefly to prevent excessive watcher creation attempts select { @@ -518,7 +518,7 @@ WatchEvents: break WatchEvents } } - b.Debugf("Watch stopped: %v.", trace.NewAggregate(err, b.ctx.Err())) + b.logger.DebugContext(b.ctx, "Watch stopped", "error", trace.NewAggregate(err, b.ctx.Err())) } // eventResult is used to ferry the result of event processing @@ -592,7 +592,7 @@ func (b *EtcdBackend) watchEvents(ctx context.Context) error { select { case r := <-q.Pop(): if 
r.err != nil { - b.WithError(r.err).Errorf("Failed to unmarshal event: %v.", r.original) + b.logger.ErrorContext(ctx, "Failed to unmarshal event", "event", r.original, "error", r.err) continue EmitEvents } b.buf.Emit(r.event) @@ -627,7 +627,7 @@ func (b *EtcdBackend) watchEvents(ctx context.Context) error { // limit backlog warnings to once per minute to prevent log spam. if now := time.Now(); now.After(lastBacklogWarning.Add(time.Minute)) { - b.Warnf("Etcd event processing backlog; may result in excess memory usage and stale cluster state.") + b.logger.WarnContext(ctx, "Etcd event processing backlog; may result in excess memory usage and stale cluster state.") lastBacklogWarning = now } diff --git a/lib/backend/firestore/atomicwrite.go b/lib/backend/firestore/atomicwrite.go index 6d9634445433..2b854531620b 100644 --- a/lib/backend/firestore/atomicwrite.go +++ b/lib/backend/firestore/atomicwrite.go @@ -24,7 +24,6 @@ import ( "cloud.google.com/go/firestore" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -136,7 +135,7 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona for _, ca := range condacts { keys = append(keys, ca.Key) } - log.Errorf("AtomicWrite failed, firestore experienced too many txn rollbacks. keys=%s", bytes.Join(keys, []byte(","))) + b.logger.ErrorContext(ctx, "AtomicWrite failed, firestore experienced too many txn rollbacks.", "keys", bytes.Join(keys, []byte(","))) // RunTransaction does not officially document what error is returned if MaxAttempts is exceeded, // but as currently implemented it should simply bubble up the Aborted error from the most recent // failed commit attempt. @@ -153,7 +152,7 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona if n > 2 { // if we retried more than once, txn experienced non-trivial contention and we should warn about it. Infrequent warnings of this kind // are nothing to be concerned about, but high volumes may indicate than an automatic process is creating excessive conflicts. - log.Warnf("AtomicWrite retried %d times due to firestore txn rollbacks. Some rollbacks are expected, but persistent rollback warnings may indicate an unhealthy state.", n) + b.logger.WarnContext(ctx, "AtomicWrite retried due to firestore txn rollbacks. Some rollbacks are expected, but persistent rollback warnings may indicate an unhealthy state.", "retry_attempts", n) } // atomic writes don't have a meaningful concept of revision outside of put operations diff --git a/lib/backend/firestore/firestorebk.go b/lib/backend/firestore/firestorebk.go index 50138ae9fda8..3f406b863aa7 100644 --- a/lib/backend/firestore/firestorebk.go +++ b/lib/backend/firestore/firestorebk.go @@ -23,6 +23,7 @@ import ( "context" "encoding/base64" "errors" + "log/slog" "strconv" "strings" "time" @@ -33,7 +34,6 @@ import ( "github.com/gravitational/trace" "github.com/gravitational/trace/trail" "github.com/jonboulle/clockwork" - log "github.com/sirupsen/logrus" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -112,7 +112,7 @@ func (cfg *backendConfig) CheckAndSetDefaults() error { // Backend is a Firestore-backed key value backend implementation. type Backend struct { - *log.Entry + logger *slog.Logger backendConfig // svc is the primary Firestore client svc *firestore.Client @@ -356,13 +356,13 @@ func (opts *Options) checkAndSetDefaults() error { // New returns new instance of Firestore backend. 
// It's an implementation of backend API's NewFunc func New(ctx context.Context, params backend.Params, options Options) (*Backend, error) { - l := log.WithFields(log.Fields{teleport.ComponentKey: BackendName}) + l := slog.With(teleport.ComponentKey, BackendName) var cfg *backendConfig err := apiutils.ObjectToStruct(params, &cfg) if err != nil { return nil, trace.BadParameter("firestore: configuration is invalid: %v", err) } - l.Info("Initializing backend.") + l.InfoContext(ctx, "Initializing backend.") if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) @@ -388,7 +388,7 @@ func New(ctx context.Context, params backend.Params, options Options) (*Backend, b := &Backend{ svc: firestoreClient, - Entry: l, + logger: l, backendConfig: *cfg, clock: options.Clock, buf: buf, @@ -408,12 +408,12 @@ func New(ctx context.Context, params backend.Params, options Options) (*Backend, Step: b.RetryPeriod / 10, Max: b.RetryPeriod, } - go RetryingAsyncFunctionRunner(b.clientContext, linearConfig, b.Logger, b.watchCollection, "watchCollection") + go RetryingAsyncFunctionRunner(b.clientContext, linearConfig, b.logger.With("task_name", "watch_collection"), b.watchCollection) if !cfg.DisableExpiredDocumentPurge { - go RetryingAsyncFunctionRunner(b.clientContext, linearConfig, b.Logger, b.purgeExpiredDocuments, "purgeExpiredDocuments") + go RetryingAsyncFunctionRunner(b.clientContext, linearConfig, b.logger.With("task_name", "purged_expired_documents"), b.purgeExpiredDocuments) } - l.Info("Backend created.") + l.InfoContext(b.clientContext, "Backend created.") return b, nil } @@ -497,7 +497,7 @@ func (b *Backend) getRangeDocs(ctx context.Context, startKey, endKey backend.Key allDocs := append(append(docs, legacyDocs...), brokenDocs...) if len(allDocs) >= backend.DefaultRangeLimit { - b.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit) + b.logger.WarnContext(ctx, "Range query hit backend limit. 
(this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit) } return allDocs, nil } @@ -859,7 +859,7 @@ func (b *Backend) Close() error { b.clientCancel() err := b.buf.Close() if err != nil { - b.Logger.Error("error closing buffer, continuing with closure of other resources...", err) + b.logger.ErrorContext(b.clientContext, "error closing buffer, continuing with closure of other resources...", "error", err) } return b.svc.Close() } @@ -885,14 +885,14 @@ func (b *Backend) keyToDocumentID(key backend.Key) string { } // RetryingAsyncFunctionRunner wraps a task target in retry logic -func RetryingAsyncFunctionRunner(ctx context.Context, retryConfig retryutils.LinearConfig, logger *log.Logger, task func() error, taskName string) { +func RetryingAsyncFunctionRunner(ctx context.Context, retryConfig retryutils.LinearConfig, logger *slog.Logger, task func() error) { retry, err := retryutils.NewLinear(retryConfig) if err != nil { - logger.WithError(err).Error("Bad retry parameters, returning and not running.") + logger.ErrorContext(ctx, "Bad retry parameters, returning and not running.", "error", err) return } - defer logger.Debugf("Returning from %v loop.", taskName) + defer logger.DebugContext(ctx, "Returning from task loop.") for { err := task() @@ -900,10 +900,10 @@ func RetryingAsyncFunctionRunner(ctx context.Context, retryConfig retryutils.Lin if isCanceled(err) { return } else if err != nil { - logger.WithError(err).Errorf("Task %v has returned with error.", taskName) + logger.ErrorContext(ctx, "Task %v has returned with error", "error", err) } - logger.Debugf("Reloading %v for %s.", retry, taskName) + logger.DebugContext(ctx, "Reloading task", "retry", retry.Duration()) select { case <-retry.After(): retry.Inc() @@ -1011,7 +1011,7 @@ func (b *Backend) purgeExpiredDocuments() error { Documents(b.clientContext). 
GetAll() if err != nil { - b.Logger.WithError(trail.FromGRPC(err)).Warn("Failed to get expired documents") + b.logger.WarnContext(b.clientContext, "Failed to get expired documents", "error", trail.FromGRPC(err)) continue } @@ -1090,7 +1090,7 @@ func (b *Backend) getIndexParent() string { func (b *Backend) ensureIndexes(adminSvc *apiv1.FirestoreAdminClient) error { tuples := IndexList{} tuples.Index(Field(keyDocProperty, adminpb.Index_IndexField_ASCENDING), Field(expiresDocProperty, adminpb.Index_IndexField_ASCENDING)) - return EnsureIndexes(b.clientContext, adminSvc, tuples, b.getIndexParent()) + return EnsureIndexes(b.clientContext, adminSvc, b.logger, tuples, b.getIndexParent()) } type IndexList [][]*adminpb.Index_IndexField @@ -1117,8 +1117,7 @@ type indexTask struct { // EnsureIndexes is a function used by Firestore events and backend to generate indexes and will block until // indexes are reported as created -func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tuples IndexList, indexParent string) error { - l := log.WithFields(log.Fields{teleport.ComponentKey: BackendName}) +func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, logger *slog.Logger, tuples IndexList, indexParent string) error { var tasks []indexTask // create the indexes @@ -1139,9 +1138,9 @@ func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tu } } - stop := periodIndexUpdate(l) + stop := periodIndexUpdate(logger) for _, task := range tasks { - err := waitOnIndexCreation(ctx, l, task) + err := waitOnIndexCreation(ctx, logger, task) if err != nil { return trace.Wrap(err) } @@ -1151,7 +1150,7 @@ func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tu return nil } -func periodIndexUpdate(l *log.Entry) chan struct{} { +func periodIndexUpdate(l *slog.Logger) chan struct{} { ticker := time.NewTicker(timeInBetweenIndexCreationStatusChecks) quit := make(chan struct{}) start := time.Now() @@ -1160,9 +1159,9 @@ func periodIndexUpdate(l *log.Entry) chan struct{} { select { case <-ticker.C: elapsed := time.Since(start) - l.Infof("Still creating indexes, %v elapsed", elapsed) + l.InfoContext(context.Background(), "Still creating indexes", "time_elapsed", elapsed) case <-quit: - l.Info("Finished creating indexes") + l.InfoContext(context.Background(), "Finished creating indexes") ticker.Stop() return } @@ -1171,12 +1170,12 @@ func periodIndexUpdate(l *log.Entry) chan struct{} { return quit } -func waitOnIndexCreation(ctx context.Context, l *log.Entry, task indexTask) error { +func waitOnIndexCreation(ctx context.Context, l *slog.Logger, task indexTask) error { meta, err := task.operation.Metadata() if err != nil { return trace.Wrap(err) } - l.Infof("Creating index for tuple %v with name %s.", task.tuple, meta.Index) + l.InfoContext(ctx, "Creating index for tuple.", "tuple", task.tuple, "name", meta.Index) _, err = task.operation.Wait(ctx) if err != nil { diff --git a/lib/backend/firestore/firestorebk_test.go b/lib/backend/firestore/firestorebk_test.go index 9fdbac4c64f3..1fa013661053 100644 --- a/lib/backend/firestore/firestorebk_test.go +++ b/lib/backend/firestore/firestorebk_test.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "log/slog" "net" "os" "reflect" @@ -36,7 +37,6 @@ import ( "cloud.google.com/go/firestore/apiv1/firestorepb" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" 
"google.golang.org/api/option" @@ -421,7 +421,7 @@ func TestDeleteDocuments(t *testing.T) { b := &Backend{ svc: client, - Entry: utils.NewLoggerForTests().WithFields(logrus.Fields{teleport.ComponentKey: BackendName}), + logger: slog.With(teleport.ComponentKey, BackendName), clock: clockwork.NewFakeClock(), clientContext: ctx, clientCancel: cancel, diff --git a/lib/backend/helpers.go b/lib/backend/helpers.go index 862850ef703d..4f35d85a4440 100644 --- a/lib/backend/helpers.go +++ b/lib/backend/helpers.go @@ -21,11 +21,13 @@ package backend import ( "bytes" "context" + "log/slog" "time" "github.com/google/uuid" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" + + logutils "github.com/gravitational/teleport/lib/utils/log" ) const ( @@ -209,7 +211,7 @@ func RunWhileLocked(ctx context.Context, cfg RunWhileLockedConfig, fn func(conte case <-cfg.Backend.Clock().After(refreshAfter): if err := lock.resetTTL(ctx, cfg.Backend); err != nil { cancelFunction() - log.Errorf("%v", err) + slog.ErrorContext(ctx, "failed to reset lock ttl", "error", err, "lock", logutils.StringerAttr(lock.key)) return } case <-stopRefresh: diff --git a/lib/backend/kubernetes/kubernetes.go b/lib/backend/kubernetes/kubernetes.go index 2af379af58e0..4d05d5bf6329 100644 --- a/lib/backend/kubernetes/kubernetes.go +++ b/lib/backend/kubernetes/kubernetes.go @@ -21,12 +21,12 @@ package kubernetes import ( "context" "fmt" + "log/slog" "os" "strings" "sync" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" kubeerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -172,7 +172,7 @@ func NewSharedWithClient(restClient kubernetes.Interface) (*Backend, error) { ident := os.Getenv(ReleaseNameEnv) if ident == "" { ident = "teleport" - log.Warnf("Var %q is not set, falling back to default identifier %q for shared store.", ReleaseNameEnv, ident) + slog.WarnContext(context.Background(), "Var RELEASE_NAME is not set, falling back to default identifier teleport for shared store.") } return NewWithConfig( diff --git a/lib/backend/lite/lite.go b/lib/backend/lite/lite.go index b2ff52eb25e2..2ceb9ec53141 100644 --- a/lib/backend/lite/lite.go +++ b/lib/backend/lite/lite.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "io/fs" + "log/slog" "net/url" "os" "path/filepath" @@ -36,7 +37,6 @@ import ( "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/mattn/go-sqlite3" - log "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" @@ -250,18 +250,18 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { l := &Backend{ Config: cfg, db: db, - Entry: log.WithFields(log.Fields{teleport.ComponentKey: BackendName}), + logger: slog.With(teleport.ComponentKey, BackendName), clock: cfg.Clock, buf: buf, ctx: closeCtx, cancel: cancel, } - l.Debugf("Connected to: %v, poll stream period: %v", connectionURI, cfg.PollStreamPeriod) + l.logger.DebugContext(ctx, "Connected to database", "database", connectionURI, "poll_stream_period", cfg.PollStreamPeriod) if err := l.createSchema(); err != nil { return nil, trace.Wrap(err, "error creating schema: %v", connectionURI) } if err := l.showPragmas(); err != nil { - l.Warningf("Failed to show pragma settings: %v.", err) + l.logger.WarnContext(ctx, "Failed to show pragma settings", "error", err) } go l.runPeriodicOperations() return l, nil @@ -270,8 +270,8 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { // 
Backend uses SQLite to implement storage interfaces type Backend struct { Config - *log.Entry - db *sql.DB + logger *slog.Logger + db *sql.DB // clock is used to generate time, // could be swapped in tests for fixed time clock clockwork.Clock @@ -307,7 +307,7 @@ func (l *Backend) showPragmas() error { if err := row.Scan(&busyTimeout); err != nil { return trace.Wrap(err) } - l.Debugf("journal_mode=%v, synchronous=%v, busy_timeout=%v", journalMode, synchronous, busyTimeout) + l.logger.DebugContext(l.ctx, "retrieved pragma values", "journal_mode", journalMode, "synchronous", synchronous, "busy_timeout", busyTimeout) return nil }) } @@ -340,14 +340,14 @@ func (l *Backend) createSchema() error { for _, schema := range schemas { if _, err := l.db.ExecContext(l.ctx, schema); err != nil { - l.Errorf("Failing schema step: %v, %v.", schema, err) + l.logger.ErrorContext(l.ctx, "Failed schema step", "step", schema, "error", err) return trace.Wrap(err) } } for table, column := range map[string]string{"kv": "revision", "events": "kv_revision"} { if err := l.migrateRevision(table, column); err != nil { - l.Errorf("Failing schema step: %s.%s, %v.", table, column, err) + l.logger.WarnContext(l.ctx, "Failed schema migration", "table", table, "column", column, "error", err) return trace.Wrap(err) } } @@ -674,7 +674,7 @@ func (l *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, li return nil, trace.Wrap(err) } if len(result.Items) == backend.DefaultRangeLimit { - l.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit) + l.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit) } return &result, nil } @@ -943,7 +943,7 @@ func (l *Backend) inTransaction(ctx context.Context, f func(tx *sql.Tx) error) ( defer func() { diff := time.Since(start) if diff > slowTransactionThreshold { - l.Warningf("SLOW TRANSACTION: %v, %v.", diff, string(debug.Stack())) + l.logger.WarnContext(ctx, "SLOW TRANSACTION", "duration", diff, "stack", string(debug.Stack())) } }() tx, err := l.db.BeginTx(ctx, nil) @@ -958,10 +958,10 @@ func (l *Backend) inTransaction(ctx context.Context, f func(tx *sql.Tx) error) ( } defer func() { if r := recover(); r != nil { - l.Errorf("Unexpected panic in inTransaction: %v, trying to rollback.", r) + l.logger.ErrorContext(ctx, "Unexpected panic in inTransaction, trying to rollback.", "error", r) err = trace.BadParameter("panic: %v", r) if e2 := rollback(); e2 != nil { - l.Errorf("Failed to rollback: %v.", e2) + l.logger.ErrorContext(ctx, "Failed to rollback", "error", e2) } return } @@ -981,10 +981,10 @@ func (l *Backend) inTransaction(ctx context.Context, f func(tx *sql.Tx) error) ( } if !l.isClosed() { if !trace.IsCompareFailed(err) && !trace.IsAlreadyExists(err) && !trace.IsConnectionProblem(err) { - l.Warningf("Unexpected error in inTransaction: %v, rolling back.", trace.DebugReport(err)) + l.logger.WarnContext(ctx, "Unexpected error in inTransaction, rolling back.", "error", err) } if e2 := rollback(); e2 != nil { - l.Errorf("Failed to rollback too: %v.", e2) + l.logger.ErrorContext(ctx, "Failed to rollback too", "error", e2) } } return diff --git a/lib/backend/lite/periodic.go b/lib/backend/lite/periodic.go index 4f62100f1263..c31093042127 100644 --- a/lib/backend/lite/periodic.go +++ b/lib/backend/lite/periodic.go @@ -39,7 +39,7 @@ func (l *Backend) runPeriodicOperations() { select { case <-l.ctx.Done(): if err := l.closeDatabase(); err != 
nil { - l.Warningf("Error closing database: %v", err) + l.logger.WarnContext(l.ctx, "Error closing database", "error", err) } return case <-t.C: @@ -49,19 +49,19 @@ func (l *Backend) runPeriodicOperations() { // or is closing, downgrade the log to debug // to avoid polluting logs in production if trace.IsConnectionProblem(err) { - l.Debugf("Failed to run remove expired keys: %v", err) + l.logger.DebugContext(l.ctx, "Failed to run remove expired keys", "error", err) } else { - l.Warningf("Failed to run remove expired keys: %v", err) + l.logger.WarnContext(l.ctx, "Failed to run remove expired keys", "error", err) } } if !l.EventsOff { err = l.removeOldEvents() if err != nil { - l.Warningf("Failed to run remove old events: %v", err) + l.logger.WarnContext(l.ctx, "Failed to run remove old events", "error", err) } rowid, err = l.pollEvents(rowid) if err != nil { - l.Warningf("Failed to run poll events: %v", err) + l.logger.WarnContext(l.ctx, "Failed to run poll events", "error", err) } } } @@ -149,7 +149,7 @@ func (l *Backend) pollEvents(rowid int64) (int64, error) { if err != nil { return rowid, trace.Wrap(err) } - l.Debugf("Initialized event ID iterator to %v", rowid) + l.logger.DebugContext(l.ctx, "Initialized event ID iterator", "event_id", rowid) l.buf.SetInit() } diff --git a/lib/backend/memory/memory.go b/lib/backend/memory/memory.go index 89c2ae33bb00..08b69872e699 100644 --- a/lib/backend/memory/memory.go +++ b/lib/backend/memory/memory.go @@ -21,13 +21,13 @@ package memory import ( "bytes" "context" + "log/slog" "sync" "time" "github.com/google/btree" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - log "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" @@ -98,10 +98,8 @@ func New(cfg Config) (*Memory, error) { ) buf.SetInit() m := &Memory{ - Mutex: &sync.Mutex{}, - Entry: log.WithFields(log.Fields{ - teleport.ComponentKey: teleport.ComponentMemory, - }), + Mutex: &sync.Mutex{}, + logger: slog.With(teleport.ComponentKey, teleport.ComponentMemory), Config: cfg, tree: btree.NewG(cfg.BTreeDegree, func(a, b *btreeItem) bool { return a.Less(b) }), @@ -117,7 +115,7 @@ func New(cfg Config) (*Memory, error) { // Memory is a memory B-Tree based backend type Memory struct { *sync.Mutex - *log.Entry + logger *slog.Logger Config // tree is a BTree with items tree *btree.BTreeG[*btreeItem] @@ -308,7 +306,7 @@ func (m *Memory) GetRange(ctx context.Context, startKey, endKey backend.Key, lim m.removeExpired() re := m.getRange(ctx, startKey, endKey, limit) if len(re.Items) == backend.DefaultRangeLimit { - m.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit) + m.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit) } return &re, nil } @@ -474,7 +472,7 @@ func (m *Memory) removeExpired() int { } m.heap.PopEl() m.tree.Delete(item) - m.Debugf("Removed expired %v %v item.", string(item.Key), item.Expires) + m.logger.DebugContext(m.ctx, "Removed expired item.", "key", item.Key.String(), "expiry", item.Expires) removed++ event := backend.Event{ @@ -488,7 +486,7 @@ func (m *Memory) removeExpired() int { } } if removed > 0 { - m.Debugf("Removed %v expired items.", removed) + m.logger.DebugContext(m.ctx, "Removed expired items.", "num_expired", removed) } return removed } diff --git a/lib/backend/report.go b/lib/backend/report.go index 32539ea1d02c..38006e2546de 100644 --- a/lib/backend/report.go +++ b/lib/backend/report.go @@ -31,7 +31,6 @@ import ( lru "github.com/hashicorp/golang-lru/v2" "github.com/jonboulle/clockwork" "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/attribute" oteltrace "go.opentelemetry.io/otel/trace" "golang.org/x/time/rate" @@ -147,7 +146,7 @@ func (s *Reporter) GetRange(ctx context.Context, startKey, endKey Key, limit int } else { reads.WithLabelValues(s.Component).Add(float64(len(res.Items))) } - s.trackRequest(types.OpGet, startKey, endKey) + s.trackRequest(ctx, types.OpGet, startKey, endKey) end := s.Clock().Now() if d := end.Sub(start); d > time.Second*3 { if s.slowRangeLogLimiter.AllowN(end, 1) { @@ -181,7 +180,7 @@ func (s *Reporter) Create(ctx context.Context, i Item) (*Lease, error) { } else { writes.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpPut, i.Key, nil) + s.trackRequest(ctx, types.OpPut, i.Key, nil) return lease, err } @@ -207,7 +206,7 @@ func (s *Reporter) Put(ctx context.Context, i Item) (*Lease, error) { } else { writes.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpPut, i.Key, nil) + s.trackRequest(ctx, types.OpPut, i.Key, nil) return lease, err } @@ -235,7 +234,7 @@ func (s *Reporter) Update(ctx context.Context, i Item) (*Lease, error) { } else { writes.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpPut, i.Key, nil) + s.trackRequest(ctx, types.OpPut, i.Key, nil) return lease, err } @@ -263,7 +262,7 @@ func (s *Reporter) ConditionalUpdate(ctx context.Context, i Item) (*Lease, error } else { writes.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpPut, i.Key, nil) + s.trackRequest(ctx, types.OpPut, i.Key, nil) return lease, err } @@ -286,7 +285,7 @@ func (s *Reporter) Get(ctx context.Context, key Key) (*Item, error) { if err != nil && !trace.IsNotFound(err) { readRequestsFailed.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpGet, key, nil) + s.trackRequest(ctx, types.OpGet, key, nil) return item, err } @@ -314,7 +313,7 @@ func (s *Reporter) CompareAndSwap(ctx context.Context, expected Item, replaceWit } else { writes.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpPut, expected.Key, nil) + s.trackRequest(ctx, types.OpPut, expected.Key, nil) return lease, err } @@ -341,7 +340,7 @@ func (s *Reporter) Delete(ctx context.Context, key Key) error { } else { writes.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpDelete, key, nil) + s.trackRequest(ctx, types.OpDelete, key, nil) return err } @@ -369,7 +368,7 @@ func (s *Reporter) ConditionalDelete(ctx context.Context, key Key, revision stri } else { writes.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpDelete, key, nil) + s.trackRequest(ctx, types.OpDelete, key, nil) return
err } @@ -411,10 +410,10 @@ func (s *Reporter) AtomicWrite(ctx context.Context, condacts []ConditionalAction switch ca.Action.Kind { case KindPut: writeTotal++ - s.trackRequest(types.OpPut, ca.Key, nil) + s.trackRequest(ctx, types.OpPut, ca.Key, nil) case KindDelete: writeTotal++ - s.trackRequest(types.OpDelete, ca.Key, nil) + s.trackRequest(ctx, types.OpDelete, ca.Key, nil) default: // ignore other variants } @@ -445,7 +444,7 @@ func (s *Reporter) DeleteRange(ctx context.Context, startKey, endKey Key) error if err != nil && !trace.IsNotFound(err) { batchWriteRequestsFailed.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpDelete, startKey, endKey) + s.trackRequest(ctx, types.OpDelete, startKey, endKey) return err } @@ -476,7 +475,7 @@ func (s *Reporter) KeepAlive(ctx context.Context, lease Lease, expires time.Time } else { writes.WithLabelValues(s.Component).Inc() } - s.trackRequest(types.OpPut, lease.Key, nil) + s.trackRequest(ctx, types.OpPut, lease.Key, nil) return err } @@ -521,7 +520,7 @@ type topRequestsCacheKey struct { } // trackRequests tracks top requests, endKey is supplied for ranges -func (s *Reporter) trackRequest(opType types.OpType, key Key, endKey Key) { +func (s *Reporter) trackRequest(ctx context.Context, opType types.OpType, key Key, endKey Key) { if len(key) == 0 { return } @@ -551,7 +550,7 @@ func (s *Reporter) trackRequest(opType types.OpType, key Key, endKey Key) { counter, err := requests.GetMetricWithLabelValues(s.Component, keyLabel, rangeSuffix) if err != nil { - log.Warningf("Failed to get counter: %v", err) + slog.WarnContext(ctx, "Failed to get prometheus counter", "error", err) return } counter.Inc() diff --git a/lib/backend/report_test.go b/lib/backend/report_test.go index 360b0acfaa31..c10c4d2c0ae8 100644 --- a/lib/backend/report_test.go +++ b/lib/backend/report_test.go @@ -19,6 +19,7 @@ package backend import ( + "context" "strconv" "sync/atomic" "testing" @@ -60,7 +61,7 @@ func TestReporterTopRequestsLimit(t *testing.T) { // Run through 1000 unique keys. for i := 0; i < 1000; i++ { - r.trackRequest(types.OpGet, []byte(strconv.Itoa(i)), nil) + r.trackRequest(context.Background(), types.OpGet, []byte(strconv.Itoa(i)), nil) } // Now the metric should have only 10 of the keys above. 
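The same replacement pattern repeats across every backend touched above: the embedded *logrus.Entry becomes a named *slog.Logger field scoped once with the component attribute, and call sites move to the context-aware, structured logging methods. A minimal, self-contained sketch of that pattern, assuming an illustrative "component" key and example values that are not taken from this change:

package main

import (
	"context"
	"log/slog"
)

// memoryStub stands in for the backend types in this diff (Memory, Backend,
// CircularBuffer, ...), which now hold a named logger field instead of an
// embedded *logrus.Entry.
type memoryStub struct {
	logger *slog.Logger
}

func main() {
	m := &memoryStub{
		// scope the component attribute once at construction time,
		// instead of calling logrus.WithFields at each call site
		logger: slog.With("component", "memory"),
	}

	ctx := context.Background()
	// context-aware, structured replacement for the old Warnf call:
	// the message stays constant and values travel as key/value attributes
	m.logger.WarnContext(ctx, "Range query hit backend limit",
		"start_key", "/example", "limit", 500)
}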
diff --git a/lib/events/firestoreevents/firestoreevents.go b/lib/events/firestoreevents/firestoreevents.go index 8c1d4d4f9785..3c480cb6fa6b 100644 --- a/lib/events/firestoreevents/firestoreevents.go +++ b/lib/events/firestoreevents/firestoreevents.go @@ -22,6 +22,7 @@ import ( "context" "encoding/json" "errors" + "log/slog" "net/url" "sort" "strconv" @@ -35,7 +36,6 @@ import ( "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" "google.golang.org/api/iterator" "github.com/gravitational/teleport" @@ -252,8 +252,8 @@ func (cfg *EventsConfig) SetFromURL(url *url.URL) error { // Log is a firestore-db backed storage of events type Log struct { - // Entry is a log entry - *log.Entry + // logger emits log messages + logger *slog.Logger // Config is a backend configuration EventsConfig // svc is the primary Firestore client @@ -281,11 +281,9 @@ func New(cfg EventsConfig) (*Log, error) { return nil, trace.Wrap(err) } - l := log.WithFields(log.Fields{ - teleport.ComponentKey: teleport.Component(teleport.ComponentFirestore), - }) - l.Info("Initializing event backend.") closeCtx, cancel := context.WithCancel(context.Background()) + l := slog.With(teleport.ComponentKey, teleport.ComponentFirestore) + l.InfoContext(closeCtx, "Initializing event backend.") firestoreAdminClient, firestoreClient, err := firestorebk.CreateFirestoreClients(closeCtx, cfg.ProjectID, cfg.EndPoint, cfg.CredentialsPath) if err != nil { cancel() @@ -295,7 +293,7 @@ func New(cfg EventsConfig) (*Log, error) { b := &Log{ svcContext: closeCtx, svcCancel: cancel, - Entry: l, + logger: l, EventsConfig: cfg, svc: firestoreClient, } @@ -310,7 +308,7 @@ func New(cfg EventsConfig) (*Log, error) { go firestorebk.RetryingAsyncFunctionRunner(b.svcContext, retryutils.LinearConfig{ Step: b.RetryPeriod / 10, Max: b.RetryPeriod, - }, b.Logger, b.purgeExpiredEvents, "purgeExpiredEvents") + }, b.logger.With("task_name", "purge_expired_events"), b.purgeExpiredEvents) } return b, nil } @@ -388,7 +386,14 @@ func (l *Log) searchEventsWithFilter(ctx context.Context, params searchEventsWit params.limit = batchReadLimit } - g := l.WithFields(log.Fields{"From": params.fromUTC, "To": params.toUTC, "Namespace": params.namespace, "Filter": params.filter, "Limit": params.limit, "StartKey": params.lastKey}) + g := l.logger.With( + "from", params.fromUTC, + "to", params.toUTC, + "namespace", params.namespace, + "filter", params.filter, + "limit", params.limit, + "start_key", params.lastKey, + ) var firestoreOrdering firestore.Direction switch params.order { @@ -451,7 +456,7 @@ func (l *Log) query( lastKey string, filter searchEventsFilter, limit int, - g *log.Entry, + g *slog.Logger, ) (values []events.EventFields, _ string, err error) { var ( checkpointTime int64 @@ -485,7 +490,7 @@ func (l *Log) query( return nil, "", firestorebk.ConvertGRPCError(err) } - g.WithFields(log.Fields{"duration": time.Since(start)}).Debugf("Query completed.") + g.DebugContext(ctx, "Query completed.", "duration", time.Since(start)) // Iterate over the documents in the query. 
// The iterator is limited to [limit] documents so in order to know if we @@ -604,7 +609,7 @@ func (l *Log) ensureIndexes(adminSvc *apiv1.FirestoreAdminClient) error { firestorebk.Field(createdAtDocProperty, adminpb.Index_IndexField_ASCENDING), firestorebk.Field(firestore.DocumentID, adminpb.Index_IndexField_ASCENDING), ) - err := firestorebk.EnsureIndexes(l.svcContext, adminSvc, tuples, l.getIndexParent()) + err := firestorebk.EnsureIndexes(l.svcContext, adminSvc, l.logger, tuples, l.getIndexParent()) return trace.Wrap(err) }
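RetryingAsyncFunctionRunner and EnsureIndexes now accept a *slog.Logger instead of a logrus logger plus a task-name string, so callers attach the task identity as a logger attribute (the "task_name" values above). A simplified, self-contained sketch of that call-site shape, using a fixed backoff and stub task in place of retryutils.Linear and the real firestore implementation:

package main

import (
	"context"
	"errors"
	"log/slog"
	"time"
)

// runWithRetry mirrors the new signature: there is no taskName parameter,
// the task identity rides along on the logger. Simplified stand-in only.
func runWithRetry(ctx context.Context, logger *slog.Logger, task func() error) {
	defer logger.DebugContext(ctx, "Returning from task loop.")
	for {
		if err := task(); err != nil {
			logger.ErrorContext(ctx, "Task has returned with error", "error", err)
		}
		select {
		case <-time.After(time.Second): // fixed backoff stands in for retryutils.Linear
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	logger := slog.Default().With("task_name", "watch_collection")
	runWithRetry(ctx, logger, func() error { return errors.New("transient failure") })
}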