Skip to content

Commit

Permalink
Merge pull request #9775 from gyuho/fix-grpc-proxy-watch
Browse files Browse the repository at this point in the history
mvcc: fix panic by allowing future revision watcher from restore operation
  • Loading branch information
gyuho committed May 31, 2018
2 parents 2b3aa7e + 0398ec7 commit 391433b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
2 changes: 0 additions & 2 deletions integration/v3_watch_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !cluster_proxy

package integration

import (
Expand Down
9 changes: 9 additions & 0 deletions mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
}

for wa := range s.synced.watchers {
wa.restore = true
s.unsynced.add(wa)
}
s.synced = newWatcherGroup()
Expand Down Expand Up @@ -500,6 +501,14 @@ type watcher struct {
// compacted is set when the watcher is removed because of compaction
compacted bool

// restore is true when the watcher is being restored from leader snapshot
// which means that this watcher has just been moved from "synced" to "unsynced"
// watcher group, possibly with a future revision when it was first added
// to the synced watcher
// "unsynced" watcher revision must always be <= current revision,
// except when the watcher were to be moved from "synced" watcher group
restore bool

// minRev is the minimum revision update the watcher will accept
minRev int64
id WatchID
Expand Down
56 changes: 56 additions & 0 deletions mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,62 @@ func TestWatchRestore(t *testing.T) {
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}

// TestWatchRestoreSyncedWatcher tests such a case that:
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
// 2. watcher with a future revision is added to "synced" watcher group
// 3. restore/overwrite storage with snapshot of a higher lasat revision
// 4. restore operation moves "synced" to "unsynced" watcher group
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, b1Path := backend.NewDefaultTmpBackend()
s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil)
defer cleanup(s1, b1, b1Path)

b2, b2Path := backend.NewDefaultTmpBackend()
s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil)
defer cleanup(s2, b2, b2Path)

testKey, testValue := []byte("foo"), []byte("bar")
rev := s1.Put(testKey, testValue, lease.NoLease)
startRev := rev + 2

// create a watcher with a future revision
// add to "synced" watcher group (startRev > s.store.currentRev)
w1 := s1.NewWatchStream()
w1.Watch(0, testKey, nil, startRev)

// make "s2" ends up with a higher last revision
s2.Put(testKey, testValue, lease.NoLease)
s2.Put(testKey, testValue, lease.NoLease)

// overwrite storage with higher revisions
if err := s1.Restore(b2); err != nil {
t.Fatal(err)
}

// wait for next "syncWatchersLoop" iteration
// and the unsynced watcher should be chosen
time.Sleep(2 * time.Second)

// trigger events for "startRev"
s1.Put(testKey, testValue, lease.NoLease)

select {
case resp := <-w1.Chan():
if resp.Revision != startRev {
t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
}
if len(resp.Events) != 1 {
t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
}
if resp.Events[0].Kv.ModRevision != startRev {
t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second")
}
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
Expand Down
10 changes: 9 additions & 1 deletion mvcc/watcher_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
minRev := int64(math.MaxInt64)
for w := range wg.watchers {
if w.minRev > curRev {
panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
// after network partition, possibly choosing future revision watcher from restore operation
// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
// do not panic when such watcher had been moved from "synced" watcher during restore operation
if !w.restore {
panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
}

// mark 'restore' done, since it's chosen
w.restore = false
}
if w.minRev < compactRev {
select {
Expand Down

0 comments on commit 391433b

Please sign in to comment.