From b605d094308dd7e7395a723203384be00a559524 Mon Sep 17 00:00:00 2001 From: Matt Dale <9760375+matthewdale@users.noreply.github.com> Date: Thu, 11 Apr 2024 20:50:15 -0700 Subject: [PATCH] GODRIVER-3172 Read response in the background after an op timeout. (#1589) --- internal/csot/csot.go | 6 +- mongo/change_stream.go | 8 +- mongo/collection.go | 30 +- mongo/gridfs/bucket.go | 12 +- mongo/integration/csot_test.go | 571 ++++++++++++++++++ mongo/integration/mtest/mongotest.go | 4 +- .../unified/unified_spec_runner.go | 12 + x/mongo/driver/errors.go | 19 +- x/mongo/driver/operation.go | 47 +- x/mongo/driver/operation/aggregate.go | 14 + x/mongo/driver/operation/find.go | 14 + x/mongo/driver/operation_test.go | 52 +- x/mongo/driver/topology/connection.go | 18 +- x/mongo/driver/topology/pool.go | 89 +++ x/mongo/driver/topology/rtt_monitor.go | 9 +- x/mongo/driver/topology/sdam_spec_test.go | 2 +- 16 files changed, 868 insertions(+), 39 deletions(-) create mode 100644 mongo/integration/csot_test.go diff --git a/internal/csot/csot.go b/internal/csot/csot.go index 678252c51a..43801a5d4f 100644 --- a/internal/csot/csot.go +++ b/internal/csot/csot.go @@ -21,11 +21,13 @@ type timeoutKey struct{} // TODO default behavior. func MakeTimeoutContext(ctx context.Context, to time.Duration) (context.Context, context.CancelFunc) { // Only use the passed in Duration as a timeout on the Context if it - // is non-zero. + // is non-zero and if the Context doesn't already have a timeout. cancelFunc := func() {} - if to != 0 { + if _, deadlineSet := ctx.Deadline(); to != 0 && !deadlineSet { ctx, cancelFunc = context.WithTimeout(ctx, to) } + + // Add timeoutKey either way to indicate CSOT is enabled. 
return context.WithValue(ctx, timeoutKey{}, true), cancelFunc } diff --git a/mongo/change_stream.go b/mongo/change_stream.go index c4c2fb2590..1bedcc3f8a 100644 --- a/mongo/change_stream.go +++ b/mongo/change_stream.go @@ -277,10 +277,10 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err cs.aggregate.Pipeline(plArr) } - // If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already - // a Timeout context, honor cs.client.timeout in new Timeout context for change stream operation execution - // and potential retry. - if _, deadlineSet := ctx.Deadline(); !deadlineSet && cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) { + // If cs.client.timeout is set and context is not already a Timeout context, + // honor cs.client.timeout in new Timeout context for change stream + // operation execution and potential retry. + if cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) { newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *cs.client.timeout) // Redefine ctx to be the new timeout-derived context. ctx = newCtx diff --git a/mongo/collection.go b/mongo/collection.go index c7b2a8a113..555035ff51 100644 --- a/mongo/collection.go +++ b/mongo/collection.go @@ -863,6 +863,15 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) { Timeout(a.client.timeout). MaxTime(ao.MaxTime) + // Omit "maxTimeMS" from operations that return a user-managed cursor to + // prevent confusing "cursor not found" errors. To maintain existing + // behavior for users who set "timeoutMS" with no context deadline, only + // omit "maxTimeMS" when a context deadline is set. + // + // See DRIVERS-2722 for more detail. 
+ _, deadlineSet := a.ctx.Deadline() + op.OmitCSOTMaxTimeMS(deadlineSet) + if ao.AllowDiskUse != nil { op.AllowDiskUse(*ao.AllowDiskUse) } @@ -1191,6 +1200,22 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/find/. func (coll *Collection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (cur *Cursor, err error) { + // Omit "maxTimeMS" from operations that return a user-managed cursor to + // prevent confusing "cursor not found" errors. To maintain existing + // behavior for users who set "timeoutMS" with no context deadline, only + // omit "maxTimeMS" when a context deadline is set. + // + // See DRIVERS-2722 for more detail. + _, deadlineSet := ctx.Deadline() + return coll.find(ctx, filter, deadlineSet, opts...) +} + +func (coll *Collection) find( + ctx context.Context, + filter interface{}, + omitCSOTMaxTimeMS bool, + opts ...*options.FindOptions, +) (cur *Cursor, err error) { if ctx == nil { ctx = context.Background() @@ -1230,7 +1255,8 @@ func (coll *Collection) Find(ctx context.Context, filter interface{}, CommandMonitor(coll.client.monitor).ServerSelector(selector). ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name). Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI). - Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger) + Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger). + OmitCSOTMaxTimeMS(omitCSOTMaxTimeMS) cursorOpts := coll.client.createBaseCursorOptions() @@ -1408,7 +1434,7 @@ func (coll *Collection) FindOne(ctx context.Context, filter interface{}, // by the server. findOpts = append(findOpts, options.Find().SetLimit(-1)) - cursor, err := coll.Find(ctx, filter, findOpts...) + cursor, err := coll.find(ctx, filter, false, findOpts...) 
return &SingleResult{ ctx: ctx, cur: cursor, diff --git a/mongo/gridfs/bucket.go b/mongo/gridfs/bucket.go index b231d1dd77..3bad4cb7c0 100644 --- a/mongo/gridfs/bucket.go +++ b/mongo/gridfs/bucket.go @@ -257,10 +257,10 @@ func (b *Bucket) Delete(fileID interface{}) error { // // Use the context parameter to time-out or cancel the delete operation. The deadline set by SetWriteDeadline is ignored. func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error { - // If no deadline is set on the passed-in context, Timeout is set on the Client, and context is - // not already a Timeout context, honor Timeout in new Timeout context for operation execution to + // If Timeout is set on the Client and context is not already a Timeout + // context, honor Timeout in new Timeout context for operation execution to // be shared by both delete operations. - if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) { + if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) { newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout()) // Redefine ctx to be the new timeout-derived context. ctx = newCtx @@ -384,10 +384,10 @@ func (b *Bucket) Drop() error { // // Use the context parameter to time-out or cancel the drop operation. The deadline set by SetWriteDeadline is ignored. func (b *Bucket) DropContext(ctx context.Context) error { - // If no deadline is set on the passed-in context, Timeout is set on the Client, and context is - // not already a Timeout context, honor Timeout in new Timeout context for operation execution to + // If Timeout is set on the Client and context is not already a Timeout + // context, honor Timeout in new Timeout context for operation execution to // be shared by both drop operations. 
- if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) { + if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) { newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout()) // Redefine ctx to be the new timeout-derived context. ctx = newCtx diff --git a/mongo/integration/csot_test.go b/mongo/integration/csot_test.go new file mode 100644 index 0000000000..3eb0328616 --- /dev/null +++ b/mongo/integration/csot_test.go @@ -0,0 +1,571 @@ +// Copyright (C) MongoDB, Inc. 2024-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package integration + +import ( + "context" + "errors" + "testing" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/event" + "go.mongodb.org/mongo-driver/internal/assert" + "go.mongodb.org/mongo-driver/internal/eventtest" + "go.mongodb.org/mongo-driver/internal/require" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/integration/mtest" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/x/mongo/driver" +) + +// Test automatic "maxTimeMS" appending and connection closing behavior when +// CSOT is disabled and enabled. +func TestCSOT(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().CreateClient(false)) + + testCases := []struct { + desc string + commandName string + setup func(coll *mongo.Collection) error + operation func(ctx context.Context, coll *mongo.Collection) error + topologies []mtest.TopologyKind + + // sendsMaxTimeMSWithTimeoutMS specifies whether the driver + // automatically adds "maxTimeMS" to the command-under-test when + // "timeoutMS" is set but no context deadline is provided. 
+ sendsMaxTimeMSWithTimeoutMS bool + + // sendsMaxTimeMSWithContextDeadline specifies whether the driver + // automatically adds "maxTimeMS" to the command-under-test when + // "timeoutMS" is set and a context deadline is provided. + sendsMaxTimeMSWithContextDeadline bool + + // preventsConnClosureWithTimeoutMS specifies whether the driver + // attempts to prevent closing connections when "timeoutMS" is set for + // the command-under-test. + preventsConnClosureWithTimeoutMS bool + }{ + { + desc: "FindOne", + commandName: "find", + setup: func(coll *mongo.Collection) error { + _, err := coll.InsertOne(context.Background(), bson.D{}) + return err + }, + operation: func(ctx context.Context, coll *mongo.Collection) error { + return coll.FindOne(ctx, bson.D{}).Err() + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "Find", + commandName: "find", + setup: func(coll *mongo.Collection) error { + _, err := coll.InsertOne(context.Background(), bson.D{}) + return err + }, + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.Find(ctx, bson.D{}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: false, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "FindOneAndDelete", + commandName: "findAndModify", + setup: func(coll *mongo.Collection) error { + _, err := coll.InsertOne(context.Background(), bson.D{}) + return err + }, + operation: func(ctx context.Context, coll *mongo.Collection) error { + return coll.FindOneAndDelete(ctx, bson.D{}).Err() + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "FindOneAndUpdate", + commandName: "findAndModify", + setup: func(coll *mongo.Collection) error { + _, err := coll.InsertOne(context.Background(), bson.D{}) + return err + }, + operation: func(ctx context.Context, coll 
*mongo.Collection) error { + return coll.FindOneAndUpdate(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}}).Err() + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "FindOneAndReplace", + commandName: "findAndModify", + setup: func(coll *mongo.Collection) error { + _, err := coll.InsertOne(context.Background(), bson.D{}) + return err + }, + operation: func(ctx context.Context, coll *mongo.Collection) error { + return coll.FindOneAndReplace(ctx, bson.D{}, bson.D{}).Err() + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "InsertOne", + commandName: "insert", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.InsertOne(ctx, bson.D{}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "InsertMany", + commandName: "insert", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.InsertMany(ctx, []interface{}{bson.D{}}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "UpdateOne", + commandName: "update", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.UpdateOne(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "UpdateMany", + commandName: "update", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.UpdateMany(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + 
preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "ReplaceOne", + commandName: "update", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.ReplaceOne(ctx, bson.D{}, bson.D{}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "DeleteOne", + commandName: "delete", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.DeleteOne(ctx, bson.D{}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "DeleteMany", + commandName: "delete", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.DeleteMany(ctx, bson.D{}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "Distinct", + commandName: "distinct", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.Distinct(ctx, "name", bson.D{}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "Aggregate", + commandName: "aggregate", + operation: func(ctx context.Context, coll *mongo.Collection) error { + _, err := coll.Aggregate(ctx, mongo.Pipeline{}) + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: false, + preventsConnClosureWithTimeoutMS: true, + }, + { + desc: "Watch", + commandName: "aggregate", + operation: func(ctx context.Context, coll *mongo.Collection) error { + cs, err := coll.Watch(ctx, mongo.Pipeline{}) + if cs != nil { + cs.Close(context.Background()) + } + return err + }, + sendsMaxTimeMSWithTimeoutMS: true, + sendsMaxTimeMSWithContextDeadline: true, + preventsConnClosureWithTimeoutMS: true, + // Change Streams 
aren't supported on standalone topologies. + topologies: []mtest.TopologyKind{ + mtest.ReplicaSet, + mtest.Sharded, + }, + }, + { + desc: "Cursor getMore", + commandName: "getMore", + setup: func(coll *mongo.Collection) error { + _, err := coll.InsertMany(context.Background(), []interface{}{bson.D{}, bson.D{}}) + return err + }, + operation: func(ctx context.Context, coll *mongo.Collection) error { + cursor, err := coll.Find(ctx, bson.D{}, options.Find().SetBatchSize(1)) + if err != nil { + return err + } + var res []bson.D + return cursor.All(ctx, &res) + }, + sendsMaxTimeMSWithTimeoutMS: false, + sendsMaxTimeMSWithContextDeadline: false, + preventsConnClosureWithTimeoutMS: false, + }, + } + + // getStartedEvent returns the first command started event that matches the + // specified command name. + getStartedEvent := func(mt *mtest.T, command string) *event.CommandStartedEvent { + for { + evt := mt.GetStartedEvent() + if evt == nil { + break + } + _, err := evt.Command.LookupErr(command) + if errors.Is(err, bsoncore.ErrElementNotFound) { + continue + } + return evt + } + + mt.Errorf("could not find command started event for command %q", command) + mt.FailNow() + return nil + } + + // assertMaxTimeMSIsSet asserts that "maxTimeMS" is set to a positive value + // on the given command document. + assertMaxTimeMSIsSet := func(mt *mtest.T, command bson.Raw) { + mt.Helper() + + maxTimeVal := command.Lookup("maxTimeMS") + + require.Greater(mt, + len(maxTimeVal.Value), + 0, + "expected maxTimeMS BSON value to be non-empty") + require.Equal(mt, + maxTimeVal.Type, + bson.TypeInt64, + "expected maxTimeMS BSON value to be type Int64") + assert.Greater(mt, + maxTimeVal.Int64(), + int64(0), + "expected maxTimeMS value to be greater than 0") + } + + // assertMaxTimeMSNotSet asserts that "maxTimeMS" is not set on the given + // command document. 
+ assertMaxTimeMSNotSet := func(mt *mtest.T, command bson.Raw) { + mt.Helper() + + _, err := command.LookupErr("maxTimeMS") + assert.ErrorIs(mt, + err, + bsoncore.ErrElementNotFound, + "expected maxTimeMS BSON value to be missing, but is present") + } + + for _, tc := range testCases { + mt.RunOpts(tc.desc, mtest.NewOptions().Topologies(tc.topologies...), func(mt *mtest.T) { + mt.Run("maxTimeMS", func(mt *mtest.T) { + mt.Run("timeoutMS not set", func(mt *mtest.T) { + if tc.setup != nil { + err := tc.setup(mt.Coll) + require.NoError(mt, err) + } + + err := tc.operation(context.Background(), mt.Coll) + require.NoError(mt, err) + + evt := getStartedEvent(mt, tc.commandName) + assertMaxTimeMSNotSet(mt, evt.Command) + }) + + csotOpts := mtest.NewOptions().ClientOptions(options.Client().SetTimeout(10 * time.Second)) + mt.RunOpts("timeoutMS and context.Background", csotOpts, func(mt *mtest.T) { + if tc.setup != nil { + err := tc.setup(mt.Coll) + require.NoError(mt, err) + } + + err := tc.operation(context.Background(), mt.Coll) + require.NoError(mt, err) + + evt := getStartedEvent(mt, tc.commandName) + if tc.sendsMaxTimeMSWithTimeoutMS { + assertMaxTimeMSIsSet(mt, evt.Command) + } else { + assertMaxTimeMSNotSet(mt, evt.Command) + } + }) + + mt.RunOpts("timeoutMS and Context with deadline", csotOpts, func(mt *mtest.T) { + if tc.setup != nil { + err := tc.setup(mt.Coll) + require.NoError(mt, err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := tc.operation(ctx, mt.Coll) + require.NoError(mt, err) + + evt := getStartedEvent(mt, tc.commandName) + if tc.sendsMaxTimeMSWithContextDeadline { + assertMaxTimeMSIsSet(mt, evt.Command) + } else { + assertMaxTimeMSNotSet(mt, evt.Command) + } + }) + }) + + if tc.preventsConnClosureWithTimeoutMS { + opts := mtest.NewOptions(). + // Blocking failpoints don't work on pre-4.2 and sharded clusters. + Topologies(mtest.Single, mtest.ReplicaSet). 
+ MinServerVersion("4.2") + mt.RunOpts("prevents connection closure with timeoutMS", opts, func(mt *mtest.T) { + if tc.setup != nil { + err := tc.setup(mt.Coll) + require.NoError(mt, err) + } + + mt.SetFailPoint(mtest.FailPoint{ + ConfigureFailPoint: "failCommand", + Mode: "alwaysOn", + Data: mtest.FailPointData{ + FailCommands: []string{tc.commandName}, + BlockConnection: true, + BlockTimeMS: 500, + }, + }) + + tpm := eventtest.NewTestPoolMonitor() + mt.ResetClient(options.Client(). + SetPoolMonitor(tpm.PoolMonitor)) + + // Run 5 operations that time out with CSOT disabled, then + // assert that at least 1 connection was closed during those + // timeouts. + for i := 0; i < 5; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond) + err := tc.operation(ctx, mt.Coll) + cancel() + + if !mongo.IsTimeout(err) { + t.Logf("CSOT-disabled operation %d returned a non-timeout error: %v", i, err) + } + } + + closedEvents := tpm.Events(func(pe *event.PoolEvent) bool { + return pe.Type == event.ConnectionClosed + }) + assert.Greater(mt, + len(closedEvents), + 0, + "expected more than 0 connection closed events") + + tpm = eventtest.NewTestPoolMonitor() + mt.ResetClient(options.Client(). + SetPoolMonitor(tpm.PoolMonitor). + SetTimeout(10 * time.Second)) + + // Run 5 operations that time out with CSOT enabled, then + // assert that no connections were closed. 
+ for i := 0; i < 5; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond) + err := tc.operation(ctx, mt.Coll) + cancel() + + if !mongo.IsTimeout(err) { + t.Logf("CSOT-enabled operation %d returned a non-timeout error: %v", i, err) + } + } + + closedEvents = tpm.Events(func(pe *event.PoolEvent) bool { + return pe.Type == event.ConnectionClosed + }) + assert.Len(mt, closedEvents, 0, "expected no connection closed event") + }) + } + }) + } + + csotOpts := mtest.NewOptions().ClientOptions(options.Client().SetTimeout(10 * time.Second)) + mt.RunOpts("maxTimeMS is omitted for values greater than 2147483647ms", csotOpts, func(mt *mtest.T) { + ctx, cancel := context.WithTimeout(context.Background(), (2147483647+1000)*time.Millisecond) + defer cancel() + _, err := mt.Coll.InsertOne(ctx, bson.D{}) + require.NoError(t, err) + + evt := mt.GetStartedEvent() + _, err = evt.Command.LookupErr("maxTimeMS") + assert.ErrorIs(mt, + err, + bsoncore.ErrElementNotFound, + "expected maxTimeMS BSON value to be missing, but is present") + }) +} + +func TestCSOT_errors(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions(). + CreateClient(false). + // Blocking failpoints don't work on pre-4.2 and sharded clusters. + Topologies(mtest.Single, mtest.ReplicaSet). + MinServerVersion("4.2"). + // Enable CSOT. + ClientOptions(options.Client().SetTimeout(10*time.Second))) + + // Test that, when CSOT is enabled, the error returned when the database + // returns a MaxTimeMSExceeded error (error code 50) wraps + // "context.DeadlineExceeded". 
+ mt.Run("MaxTimeMSExceeded wraps context.DeadlineExceeded", func(mt *mtest.T) { + _, err := mt.Coll.InsertOne(context.Background(), bson.D{}) + require.NoError(mt, err, "InsertOne error") + + mt.SetFailPoint(mtest.FailPoint{ + ConfigureFailPoint: "failCommand", + Mode: mtest.FailPointMode{ + Times: 1, + }, + Data: mtest.FailPointData{ + FailCommands: []string{"find"}, + ErrorCode: 50, // MaxTimeMSExceeded + }, + }) + + err = mt.Coll.FindOne(context.Background(), bson.D{}).Err() + + assert.True(mt, + errors.Is(err, context.DeadlineExceeded), + "expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded", + err) + assert.True(mt, + mongo.IsTimeout(err), + "expected error %[1]T(%[1]q) to be a timeout error", + err) + }) + + // Test that, when CSOT is enabled, the error returned when a context + // deadline is exceeded during a network operation wraps + // "context.DeadlineExceeded". + mt.Run("Context timeout wraps context.DeadlineExceeded", func(mt *mtest.T) { + _, err := mt.Coll.InsertOne(context.Background(), bson.D{}) + require.NoError(mt, err, "InsertOne error") + + mt.SetFailPoint(mtest.FailPoint{ + ConfigureFailPoint: "failCommand", + Mode: mtest.FailPointMode{ + Times: 1, + }, + Data: mtest.FailPointData{ + FailCommands: []string{"find"}, + BlockConnection: true, + BlockTimeMS: 500, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond) + defer cancel() + err = mt.Coll.FindOne(ctx, bson.D{}).Err() + + assert.False(mt, + errors.Is(err, driver.ErrDeadlineWouldBeExceeded), + "expected error %[1]T(%[1]q) to not wrap driver.ErrDeadlineWouldBeExceeded", + err) + assert.True(mt, + errors.Is(err, context.DeadlineExceeded), + "expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded", + err) + assert.True(mt, + mongo.IsTimeout(err), + "expected error %[1]T(%[1]q) to be a timeout error", + err) + }) + + mt.Run("timeoutMS timeout wraps context.DeadlineExceeded", func(mt *mtest.T) { + _, err := 
mt.Coll.InsertOne(context.Background(), bson.D{}) + require.NoError(mt, err, "InsertOne error") + + mt.SetFailPoint(mtest.FailPoint{ + ConfigureFailPoint: "failCommand", + Mode: mtest.FailPointMode{ + Times: 1, + }, + Data: mtest.FailPointData{ + FailCommands: []string{"find"}, + BlockConnection: true, + BlockTimeMS: 100, + }, + }) + + // Set timeoutMS=10 to run the FindOne, then unset it so the mtest + // cleanup operations pass successfully (e.g. unsetting failpoints). + mt.ResetClient(options.Client().SetTimeout(10 * time.Millisecond)) + defer mt.ResetClient(options.Client()) + err = mt.Coll.FindOne(context.Background(), bson.D{}).Err() + + assert.False(mt, + errors.Is(err, driver.ErrDeadlineWouldBeExceeded), + "expected error %[1]T(%[1]q) to not wrap driver.ErrDeadlineWouldBeExceeded", + err) + assert.True(mt, + errors.Is(err, context.DeadlineExceeded), + "expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded", + err) + assert.True(mt, + mongo.IsTimeout(err), + "expected error %[1]T(%[1]q) to be a timeout error", + err) + }) +} diff --git a/mongo/integration/mtest/mongotest.go b/mongo/integration/mtest/mongotest.go index f1d7440f10..f92b5c583f 100644 --- a/mongo/integration/mtest/mongotest.go +++ b/mongo/integration/mtest/mongotest.go @@ -207,7 +207,7 @@ func (t *T) cleanup() { // Run creates a new T instance for a sub-test and runs the given callback. It also creates a new collection using the // given name which is available to the callback through the T.Coll variable and is dropped after the callback // returns. -func (t *T) Run(name string, callback func(*T)) { +func (t *T) Run(name string, callback func(mt *T)) { t.RunOpts(name, NewOptions(), callback) } @@ -215,7 +215,7 @@ func (t *T) Run(name string, callback func(*T)) { // constraints specified in the options, the new sub-test will be skipped automatically. If the test is not skipped, // the callback will be run with the new T instance. 
RunOpts creates a new collection with the given name which is // available to the callback through the T.Coll variable and is dropped after the callback returns. -func (t *T) RunOpts(name string, opts *Options, callback func(*T)) { +func (t *T) RunOpts(name string, opts *Options, callback func(mt *T)) { t.T.Run(name, func(wrapped *testing.T) { sub := newT(wrapped, t.baseOpts, opts) diff --git a/mongo/integration/unified/unified_spec_runner.go b/mongo/integration/unified/unified_spec_runner.go index b7744844e0..4b76d41242 100644 --- a/mongo/integration/unified/unified_spec_runner.go +++ b/mongo/integration/unified/unified_spec_runner.go @@ -43,6 +43,18 @@ var ( "dropSearchIndex ignores read and write concern": "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043", "listSearchIndexes ignores read and write concern": "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043", "updateSearchIndex ignores the read and write concern": "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043", + + // DRIVERS-2722: Setting "maxTimeMS" on a command that creates a cursor + // also limits the lifetime of the cursor. That may be surprising to + // users, so omit "maxTimeMS" from operations that return user-managed + // cursors. + "timeoutMS can be overridden for a find": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.", + "timeoutMS can be configured for an operation - find on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.", + "timeoutMS can be configured for an operation - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.", + "timeoutMS can be configured for an operation - aggregate on database": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.", + "operation is retried multiple times for non-zero timeoutMS - find on collection": "maxTimeMS is disabled on find and aggregate. 
See DRIVERS-2722.", + "operation is retried multiple times for non-zero timeoutMS - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.", + "operation is retried multiple times for non-zero timeoutMS - aggregate on database": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.", } logMessageValidatorTimeout = 10 * time.Millisecond diff --git a/x/mongo/driver/errors.go b/x/mongo/driver/errors.go index 3b8b9823b7..177c2d4501 100644 --- a/x/mongo/driver/errors.go +++ b/x/mongo/driver/errors.go @@ -14,6 +14,7 @@ import ( "strings" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/internal/csot" "go.mongodb.org/mongo-driver/mongo/description" "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" ) @@ -377,7 +378,7 @@ func (e Error) NamespaceNotFound() bool { // ExtractErrorFromServerResponse extracts an error from a server response bsoncore.Document // if there is one. Also used in testing for SDAM. -func ExtractErrorFromServerResponse(doc bsoncore.Document) error { +func ExtractErrorFromServerResponse(ctx context.Context, doc bsoncore.Document) error { var errmsg, codeName string var code int32 var labels []string @@ -514,7 +515,7 @@ func ExtractErrorFromServerResponse(doc bsoncore.Document) error { errmsg = "command failed" } - return Error{ + err := Error{ Code: code, Message: errmsg, Name: codeName, @@ -522,6 +523,20 @@ func ExtractErrorFromServerResponse(doc bsoncore.Document) error { TopologyVersion: tv, Raw: doc, } + + // If CSOT is enabled and we get a MaxTimeMSExpired error, assume that + // the error was caused by setting "maxTimeMS" on the command based on + // the context deadline or on "timeoutMS". In that case, make the error + // wrap context.DeadlineExceeded so that users can always check + // + // errors.Is(err, context.DeadlineExceeded) + // + // for either client-side or server-side timeouts. 
+ if csot.IsTimeoutContext(ctx) && err.Code == 50 { + err.Wrapped = context.DeadlineExceeded + } + + return err } if len(wcError.WriteErrors) > 0 || wcError.WriteConcernError != nil { diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go index b39a63abe4..8f87c21d3f 100644 --- a/x/mongo/driver/operation.go +++ b/x/mongo/driver/operation.go @@ -310,6 +310,11 @@ type Operation struct { // OP_MSG as well as for logging server selection data. Name string + // OmitCSOTMaxTimeMS omits the automatically-calculated "maxTimeMS" from the + // command when CSOT is enabled. It does not affect "maxTimeMS" set by + // [Operation.MaxTime]. + OmitCSOTMaxTimeMS bool + // omitReadPreference is a boolean that indicates whether to omit the // read preference from the command. This omition includes the case // where a default read preference is used when the operation @@ -499,9 +504,9 @@ func (op Operation) Execute(ctx context.Context) error { return err } - // If no deadline is set on the passed-in context, op.Timeout is set, and context is not already - // a Timeout context, honor op.Timeout in new Timeout context for operation execution. - if _, deadlineSet := ctx.Deadline(); !deadlineSet && op.Timeout != nil && !csot.IsTimeoutContext(ctx) { + // If op.Timeout is set, and context is not already a Timeout context, honor + // op.Timeout in new Timeout context for operation execution. + if op.Timeout != nil && !csot.IsTimeoutContext(ctx) { newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *op.Timeout) // Redefine ctx to be the new timeout-derived context. ctx = newCtx @@ -683,8 +688,7 @@ func (op Operation) Execute(ctx context.Context) error { first = false } - // Calculate maxTimeMS value to potentially be appended to the wire message. 
- maxTimeMS, err := op.calculateMaxTimeMS(ctx, srvr.RTTMonitor().P90(), srvr.RTTMonitor().Stats()) + maxTimeMS, err := op.calculateMaxTimeMS(ctx, srvr.RTTMonitor()) if err != nil { return err } @@ -777,7 +781,7 @@ func (op Operation) Execute(ctx context.Context) error { } else if deadline, ok := ctx.Deadline(); ok { if csot.IsTimeoutContext(ctx) && time.Now().Add(srvr.RTTMonitor().P90()).After(deadline) { err = fmt.Errorf( - "remaining time %v until context deadline is less than 90th percentile RTT: %w\n%v", + "remaining time %v until context deadline is less than 90th percentile network round-trip time: %w\n%v", time.Until(deadline), ErrDeadlineWouldBeExceeded, srvr.RTTMonitor().Stats()) @@ -1089,7 +1093,7 @@ func (op Operation) readWireMessage(ctx context.Context, conn Connection) (resul } // decode - res, err := op.decodeResult(opcode, rem) + res, err := op.decodeResult(ctx, opcode, rem) // Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating // everything. op.updateClusterTimes(res) @@ -1562,10 +1566,15 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer) // if the ctx is a Timeout context. If the context is not a Timeout context, it uses the // operation's MaxTimeMS if set. If no MaxTimeMS is set on the operation, and context is // not a Timeout context, calculateMaxTimeMS returns 0. 
-func (op Operation) calculateMaxTimeMS(ctx context.Context, rtt90 time.Duration, rttStats string) (uint64, error) { +func (op Operation) calculateMaxTimeMS(ctx context.Context, mon RTTMonitor) (uint64, error) { if csot.IsTimeoutContext(ctx) { + if op.OmitCSOTMaxTimeMS { + return 0, nil + } + if deadline, ok := ctx.Deadline(); ok { remainingTimeout := time.Until(deadline) + rtt90 := mon.P90() maxTime := remainingTimeout - rtt90 // Always round up to the next millisecond value so we never truncate the calculated @@ -1573,11 +1582,21 @@ func (op Operation) calculateMaxTimeMS(ctx context.Context, rtt90 time.Duration, maxTimeMS := int64((maxTime + (time.Millisecond - 1)) / time.Millisecond) if maxTimeMS <= 0 { return 0, fmt.Errorf( - "remaining time %v until context deadline is less than or equal to 90th percentile RTT: %w\n%v", + "negative maxTimeMS: remaining time %v until context deadline is less than 90th percentile network round-trip time (%v): %w", remainingTimeout, - ErrDeadlineWouldBeExceeded, - rttStats) + mon.Stats(), + ErrDeadlineWouldBeExceeded) } + + // The server will return a "BadValue" error if maxTimeMS is greater + // than the maximum positive int32 value (about 24.9 days). If the + // user specified a timeout value greater than that, omit maxTimeMS + // and let the client-side timeout handle cancelling the op if the + // timeout is ever reached. 
+ if maxTimeMS > math.MaxInt32 { + return 0, nil + } + return uint64(maxTimeMS), nil } } else if op.MaxTime != nil { @@ -1827,7 +1846,7 @@ func (Operation) decodeOpReply(wm []byte) opReply { return reply } -func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore.Document, error) { +func (op Operation) decodeResult(ctx context.Context, opcode wiremessage.OpCode, wm []byte) (bsoncore.Document, error) { switch opcode { case wiremessage.OpReply: reply := op.decodeOpReply(wm) @@ -1845,7 +1864,7 @@ func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore return nil, NewCommandResponseError("malformed OP_REPLY: invalid document", err) } - return rdr, ExtractErrorFromServerResponse(rdr) + return rdr, ExtractErrorFromServerResponse(ctx, rdr) case wiremessage.OpMsg: _, wm, ok := wiremessage.ReadMsgFlags(wm) if !ok { @@ -1882,7 +1901,7 @@ func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore return nil, NewCommandResponseError("malformed OP_MSG: invalid document", err) } - return res, ExtractErrorFromServerResponse(res) + return res, ExtractErrorFromServerResponse(ctx, res) default: return nil, fmt.Errorf("cannot decode result from %s", opcode) } diff --git a/x/mongo/driver/operation/aggregate.go b/x/mongo/driver/operation/aggregate.go index ca0e796523..44467df8fd 100644 --- a/x/mongo/driver/operation/aggregate.go +++ b/x/mongo/driver/operation/aggregate.go @@ -50,6 +50,7 @@ type Aggregate struct { hasOutputStage bool customOptions map[string]bsoncore.Value timeout *time.Duration + omitCSOTMaxTimeMS bool result driver.CursorResponse } @@ -113,6 +114,7 @@ func (a *Aggregate) Execute(ctx context.Context) error { MaxTime: a.maxTime, Timeout: a.timeout, Name: driverutil.AggregateOp, + OmitCSOTMaxTimeMS: a.omitCSOTMaxTimeMS, }.Execute(ctx) } @@ -419,3 +421,15 @@ func (a *Aggregate) Timeout(timeout *time.Duration) *Aggregate { a.timeout = timeout return a } + +// OmitCSOTMaxTimeMS omits the 
automatically-calculated "maxTimeMS" from the +// command when CSOT is enabled. It does not affect "maxTimeMS" set by +// [Aggregate.MaxTime]. +func (a *Aggregate) OmitCSOTMaxTimeMS(omit bool) *Aggregate { + if a == nil { + a = new(Aggregate) + } + + a.omitCSOTMaxTimeMS = omit + return a +} diff --git a/x/mongo/driver/operation/find.go b/x/mongo/driver/operation/find.go index 27bb5b4f99..8950fde86d 100644 --- a/x/mongo/driver/operation/find.go +++ b/x/mongo/driver/operation/find.go @@ -62,6 +62,7 @@ type Find struct { result driver.CursorResponse serverAPI *driver.ServerAPIOptions timeout *time.Duration + omitCSOTMaxTimeMS bool logger *logger.Logger } @@ -110,6 +111,7 @@ func (f *Find) Execute(ctx context.Context) error { Timeout: f.timeout, Logger: f.logger, Name: driverutil.FindOp, + OmitCSOTMaxTimeMS: f.omitCSOTMaxTimeMS, }.Execute(ctx) } @@ -552,6 +554,18 @@ func (f *Find) Timeout(timeout *time.Duration) *Find { return f } +// OmitCSOTMaxTimeMS omits the automatically-calculated "maxTimeMS" from the +// command when CSOT is enabled. It does not affect "maxTimeMS" set by +// [Find.MaxTime]. +func (f *Find) OmitCSOTMaxTimeMS(omit bool) *Find { + if f == nil { + f = new(Find) + } + + f.omitCSOTMaxTimeMS = omit + return f +} + // Logger sets the logger for this operation. func (f *Find) Logger(logger *logger.Logger) *Find { if f == nil { diff --git a/x/mongo/driver/operation_test.go b/x/mongo/driver/operation_test.go index e6c9d4cf95..4fcb58b72d 100644 --- a/x/mongo/driver/operation_test.go +++ b/x/mongo/driver/operation_test.go @@ -328,7 +328,7 @@ func TestOperation(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - got, err := tc.op.calculateMaxTimeMS(tc.ctx, tc.rtt90, "") + got, err := tc.op.calculateMaxTimeMS(tc.ctx, mockRTTMonitor{p90: tc.rtt90}) // Assert that the calculated maxTimeMS is less than or equal to the expected value. 
A few // milliseconds will have elapsed toward the context deadline, and (remainingTimeout @@ -654,6 +654,35 @@ func TestOperation(t *testing.T) { // the TransientTransactionError label. assert.Equal(t, err, context.Canceled, "expected context.Canceled error, got %v", err) }) + t.Run("ErrDeadlineWouldBeExceeded wraps context.DeadlineExceeded", func(t *testing.T) { + // Create a deployment that returns a server that reports a 90th + // percentile RTT of 1 minute. + d := new(mockDeployment) + d.returns.server = mockServer{ + conn: new(mockConnection), + rttMonitor: mockRTTMonitor{p90: 1 * time.Minute}, + } + + // Create an operation with a Timeout specified to enable CSOT behavior. + var dur time.Duration + op := Operation{ + Database: "foobar", + Deployment: d, + CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) { + return dst, nil + }, + Timeout: &dur, + } + + // Call the operation with a context with a deadline less than the 90th + // percentile RTT configured above. 
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := op.Execute(ctx) + + assert.ErrorIs(t, err, ErrDeadlineWouldBeExceeded) + assert.ErrorIs(t, err, context.DeadlineExceeded) + }) } func createExhaustServerResponse(response bsoncore.Document, moreToCome bool) []byte { @@ -713,6 +742,27 @@ func (m *mockServerSelector) String() string { panic("not implemented") } +type mockServer struct { + conn Connection + err error + rttMonitor RTTMonitor +} + +func (ms mockServer) Connection(context.Context) (Connection, error) { return ms.conn, ms.err } +func (ms mockServer) RTTMonitor() RTTMonitor { return ms.rttMonitor } + +type mockRTTMonitor struct { + ewma time.Duration + min time.Duration + p90 time.Duration + stats string +} + +func (mrm mockRTTMonitor) EWMA() time.Duration { return mrm.ewma } +func (mrm mockRTTMonitor) Min() time.Duration { return mrm.min } +func (mrm mockRTTMonitor) P90() time.Duration { return mrm.p90 } +func (mrm mockRTTMonitor) Stats() string { return mrm.stats } + type mockConnection struct { // parameters pWriteWM []byte diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index 13035abc0f..476459e8e6 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "time" + "go.mongodb.org/mongo-driver/internal/csot" "go.mongodb.org/mongo-driver/mongo/address" "go.mongodb.org/mongo-driver/mongo/description" "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" @@ -77,6 +78,10 @@ type connection struct { // TODO(GODRIVER-2824): change driverConnectionID type to int64. driverConnectionID uint64 generation uint64 + + // awaitingResponse indicates that the server response was not completely + // read before returning the connection to the pool. + awaitingResponse bool } // newConnection handles the creation of a connection. It does not connect the connection. 
@@ -414,8 +419,17 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) { dst, errMsg, err := c.read(ctx) if err != nil { - // We closeConnection the connection because we don't know if there are other bytes left to read. - c.close() + if nerr := net.Error(nil); errors.As(err, &nerr) && nerr.Timeout() && csot.IsTimeoutContext(ctx) { + // If the error was a timeout error and CSOT is enabled, instead of + // closing the connection mark it as awaiting response so the pool + // can read the response before making it available to other + // operations. + c.awaitingResponse = true + } else { + // Otherwise, use the pre-CSOT behavior and close the connection + // because we don't know if there are other bytes left to read. + c.close() + } message := errMsg if errors.Is(err, io.EOF) { message = "socket was unexpectedly closed" diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index bfbda4fa48..52461eb681 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -764,6 +764,81 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro return nil } +var ( + // BGReadTimeout is the maximum amount of time to wait when trying to read + // the server reply on a connection after an operation timed out. The + // default is 1 second. + // + // Deprecated: BGReadTimeout is intended for internal use only and may be + // removed or modified at any time. + BGReadTimeout = 1 * time.Second + + // BGReadCallback is a callback for monitoring the behavior of the + // background-read-on-timeout connection preserving mechanism. + // + // Deprecated: BGReadCallback is intended for internal use only and may be + // removed or modified at any time. + BGReadCallback func(addr string, start, read time.Time, errs []error, connClosed bool) +) + +// bgRead sets a new read deadline on the provided connection (1 second in the +// future) and tries to read any bytes returned by the server. 
If successful, it +// checks the connection into the provided pool. If there are any errors, it +// closes the connection. +// +// It calls the package-global BGReadCallback function, if set, with the +// address, timings, and any errors that occurred. +func bgRead(pool *pool, conn *connection) { + var start, read time.Time + start = time.Now() + errs := make([]error, 0) + connClosed := false + + defer func() { + // No matter what happens, always check the connection back into the + // pool, which will either make it available for other operations or + // remove it from the pool if it was closed. + err := pool.checkInNoEvent(conn) + if err != nil { + errs = append(errs, fmt.Errorf("error checking in: %w", err)) + } + + if BGReadCallback != nil { + BGReadCallback(conn.addr.String(), start, read, errs, connClosed) + } + }() + + err := conn.nc.SetReadDeadline(time.Now().Add(BGReadTimeout)) + if err != nil { + errs = append(errs, fmt.Errorf("error setting a read deadline: %w", err)) + + connClosed = true + err := conn.close() + if err != nil { + errs = append(errs, fmt.Errorf("error closing conn after setting read deadline: %w", err)) + } + + return + } + + // The context here is only used for cancellation, not deadline timeout, so + // use context.Background(). The read timeout is set by calling + // SetReadDeadline above. + _, _, err = conn.read(context.Background()) + read = time.Now() + if err != nil { + errs = append(errs, fmt.Errorf("error reading: %w", err)) + + connClosed = true + err := conn.close() + if err != nil { + errs = append(errs, fmt.Errorf("error closing conn after reading: %w", err)) + } + + return + } +} + // checkIn returns an idle connection to the pool. If the connection is perished or the pool is // closed, it is removed from the connection pool and closed. 
func (p *pool) checkIn(conn *connection) error { @@ -803,6 +878,20 @@ func (p *pool) checkInNoEvent(conn *connection) error { return ErrWrongPool } + // If the connection has an awaiting server response, try to read the + // response in another goroutine before checking it back into the pool. + // + // Do this here because we want to publish checkIn events when the operation + // is done with the connection, not when it's ready to be used again. That + // means that connections in "awaiting response" state are checked in but + // not usable, which is not covered by the current pool events. We may need + // to add pool event information in the future to communicate that. + if conn.awaitingResponse { + conn.awaitingResponse = false + go bgRead(p, conn) + return nil + } + // Bump the connection idle deadline here because we're about to make the connection "available". // The idle deadline is used to determine when a connection has reached its max idle time and // should be closed. A connection reaches its max idle time when it has been "available" in the diff --git a/x/mongo/driver/topology/rtt_monitor.go b/x/mongo/driver/topology/rtt_monitor.go index 3dd031f2ea..07f508caae 100644 --- a/x/mongo/driver/topology/rtt_monitor.go +++ b/x/mongo/driver/topology/rtt_monitor.go @@ -322,7 +322,10 @@ func (r *rttMonitor) Stats() string { } } - return fmt.Sprintf(`Round-trip-time monitor statistics:`+"\n"+ - `average RTT: %v, minimum RTT: %v, 90th percentile RTT: %v, standard dev: %v`+"\n", - time.Duration(avg), r.minRTT, r.rtt90, time.Duration(stdDev)) + return fmt.Sprintf( + "network round-trip time stats: avg: %v, min: %v, 90th pct: %v, stddev: %v", + time.Duration(avg), + r.minRTT, + r.rtt90, + time.Duration(stdDev)) } diff --git a/x/mongo/driver/topology/sdam_spec_test.go b/x/mongo/driver/topology/sdam_spec_test.go index a24c7e3dae..6e360509c1 100644 --- a/x/mongo/driver/topology/sdam_spec_test.go +++ b/x/mongo/driver/topology/sdam_spec_test.go @@ -295,7 +295,7 @@ func 
applyErrors(t *testing.T, topo *Topology, errors []applicationError) { var currError error switch appErr.Type { case "command": - currError = driver.ExtractErrorFromServerResponse(appErr.Response) + currError = driver.ExtractErrorFromServerResponse(context.Background(), appErr.Response) case "network": currError = driver.Error{ Labels: []string{driver.NetworkError},