Skip to content

Commit

Permalink
tests/robustness: unlock Delete/LeaseRevoke ops
Browse files Browse the repository at this point in the history
We should return token to that bucket if `nonUniqueWriteLimiter.Take()`
return true. After unlock Delete/LeaseRevoke ops, the model should be
updated for replay function. There are two updates for `toWatchEvents`.

1. When leaveRevokes op has deleted few keys, we should generate
   `delete-operation` events based on alphabetical order of deleted
   keys.
2. When putWithLease op hits non-exist lease, we should ignore that
   update event.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid committed Jun 6, 2024
1 parent 6216cbf commit 3044c48
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 39 deletions.
2 changes: 2 additions & 0 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package robustness

import (
"context"
"math/rand"
"testing"
"time"

Expand All @@ -36,6 +37,7 @@ import (
var testRunner = framework.E2eTestRunner

func TestMain(m *testing.M) {
rand.Seed(time.Now().UnixNano())
testRunner.TestMain(m)
}

Expand Down
19 changes: 13 additions & 6 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"hash/fnv"
"maps"
"reflect"
"sort"

Expand Down Expand Up @@ -75,6 +76,15 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd
return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState
}

func (s EtcdState) clone() EtcdState {
newState := EtcdState{Revision: s.Revision}

newState.KeyValues = maps.Clone(s.KeyValues)
newState.KeyLeases = maps.Clone(s.KeyLeases)
newState.Leases = maps.Clone(s.Leases)
return newState
}

func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
Expand All @@ -85,12 +95,9 @@ func freshEtcdState() EtcdState {
}

// Step handles a successful request, returning updated state and response it would generate.
func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
newKVs := map[string]ValueRevision{}
for k, v := range s.KeyValues {
newKVs[k] = v
}
s.KeyValues = newKVs
func (prevState EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
s := prevState.clone()

switch request.Type {
case Range:
if request.Range.Revision == 0 || request.Range.Revision == s.Revision {
Expand Down
85 changes: 57 additions & 28 deletions tests/robustness/model/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package model

import (
"fmt"
"sort"
"strings"
)

Expand Down Expand Up @@ -63,44 +64,72 @@ func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent
}

func toWatchEvents(prevState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) {
if request.Type != Txn || response.Error != "" {
if response.Error != "" {
return events
}
var ops []EtcdOperation
if response.Txn.Failure {
ops = request.Txn.OperationsOnFailure
} else {
ops = request.Txn.OperationsOnSuccess
}
for _, op := range ops {
switch op.Type {
case RangeOperation:
case DeleteOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Delete.Key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Delete.Key]; ok {

switch request.Type {
case Txn:
var ops []EtcdOperation
if response.Txn.Failure {
ops = request.Txn.OperationsOnFailure
} else {
ops = request.Txn.OperationsOnSuccess
}
for _, op := range ops {
switch op.Type {
case RangeOperation:
case DeleteOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Delete.Key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Delete.Key]; ok {
events = append(events, e)
}
case PutOperation:
_, leaseExists := prevState.Leases[op.Put.LeaseID]
if op.Put.LeaseID != 0 && !leaseExists {
break
}

e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Put.Key,
Value: op.Put.Value,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Put.Key]; !ok {
e.IsCreate = true
}
events = append(events, e)
default:
panic(fmt.Sprintf("unsupported operation type: %v", op))
}
}
case LeaseRevoke:
deletedKeys := []string{}
for key := range prevState.Leases[request.LeaseRevoke.LeaseID].Keys {
if _, ok := prevState.KeyValues[key]; ok {
deletedKeys = append(deletedKeys, key)
}
case PutOperation:
}

sort.Strings(deletedKeys)
for _, key := range deletedKeys {
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Put.Key,
Value: op.Put.Value,
Type: DeleteOperation,
Key: key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Put.Key]; !ok {
e.IsCreate = true
}
events = append(events, e)
default:
panic(fmt.Sprintf("unsupported operation type: %v", op))
}
}
return events
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/report/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
case raftReq.LeaseRevoke != nil:
return &model.EtcdRequest{
Type: model.LeaseRevoke,
LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseGrant.ID},
LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseRevoke.ID},
}, nil
case raftReq.LeaseGrant != nil:
return &model.EtcdRequest{
Expand Down
6 changes: 4 additions & 2 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,20 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter
return
default:
}
shouldReturn := false

// Avoid multiple failed writes in a row
if lastOperationSucceeded {
choices := t.requests
if !nonUniqueWriteLimiter.Take() {
if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
choices = filterOutNonUniqueEtcdWrites(choices)
}
requestType = pickRandom(choices)
} else {
requestType = Get
}
rev, err := client.Request(ctx, requestType, lastRev)
if requestType == Delete || requestType == LeaseRevoke {
if shouldReturn {
nonUniqueWriteLimiter.Return()
}
lastOperationSucceeded = err == nil
Expand Down
8 changes: 6 additions & 2 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,26 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
} else {
shouldReturn := false

choices := t.writeChoices
if !nonUniqueWriteLimiter.Take() {
if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
choices = filterOutNonUniqueKubernetesWrites(t.writeChoices)
}
op := pickRandom(choices)
switch op {
case KubernetesDelete:
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
case KubernetesUpdate:
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
if shouldReturn {
nonUniqueWriteLimiter.Return()
}
}
}
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
}
}
case model.LeaseGrant:
case model.LeaseRevoke:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand All @@ -216,6 +217,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
}
case model.Range:
case model.LeaseGrant:
case model.LeaseRevoke:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand Down

0 comments on commit 3044c48

Please sign in to comment.