Migrate lib/backend to slog (#46865)
* Convert etcd backend to use slog

* Convert firestore backend to use slog

* Convert dynamo backend to use slog

* Convert memory backend to use slog

* Convert lite backend to use slog

* Convert kubernetes backend to use slog

* Convert backend helpers to use slog

* Convert firestore events to use slog
rosstimothy authored Sep 25, 2024
1 parent f7886b0 commit eddadae
Showing 17 changed files with 156 additions and 151 deletions.
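The changes across the touched files follow one mechanical pattern: the embedded logrus *log.Entry becomes an explicit logger *slog.Logger field, component tagging moves from log.WithFields to slog.With, and printf-style calls (Warnf, Debugf, Errorf) become context-aware calls (WarnContext, DebugContext, ErrorContext) with structured key/value attributes. Below is a minimal standalone sketch of that pattern; the componentKey constant and the buffer type are illustrative stand-ins, not Teleport code.

package main

import (
	"context"
	"log/slog"
)

// componentKey plays the role of teleport.ComponentKey in the real diff.
const componentKey = "component"

type buffer struct {
	logger *slog.Logger // replaces the previously embedded *logrus.Entry
}

func newBuffer() *buffer {
	return &buffer{
		// Before: log.WithFields(log.Fields{teleport.ComponentKey: "buffer"})
		logger: slog.With(componentKey, "buffer"),
	}
}

func (b *buffer) dropWatcher(name string, backlog int) {
	// Before: b.Warningf("Closing %v, buffer overflow at %v.", name, backlog)
	b.logger.WarnContext(context.Background(), "Closing watcher, buffer overflow",
		"watcher", name,
		"backlog", backlog,
	)
}

func main() {
	newBuffer().dropWatcher("watcher-1", 3)
}

The structured form trades format strings for key/value pairs, so downstream handlers can filter or index on fields such as "watcher" and "backlog" instead of parsing message text.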
24 changes: 12 additions & 12 deletions lib/backend/buffer.go
@@ -22,17 +22,18 @@ import (
"bytes"
"context"
"fmt"
"log/slog"
"sort"
"sync"
"time"

radix "github.com/armon/go-radix"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
logutils "github.com/gravitational/teleport/lib/utils/log"
)

type bufferConfig struct {
@@ -85,7 +86,7 @@ func BufferClock(c clockwork.Clock) BufferOption {
// of predefined size, that is capable of fan-out of the backend events.
type CircularBuffer struct {
sync.Mutex
*log.Entry
logger *slog.Logger
cfg bufferConfig
init, closed bool
watchers *watcherTree
@@ -103,9 +104,7 @@ func NewCircularBuffer(opts ...BufferOption) *CircularBuffer {
opt(&cfg)
}
return &CircularBuffer{
Entry: log.WithFields(log.Fields{
teleport.ComponentKey: teleport.ComponentBuffer,
}),
logger: slog.With(teleport.ComponentKey, teleport.ComponentBuffer),
cfg: cfg,
watchers: newWatcherTree(),
}
@@ -157,7 +156,7 @@ func (c *CircularBuffer) SetInit() {
})

for _, watcher := range watchersToDelete {
c.Warningf("Closing %v, failed to send init event.", watcher)
c.logger.WarnContext(context.Background(), "Closing watcher, failed to send init event.", "watcher", logutils.StringerAttr(watcher))
watcher.closeWatcher()
c.watchers.rm(watcher)
}
@@ -213,7 +212,7 @@ func (c *CircularBuffer) fanOutEvent(r Event) {
})

for _, watcher := range watchersToDelete {
c.Warningf("Closing %v, buffer overflow at %v (backlog=%v).", watcher, len(watcher.eventsC), watcher.backlogLen())
c.logger.WarnContext(context.Background(), "Closing watcher, buffer overflow", "watcher", logutils.StringerAttr(watcher), "events", len(watcher.eventsC), "backlog", watcher.backlogLen())
watcher.closeWatcher()
c.watchers.rm(watcher)
}
@@ -275,10 +274,10 @@ func (c *CircularBuffer) NewWatcher(ctx context.Context, watch Watch) (Watcher,
cancel: cancel,
capacity: watch.QueueSize,
}
c.Debugf("Add %v.", w)
c.logger.DebugContext(ctx, "Adding watcher", "watcher", logutils.StringerAttr(w))
if c.init {
if ok := w.init(); !ok {
c.Warningf("Closing %v, failed to send init event.", w)
c.logger.WarnContext(ctx, "Closing watcher, failed to send init event.", "watcher", logutils.StringerAttr(w))
return nil, trace.BadParameter("failed to send init event")
}
}
@@ -287,16 +286,17 @@ func (c *CircularBuffer) NewWatcher(ctx context.Context, watch Watch) (Watcher,
}

func (c *CircularBuffer) removeWatcherWithLock(watcher *BufferWatcher) {
ctx := context.Background()
c.Lock()
defer c.Unlock()
if watcher == nil {
c.Warningf("Internal logic error: %v.", trace.DebugReport(trace.BadParameter("empty watcher")))
c.logger.WarnContext(ctx, "Internal logic error, empty watcher")
return
}
c.Debugf("Removing watcher %v (%p) via external close.", watcher.Name, watcher)
c.logger.DebugContext(ctx, "Removing watcher via external close.", "watcher", logutils.StringerAttr(watcher))
found := c.watchers.rm(watcher)
if !found {
c.Debugf("Could not find watcher %v.", watcher.Name)
c.logger.DebugContext(ctx, "Could not find watcher", "watcher", watcher.Name)
}
}

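Note how values that previously went through %v formatting are now wrapped in logutils.StringerAttr rather than passed directly; this defers the fmt.Stringer call until a handler actually emits the record. A rough sketch of how such a wrapper can be built on slog.LogValuer follows; the real helper in lib/utils/log may be implemented differently.

package main

import (
	"context"
	"fmt"
	"log/slog"
)

// stringerValue delays calling String() until the handler resolves the value.
type stringerValue struct{ s fmt.Stringer }

func (v stringerValue) LogValue() slog.Value { return slog.StringValue(v.s.String()) }

// StringerAttr mimics the shape of the logutils.StringerAttr helper used above;
// its body here is an assumption for illustration only.
func StringerAttr(s fmt.Stringer) slog.LogValuer { return stringerValue{s: s} }

type watcher struct{ name string }

func (w *watcher) String() string { return "watcher(" + w.name + ")" }

func main() {
	slog.WarnContext(context.Background(), "Closing watcher, failed to send init event",
		"watcher", StringerAttr(&watcher{name: "events"}),
	)
}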
6 changes: 3 additions & 3 deletions lib/backend/dynamo/atomicwrite.go
@@ -201,7 +201,7 @@ TxnLoop:
var txnErr *types.TransactionCanceledException
if !errors.As(err, &txnErr) {
if s := err.Error(); strings.Contains(s, "AccessDenied") && strings.Contains(s, "dynamodb:ConditionCheckItem") {
b.Warnf("AtomicWrite failed with error that may indicate dynamodb is missing the required dynamodb:ConditionCheckItem permission (this permission is now required for teleport v16 and later). Consider updating your IAM policy to include this permission. Original error: %v", err)
b.logger.WarnContext(ctx, "AtomicWrite failed with error that may indicate dynamodb is missing the required dynamodb:ConditionCheckItem permission (this permission is now required for teleport v16 and later). Consider updating your IAM policy to include this permission.", "error", err)
return "", trace.Errorf("teleport is missing required AWS permission dynamodb:ConditionCheckItem, please contact your administrator to update permissions")
}
return "", trace.Errorf("unexpected error during atomic write: %v", err)
@@ -258,7 +258,7 @@ TxnLoop:
if n := i + 1; n > 2 {
// if we retried more than once, txn experienced non-trivial conflict and we should warn about it. Infrequent warnings of this kind
// are nothing to be concerned about, but high volumes may indicate that an automatic process is creating excessive conflicts.
b.Warnf("AtomicWrite retried %d times due to dynamodb transaction conflicts. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", n)
b.logger.WarnContext(ctx, "AtomicWrite retried due to dynamodb transaction conflicts. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", "retry_attempts", n)
}

if !includesPut {
@@ -274,7 +274,7 @@ TxnLoop:
keys = append(keys, ca.Key)
}

b.Errorf("AtomicWrite failed, dynamodb transaction experienced too many conflicts. keys=%s", bytes.Join(keys, []byte(",")))
b.logger.ErrorContext(ctx, "AtomicWrite failed, dynamodb transaction experienced too many conflicts", "keys", bytes.Join(keys, []byte(",")))

return "", trace.Errorf("dynamodb transaction experienced too many conflicts")
}
30 changes: 15 additions & 15 deletions lib/backend/dynamo/dynamodbbk.go
@@ -21,6 +21,7 @@ package dynamo
import (
"context"
"errors"
"log/slog"
"net/http"
"sort"
"strconv"
@@ -40,7 +41,6 @@ import (
streamtypes "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws"

"github.com/gravitational/teleport"
@@ -164,9 +164,9 @@ type dynamoClient interface {

// Backend is a DynamoDB-backed key value backend implementation.
type Backend struct {
svc dynamoClient
clock clockwork.Clock
*log.Entry
svc dynamoClient
clock clockwork.Clock
logger *slog.Logger
streams *dynamodbstreams.Client
buf *backend.CircularBuffer
Config
@@ -232,20 +232,20 @@ var _ backend.Backend = &Backend{}
// New returns new instance of DynamoDB backend.
// It's an implementation of backend API's NewFunc
func New(ctx context.Context, params backend.Params) (*Backend, error) {
l := log.WithFields(log.Fields{teleport.ComponentKey: BackendName})
l := slog.With(teleport.ComponentKey, BackendName)

var cfg *Config
if err := utils.ObjectToStruct(params, &cfg); err != nil {
return nil, trace.BadParameter("DynamoDB configuration is invalid: %v", err)
}

defer l.Debug("AWS session is created.")
defer l.DebugContext(ctx, "AWS session is created.")

if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}

l.Infof("Initializing backend. Table: %q, poll streams every %v.", cfg.TableName, cfg.PollStreamPeriod)
l.InfoContext(ctx, "Initializing backend", "table", cfg.TableName, "poll_stream_period", cfg.PollStreamPeriod)

opts := []func(*config.LoadOptions) error{
config.WithRegion(cfg.Region),
@@ -283,7 +283,7 @@ func New(ctx context.Context, params backend.Params) (*Backend, error) {
otelaws.AppendMiddlewares(&awsConfig.APIOptions, otelaws.WithAttributeSetter(otelaws.DynamoDBAttributeSetter))

b := &Backend{
Entry: l,
logger: l,
Config: *cfg,
clock: clockwork.NewRealClock(),
buf: backend.NewCircularBuffer(backend.BufferCapacity(cfg.BufferSize)),
@@ -297,7 +297,7 @@ func New(ctx context.Context, params backend.Params) (*Backend, error) {

go func() {
if err := b.asyncPollStreams(ctx); err != nil {
b.Errorf("Stream polling loop exited: %v", err)
b.logger.ErrorContext(ctx, "Stream polling loop exited", "error", err)
}
}()

@@ -316,12 +316,12 @@ func (b *Backend) configureTable(ctx context.Context, svc *applicationautoscalin
case tableStatusOK:
if tableBillingMode == types.BillingModePayPerRequest {
b.Config.EnableAutoScaling = false
b.Logger.Info("Ignoring auto_scaling setting as table is in on-demand mode.")
b.logger.InfoContext(ctx, "Ignoring auto_scaling setting as table is in on-demand mode.")
}
case tableStatusMissing:
if b.Config.BillingMode == billingModePayPerRequest {
b.Config.EnableAutoScaling = false
b.Logger.Info("Ignoring auto_scaling setting as table is being created in on-demand mode.")
b.logger.InfoContext(ctx, "Ignoring auto_scaling setting as table is being created in on-demand mode.")
}
err = b.createTable(ctx, tableName, fullPathKey)
case tableStatusNeedsMigration:
@@ -544,7 +544,7 @@ func (b *Backend) getAllRecords(ctx context.Context, startKey, endKey backend.Ke
// otherwise updated lastEvaluatedKey and proceed with obtaining new records.
if (limit != 0 && len(result.records) >= limit) || len(re.lastEvaluatedKey) == 0 {
if len(result.records) == backend.DefaultRangeLimit {
b.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit)
b.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit)
}
result.lastEvaluatedKey = nil
return &result, nil
@@ -894,15 +894,15 @@ func (b *Backend) createTable(ctx context.Context, tableName *string, rangeKey s
if err != nil {
return trace.Wrap(err)
}
b.Infof("Waiting until table %q is created.", aws.ToString(tableName))
b.logger.InfoContext(ctx, "Waiting until table is created.", "table", aws.ToString(tableName))
waiter := dynamodb.NewTableExistsWaiter(b.svc)

err = waiter.Wait(ctx,
&dynamodb.DescribeTableInput{TableName: tableName},
10*time.Minute,
)
if err == nil {
b.Infof("Table %q has been created.", aws.ToString(tableName))
b.logger.InfoContext(ctx, "Table has been created.", "table", aws.ToString(tableName))
}

return trace.Wrap(err)
@@ -1123,7 +1123,7 @@ func (b *Backend) getKey(ctx context.Context, key backend.Key) (*record, error)
// Check if key expired, if expired delete it
if r.isExpired(b.clock.Now()) {
if err := b.deleteKeyIfExpired(ctx, key); err != nil {
b.Warnf("Failed deleting expired key %q: %v", key, err)
b.logger.WarnContext(ctx, "Failed deleting expired key", "key", key, "error", err)
}
return nil, trace.NotFound("%q is not found", key)
}
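A second recurring change in this file is that every logging call now receives a context.Context (DebugContext, WarnContext, ErrorContext), even when only context.Background() is available. The payoff is that a handler can enrich records from the context. The sketch below is a toy handler illustrating that mechanism; it assumes a hypothetical request_id value stored in the context and is not how Teleport's actual handler works.

package main

import (
	"context"
	"errors"
	"log/slog"
	"os"
)

type requestIDKey struct{}

// ctxHandler copies a request ID out of the context into every record.
// It wraps an inner handler and only overrides Handle.
type ctxHandler struct{ slog.Handler }

func (h ctxHandler) Handle(ctx context.Context, r slog.Record) error {
	if id, ok := ctx.Value(requestIDKey{}).(string); ok {
		r.AddAttrs(slog.String("request_id", id))
	}
	return h.Handler.Handle(ctx, r)
}

func main() {
	logger := slog.New(ctxHandler{Handler: slog.NewTextHandler(os.Stderr, nil)})
	ctx := context.WithValue(context.Background(), requestIDKey{}, "abc123")
	logger.ErrorContext(ctx, "Stream polling loop exited", "error", errors.New("example failure"))
}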
5 changes: 3 additions & 2 deletions lib/backend/dynamo/dynamodbbk_test.go
@@ -21,6 +21,7 @@ package dynamo
import (
"context"
"fmt"
"log/slog"
"os"
"testing"
"time"
@@ -35,10 +36,10 @@ import (
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/test"
"github.com/gravitational/teleport/lib/utils"
@@ -218,7 +219,7 @@ func TestCreateTable(t *testing.T) {
expectedProvisionedthroughput: tc.expectedProvisionedThroughput,
}
b := &Backend{
Entry: log.NewEntry(log.New()),
logger: slog.With(teleport.ComponentKey, BackendName),
Config: Config{
BillingMode: tc.billingMode,
ReadCapacityUnits: int64(tc.readCapacityUnits),
31 changes: 16 additions & 15 deletions lib/backend/dynamo/shards.go
@@ -34,6 +34,7 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/backend"
logutils "github.com/gravitational/teleport/lib/utils/log"
)

type shardEvent struct {
@@ -48,7 +49,7 @@ func (b *Backend) asyncPollStreams(ctx context.Context) error {
Max: b.RetryPeriod,
})
if err != nil {
b.Errorf("Bad retry parameters: %v", err)
b.logger.ErrorContext(ctx, "Bad retry parameters", "error", err)
return trace.Wrap(err)
}

@@ -61,14 +62,14 @@ func (b *Backend) asyncPollStreams(ctx context.Context) error {
if b.isClosed() {
return trace.Wrap(err)
}
b.Errorf("Poll streams returned with error: %v.", err)
b.logger.ErrorContext(ctx, "Poll streams returned with error", "error", err)
}
b.Debugf("Reloading %v.", retry)
b.logger.DebugContext(ctx, "Reloading", "retry_duration", retry.Duration())
select {
case <-retry.After():
retry.Inc()
case <-ctx.Done():
b.Debugf("Closed, returning from asyncPollStreams loop.")
b.logger.DebugContext(ctx, "Closed, returning from asyncPollStreams loop.")
return nil
}
}
@@ -82,7 +83,7 @@ func (b *Backend) pollStreams(externalCtx context.Context) error {
if err != nil {
return trace.Wrap(err)
}
b.Debugf("Found latest event stream %v.", aws.ToString(streamArn))
b.logger.DebugContext(ctx, "Found latest event stream", "stream_arn", aws.ToString(streamArn))

set := make(map[string]struct{})
eventsC := make(chan shardEvent)
@@ -94,7 +95,7 @@ func (b *Backend) pollStreams(externalCtx context.Context) error {
return false
}
if _, ok := set[aws.ToString(shard.ParentShardId)]; ok {
b.Tracef("Skipping child shard: %s, still polling parent %s", sid, aws.ToString(shard.ParentShardId))
b.logger.Log(ctx, logutils.TraceLevel, "Skipping child shard, still polling parent", "child_shard_id", sid, "parent_shard_id", aws.ToString(shard.ParentShardId))
// still processing parent
return false
}
@@ -120,7 +121,7 @@ func (b *Backend) pollStreams(externalCtx context.Context) error {
continue
}
shardID := aws.ToString(shards[i].ShardId)
b.Tracef("Adding active shard %v.", shardID)
b.logger.Log(ctx, logutils.TraceLevel, "Adding active shard", "shard_id", shardID)
set[shardID] = struct{}{}
go b.asyncPollShard(ctx, streamArn, shards[i], eventsC, initC)
started++
@@ -161,15 +162,15 @@ func (b *Backend) pollStreams(externalCtx context.Context) error {
if event.shardID == "" {
// empty shard IDs in err-variant events are programming bugs and will lead to
// invalid state.
b.WithError(err).Warnf("Forcing watch system reset due to empty shard ID on error (this is a bug)")
b.logger.WarnContext(ctx, "Forcing watch system reset due to empty shard ID on error (this is a bug)", "error", err)
return trace.BadParameter("empty shard ID")
}
delete(set, event.shardID)
if !errors.Is(event.err, io.EOF) {
b.Debugf("Shard ID %v closed with error: %v, resetting buffers.", event.shardID, event.err)
b.logger.DebugContext(ctx, "Shard closed with error, resetting buffers.", "shard_id", event.shardID, "error", event.err)
return trace.Wrap(event.err)
}
b.Tracef("Shard ID %v exited gracefully.", event.shardID)
b.logger.Log(ctx, logutils.TraceLevel, "Shard exited gracefully.", "shard_id", event.shardID)
} else {
b.buf.Emit(event.events...)
}
@@ -178,7 +179,7 @@ func (b *Backend) pollStreams(externalCtx context.Context) error {
return trace.Wrap(err)
}
case <-ctx.Done():
b.Tracef("Context is closing, returning.")
b.logger.Log(ctx, logutils.TraceLevel, "Context is closing, returning.")
return nil
}
}
@@ -231,18 +232,18 @@ func (b *Backend) pollShard(ctx context.Context, streamArn *string, shard stream
return convertError(err)
}
if len(out.Records) > 0 {
b.Tracef("Got %v new stream shard records.", len(out.Records))
b.logger.Log(ctx, logutils.TraceLevel, "Got new stream shard records.", "num_records", len(out.Records))
}
if len(out.Records) == 0 {
if out.NextShardIterator == nil {
b.Tracef("Shard is closed: %v.", aws.ToString(shard.ShardId))
b.logger.Log(ctx, logutils.TraceLevel, "Shard is closed", "shard_id", aws.ToString(shard.ShardId))
return io.EOF
}
iterator = out.NextShardIterator
continue
}
if out.NextShardIterator == nil {
b.Tracef("Shard is closed: %v.", aws.ToString(shard.ShardId))
b.logger.Log(ctx, logutils.TraceLevel, "Shard is closed", "shard_id", aws.ToString(shard.ShardId))
return io.EOF
}
events := make([]backend.Event, 0, len(out.Records))
@@ -354,7 +355,7 @@ func (b *Backend) asyncPollShard(ctx context.Context, streamArn *string, shard s
select {
case eventsC <- shardEvent{err: err, shardID: shardID}:
case <-ctx.Done():
b.Debugf("Context is closing, returning")
b.logger.DebugContext(ctx, "Context is closing, returning")
return
}
}()
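shards.go also shows how the migration handles logrus's Trace level, which slog does not define: those call sites go through logger.Log with a custom logutils.TraceLevel. The sketch below shows how such a level can be wired up; the value -8 is an assumption (any level below slog.LevelDebug works), not necessarily what logutils.TraceLevel is actually set to.

package main

import (
	"context"
	"log/slog"
	"os"
)

// TraceLevel stands in for logutils.TraceLevel. slog levels are plain
// integers, so any value below slog.LevelDebug (-4) behaves as "trace";
// -8 here is an assumed value.
const TraceLevel = slog.Level(-8)

func main() {
	// The handler must be configured down to TraceLevel, otherwise
	// records below LevelDebug are silently dropped.
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: TraceLevel}))

	// Mirrors the call shape used in shards.go:
	// b.logger.Log(ctx, logutils.TraceLevel, "Adding active shard", "shard_id", shardID)
	logger.Log(context.Background(), TraceLevel, "Adding active shard", "shard_id", "shardId-000000000001")
}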
(Diffs for the remaining 12 changed files are not rendered in this view.)
