diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index 1d078574ff2d..a4b6d763a29f 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -16,6 +16,7 @@ package robustness import ( "context" + "math/rand" "testing" "time" @@ -36,6 +37,7 @@ import ( var testRunner = framework.E2eTestRunner func TestMain(m *testing.M) { + rand.Seed(time.Now().UnixNano()) testRunner.TestMain(m) } diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index f59bb91ab541..679263d6fcf8 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "hash/fnv" + "maps" "reflect" "sort" @@ -75,6 +76,15 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState } +func (s EtcdState) clone() EtcdState { + newState := EtcdState{Revision: s.Revision} + + newState.KeyValues = maps.Clone(s.KeyValues) + newState.KeyLeases = maps.Clone(s.KeyLeases) + newState.Leases = maps.Clone(s.Leases) + return newState +} + func freshEtcdState() EtcdState { return EtcdState{ Revision: 1, @@ -85,12 +95,9 @@ func freshEtcdState() EtcdState { } // Step handles a successful request, returning updated state and response it would generate. -func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { - newKVs := map[string]ValueRevision{} - for k, v := range s.KeyValues { - newKVs[k] = v - } - s.KeyValues = newKVs +func (prevState EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { + s := prevState.clone() + switch request.Type { case Range: if request.Range.Revision == 0 || request.Range.Revision == s.Revision { diff --git a/tests/robustness/model/replay.go b/tests/robustness/model/replay.go index d839c19768cf..b8ea784683a3 100644 --- a/tests/robustness/model/replay.go +++ b/tests/robustness/model/replay.go @@ -16,6 +16,7 @@ package model import ( "fmt" + "sort" "strings" ) @@ -63,44 +64,72 @@ func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent } func toWatchEvents(prevState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) { - if request.Type != Txn || response.Error != "" { + if response.Error != "" { return events } - var ops []EtcdOperation - if response.Txn.Failure { - ops = request.Txn.OperationsOnFailure - } else { - ops = request.Txn.OperationsOnSuccess - } - for _, op := range ops { - switch op.Type { - case RangeOperation: - case DeleteOperation: - e := PersistedEvent{ - Event: Event{ - Type: op.Type, - Key: op.Delete.Key, - }, - Revision: response.Revision, - } - if _, ok := prevState.KeyValues[op.Delete.Key]; ok { + + switch request.Type { + case Txn: + var ops []EtcdOperation + if response.Txn.Failure { + ops = request.Txn.OperationsOnFailure + } else { + ops = request.Txn.OperationsOnSuccess + } + for _, op := range ops { + switch op.Type { + case RangeOperation: + case DeleteOperation: + e := PersistedEvent{ + Event: Event{ + Type: op.Type, + Key: op.Delete.Key, + }, + Revision: response.Revision, + } + if _, ok := prevState.KeyValues[op.Delete.Key]; ok { + events = append(events, e) + } + case PutOperation: + _, leaseExists := prevState.Leases[op.Put.LeaseID] + if op.Put.LeaseID != 0 && !leaseExists { + break + } + + e := PersistedEvent{ + Event: Event{ + Type: op.Type, + Key: op.Put.Key, + Value: op.Put.Value, + }, + Revision: response.Revision, + } + if _, ok := prevState.KeyValues[op.Put.Key]; !ok { + e.IsCreate = true + } events = append(events, e) + default: + panic(fmt.Sprintf("unsupported operation type: %v", op)) + } + } + case LeaseRevoke: + deletedKeys := []string{} + for key := range prevState.Leases[request.LeaseRevoke.LeaseID].Keys { + if _, ok := prevState.KeyValues[key]; ok { + deletedKeys = append(deletedKeys, key) } - case PutOperation: + } + + sort.Strings(deletedKeys) + for _, key := range deletedKeys { e := PersistedEvent{ Event: Event{ - Type: op.Type, - Key: op.Put.Key, - Value: op.Put.Value, + Type: DeleteOperation, + Key: key, }, Revision: response.Revision, } - if _, ok := prevState.KeyValues[op.Put.Key]; !ok { - e.IsCreate = true - } events = append(events, e) - default: - panic(fmt.Sprintf("unsupported operation type: %v", op)) } } return events diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go index c1d8e5decbee..9ef212332b6e 100644 --- a/tests/robustness/report/wal.go +++ b/tests/robustness/report/wal.go @@ -172,7 +172,7 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) { case raftReq.LeaseRevoke != nil: return &model.EtcdRequest{ Type: model.LeaseRevoke, - LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseGrant.ID}, + LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseRevoke.ID}, }, nil case raftReq.LeaseGrant != nil: return &model.EtcdRequest{ diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 4e80c633d571..44ad21ddf994 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -115,10 +115,12 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter return default: } + shouldReturn := false + // Avoid multiple failed writes in a row if lastOperationSucceeded { choices := t.requests - if !nonUniqueWriteLimiter.Take() { + if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn { choices = filterOutNonUniqueEtcdWrites(choices) } requestType = pickRandom(choices) @@ -126,7 +128,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter requestType = Get } rev, err := client.Request(ctx, requestType, lastRev) - if requestType == Delete || requestType == LeaseRevoke { + if shouldReturn { nonUniqueWriteLimiter.Return() } lastOperationSucceeded = err == nil diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 7ba278a90c9f..4fbf8d0a73c6 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -154,15 +154,16 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids _, err = kc.OptimisticDelete(writeCtx, key, rev) nonUniqueWriteLimiter.Return() } else { + shouldReturn := false + choices := t.writeChoices - if !nonUniqueWriteLimiter.Take() { + if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn { choices = filterOutNonUniqueKubernetesWrites(t.writeChoices) } op := pickRandom(choices) switch op { case KubernetesDelete: _, err = kc.OptimisticDelete(writeCtx, key, rev) - nonUniqueWriteLimiter.Return() case KubernetesUpdate: _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev) case KubernetesCreate: @@ -170,6 +171,9 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids default: panic(fmt.Sprintf("invalid choice: %q", op)) } + if shouldReturn { + nonUniqueWriteLimiter.Return() + } } } if err != nil { diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 44185de99eb9..03e23e5bcc46 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -192,6 +192,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste } } case model.LeaseGrant: + case model.LeaseRevoke: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } @@ -216,6 +217,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati } case model.Range: case model.LeaseGrant: + case model.LeaseRevoke: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) }