From 86d1d3e8dc7b13e616844faf32140d332e6856c7 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 24 May 2018 14:33:49 -0700 Subject: [PATCH 1/2] integration: enable TestV3WatchRestoreSnapshotUnsync for gRPC proxy Signed-off-by: Gyuho Lee --- integration/v3_watch_restore_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration/v3_watch_restore_test.go b/integration/v3_watch_restore_test.go index 4a39280a9a3..2b07a716117 100644 --- a/integration/v3_watch_restore_test.go +++ b/integration/v3_watch_restore_test.go @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !cluster_proxy - package integration import ( From 0398ec7dcb48397aef5d566529d40953cfdc2d19 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 25 May 2018 10:04:54 -0700 Subject: [PATCH 2/2] mvcc: fix panic by allowing future revision watcher from restore operation This also happens without gRPC proxy. Fix panic when gRPC proxy leader watcher is restored: ``` go test -v -tags cluster_proxy -cpu 4 -race -run TestV3WatchRestoreSnapshotUnsync === RUN TestV3WatchRestoreSnapshotUnsync panic: watcher minimum revision 9223372036854775805 should not exceed current revision 16 goroutine 156 [running]: github.com/coreos/etcd/mvcc.(*watcherGroup).chooseAll(0xc4202b8720, 0x10, 0xffffffffffffffff, 0x1) /home/gyuho/go/src/github.com/coreos/etcd/mvcc/watcher_group.go:242 +0x3b5 github.com/coreos/etcd/mvcc.(*watcherGroup).choose(0xc4202b8720, 0x200, 0x10, 0xffffffffffffffff, 0xc420253378, 0xc420253378) /home/gyuho/go/src/github.com/coreos/etcd/mvcc/watcher_group.go:225 +0x289 github.com/coreos/etcd/mvcc.(*watchableStore).syncWatchers(0xc4202b86e0, 0x0) /home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:340 +0x237 github.com/coreos/etcd/mvcc.(*watchableStore).syncWatchersLoop(0xc4202b86e0) /home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:214 +0x280 created by github.com/coreos/etcd/mvcc.newWatchableStore /home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:90 +0x477 exit status 2 FAIL github.com/coreos/etcd/integration 2.551s ``` gRPC proxy spawns a watcher with a key "proxy-namespace__lostleader" and watch revision "int64(math.MaxInt64 - 2)" to detect leader loss. But, when the partitioned node restores, this watcher triggers panic with "watcher minimum revision ... should not exceed current ...". This check was added a long time ago, by my PR, when there was no gRPC proxy: https://github.com/coreos/etcd/pull/4043#discussion_r48457145 > we can remove this checking actually. it is impossible for a unsynced watching to have a future rev. or we should just panic here. However, now it's possible that a unsynced watcher has a future revision, when it was moved from a synced watcher group through restore operation. This PR adds "restore" flag to indicate that a watcher was moved from the synced watcher group with restore operation. Otherwise, the watcher with future revision in an unsynced watcher group would still panic. Example logs with future revision watcher from restore operation: ``` {"level":"info","ts":1527196358.9057755,"caller":"mvcc/watcher_group.go:261","msg":"choosing future revision watcher from restore operation","watch-key":"proxy-namespace__lostleader","watch-revision":9223372036854775805,"current-revision":16} {"level":"info","ts":1527196358.910349,"caller":"mvcc/watcher_group.go:261","msg":"choosing future revision watcher from restore operation","watch-key":"proxy-namespace__lostleader","watch-revision":9223372036854775805,"current-revision":16} ``` Signed-off-by: Gyuho Lee --- mvcc/watchable_store.go | 9 ++++++ mvcc/watchable_store_test.go | 56 ++++++++++++++++++++++++++++++++++++ mvcc/watcher_group.go | 10 ++++++- 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index edbb7f9d0cc..f04fe994320 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -193,6 +193,7 @@ func (s *watchableStore) Restore(b backend.Backend) error { } for wa := range s.synced.watchers { + wa.restore = true s.unsynced.add(wa) } s.synced = newWatcherGroup() @@ -500,6 +501,14 @@ type watcher struct { // compacted is set when the watcher is removed because of compaction compacted bool + // restore is true when the watcher is being restored from leader snapshot + // which means that this watcher has just been moved from "synced" to "unsynced" + // watcher group, possibly with a future revision when it was first added + // to the synced watcher + // "unsynced" watcher revision must always be <= current revision, + // except when the watcher were to be moved from "synced" watcher group + restore bool + // minRev is the minimum revision update the watcher will accept minRev int64 id WatchID diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index ec784e85ae6..a929e995c25 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -339,6 +339,62 @@ func TestWatchRestore(t *testing.T) { t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration } +// TestWatchRestoreSyncedWatcher tests such a case that: +// 1. watcher is created with a future revision "math.MaxInt64 - 2" +// 2. watcher with a future revision is added to "synced" watcher group +// 3. restore/overwrite storage with snapshot of a higher lasat revision +// 4. restore operation moves "synced" to "unsynced" watcher group +// 5. choose the watcher from step 1, without panic +func TestWatchRestoreSyncedWatcher(t *testing.T) { + b1, b1Path := backend.NewDefaultTmpBackend() + s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil) + defer cleanup(s1, b1, b1Path) + + b2, b2Path := backend.NewDefaultTmpBackend() + s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil) + defer cleanup(s2, b2, b2Path) + + testKey, testValue := []byte("foo"), []byte("bar") + rev := s1.Put(testKey, testValue, lease.NoLease) + startRev := rev + 2 + + // create a watcher with a future revision + // add to "synced" watcher group (startRev > s.store.currentRev) + w1 := s1.NewWatchStream() + w1.Watch(0, testKey, nil, startRev) + + // make "s2" ends up with a higher last revision + s2.Put(testKey, testValue, lease.NoLease) + s2.Put(testKey, testValue, lease.NoLease) + + // overwrite storage with higher revisions + if err := s1.Restore(b2); err != nil { + t.Fatal(err) + } + + // wait for next "syncWatchersLoop" iteration + // and the unsynced watcher should be chosen + time.Sleep(2 * time.Second) + + // trigger events for "startRev" + s1.Put(testKey, testValue, lease.NoLease) + + select { + case resp := <-w1.Chan(): + if resp.Revision != startRev { + t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision) + } + if len(resp.Events) != 1 { + t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events)) + } + if resp.Events[0].Kv.ModRevision != startRev { + t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision) + } + case <-time.After(time.Second): + t.Fatal("failed to receive event in 1 second") + } +} + // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() diff --git a/mvcc/watcher_group.go b/mvcc/watcher_group.go index f7835203a59..b65c7bc5eb7 100644 --- a/mvcc/watcher_group.go +++ b/mvcc/watcher_group.go @@ -239,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 { minRev := int64(math.MaxInt64) for w := range wg.watchers { if w.minRev > curRev { - panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev)) + // after network partition, possibly choosing future revision watcher from restore operation + // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2" + // do not panic when such watcher had been moved from "synced" watcher during restore operation + if !w.restore { + panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev)) + } + + // mark 'restore' done, since it's chosen + w.restore = false } if w.minRev < compactRev { select {