diff --git a/storage/kvstore.go b/storage/kvstore.go index 6dc11a4cc2e..4761b938ec5 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -189,65 +189,6 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } -// RangeHistory ranges the history from key to end starting from startRev. -// If `end` is nil, the request only observes the events on key. -// If `end` is not nil, it observes the events on key range [key, range_end). -// Limit limits the number of events returned. -// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history. -// -// If the required start rev is compacted, ErrCompacted will be returned. -// If the required start rev has not happened, ErrFutureRev will be returned. -// -// RangeHistory returns revision bytes slice and key-values that satisfy the requirement (0 <= n <= limit). -// If history in the revision range has not all happened, it returns immeidately -// what is available. -// It also returns nextRev which indicates the start revision used for the following -// RangeEvents call. The nextRev could be smaller than the given endRev if the store -// has not progressed so far or it hits the event limit. -// -// TODO: return byte slices instead of keyValues to avoid meaningless encode and decode. -// This also helps to return raw (key, val) pair directly to make API consistent. -func (s *store) RangeHistory(key, end []byte, limit, startRev int64) (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) { - s.mu.Lock() - defer s.mu.Unlock() - - if startRev > 0 && startRev <= s.compactMainRev { - return nil, nil, 0, ErrCompacted - } - if startRev > s.currentRev.main { - return nil, nil, 0, ErrFutureRev - } - - revs := s.kvindex.RangeSince(key, end, startRev) - if len(revs) == 0 { - return nil, nil, s.currentRev.main + 1, nil - } - - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - // fetch events from the backend using revisions - for _, rev := range revs { - start, end := revBytesRange(rev) - - ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0) - if len(vs) != 1 { - log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub) - } - - var kv storagepb.KeyValue - if err := kv.Unmarshal(vs[0]); err != nil { - log.Fatalf("storage: cannot unmarshal event: %v", err) - } - revbs = append(revbs, ks[0]) - kvs = append(kvs, kv) - if limit > 0 && len(kvs) >= int(limit) { - return revbs, kvs, rev.main + 1, nil - } - } - return revbs, kvs, s.currentRev.main + 1, nil -} - func (s *store) Compact(rev int64) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index c179147fce4..ccc0376ddd0 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -263,73 +263,6 @@ func TestStoreDeleteRange(t *testing.T) { } } -func TestStoreRangeHistory(t *testing.T) { - key := newTestKeyBytes(revision{2, 0}, false) - kv := storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 1, - ModRevision: 2, - Version: 1, - } - kvb, err := kv.Marshal() - if err != nil { - t.Fatal(err) - } - currev := revision{2, 0} - - tests := []struct { - idxr indexRangeEventsResp - r rangeResp - }{ - { - indexRangeEventsResp{[]revision{{2, 0}}}, - rangeResp{[][]byte{key}, [][]byte{kvb}}, - }, - { - indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}}, - rangeResp{[][]byte{key}, [][]byte{kvb}}, - }, - } - for i, tt := range tests { - s := newFakeStore() - b := s.b.(*fakeBackend) - fi := s.kvindex.(*fakeIndex) - - s.currentRev = currev - fi.indexRangeEventsRespc <- tt.idxr - b.tx.rangeRespc <- tt.r - - keys, kvs, _, err := s.RangeHistory([]byte("foo"), []byte("goo"), 1, 1) - if err != nil { - t.Errorf("#%d: err = %v, want nil", i, err) - } - if w := [][]byte{key}; !reflect.DeepEqual(keys, w) { - t.Errorf("#%d: keys = %+v, want %+v", i, keys, w) - } - if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) - } - - wact := []testutil.Action{ - {"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}}, - } - if g := fi.Action(); !reflect.DeepEqual(g, wact) { - t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) - } - wstart, wend := revBytesRange(tt.idxr.revs[0]) - wact = []testutil.Action{ - {"range", []interface{}{keyBucketName, wstart, wend, int64(0)}}, - } - if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { - t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) - } - if s.currentRev != currev { - t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev) - } - } -} - func TestStoreCompact(t *testing.T) { s := newFakeStore() b := s.b.(*fakeBackend) @@ -421,202 +354,6 @@ func TestStoreRestore(t *testing.T) { } } -// tests end parameter works well -func TestStoreRangeHistoryEnd(t *testing.T) { - s := newStore(tmpPath) - defer cleanup(s, tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) - keys := [][]byte{ - newTestKeyBytes(revision{1, 0}, false), - newTestKeyBytes(revision{2, 0}, false), - newTestKeyBytes(revision{3, 0}, false), - } - kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, - {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, - } - - tests := []struct { - key, end []byte - wkeys [][]byte - wkvs []storagepb.KeyValue - }{ - // get no keys - { - []byte("doo"), []byte("foo"), - nil, nil, - }, - // get no keys when key == end - { - []byte("foo"), []byte("foo"), - nil, nil, - }, - // get no keys when ranging single key - { - []byte("doo"), nil, - nil, nil, - }, - // get all keys - { - []byte("foo"), []byte("foo3"), - keys, kvs, - }, - // get partial keys - { - []byte("foo"), []byte("foo1"), - keys[:1], kvs[:1], - }, - // get single key - { - []byte("foo"), nil, - keys[:1], kvs[:1], - }, - } - - for i, tt := range tests { - keys, kvs, rev, err := s.RangeHistory(tt.key, tt.end, 0, 1) - if err != nil { - t.Fatal(err) - } - if rev != 4 { - t.Errorf("#%d: rev = %d, want %d", i, rev, 4) - } - if !reflect.DeepEqual(keys, tt.wkeys) { - t.Errorf("#%d: actions = %+v, want %+v", i, keys, tt.wkeys) - } - if !reflect.DeepEqual(kvs, tt.wkvs) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) - } - } -} - -func TestStoreRangeHistoryRev(t *testing.T) { - s := newStore(tmpPath) - defer cleanup(s, tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.DeleteRange([]byte("foo"), nil) - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("unrelated"), []byte("unrelated")) - keys := [][]byte{ - newTestKeyBytes(revision{1, 0}, false), - newTestKeyBytes(revision{2, 0}, true), - newTestKeyBytes(revision{3, 0}, false), - } - kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo")}, - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, - } - - tests := []struct { - start int64 - - wkeys [][]byte - wkvs []storagepb.KeyValue - wnext int64 - }{ - {0, keys, kvs, 5}, - {1, keys, kvs, 5}, - {3, keys[2:], kvs[2:], 5}, - } - - for i, tt := range tests { - keys, kvs, next, err := s.RangeHistory([]byte("foo"), nil, 0, tt.start) - if err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(keys, tt.wkeys) { - t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys) - } - if !reflect.DeepEqual(kvs, tt.wkvs) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) - } - if next != tt.wnext { - t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext) - } - } -} - -func TestStoreRangeHistoryBad(t *testing.T) { - s := newStore(tmpPath) - defer cleanup(s, tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo"), []byte("bar1")) - s.Put([]byte("foo"), []byte("bar2")) - if err := s.Compact(3); err != nil { - t.Fatalf("compact error (%v)", err) - } - - tests := []struct { - rev int64 - werr error - }{ - {1, ErrCompacted}, - {2, ErrCompacted}, - {3, ErrCompacted}, - {4, ErrFutureRev}, - {10, ErrFutureRev}, - } - for i, tt := range tests { - _, _, _, err := s.RangeHistory([]byte("foo"), nil, 0, tt.rev) - if err != tt.werr { - t.Errorf("#%d: error = %v, want %v", i, err, tt.werr) - } - } -} - -func TestStoreRangeHistoryLimit(t *testing.T) { - s := newStore(tmpPath) - defer cleanup(s, tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.DeleteRange([]byte("foo"), nil) - s.Put([]byte("foo"), []byte("bar")) - keys := [][]byte{ - newTestKeyBytes(revision{1, 0}, false), - newTestKeyBytes(revision{2, 0}, true), - newTestKeyBytes(revision{3, 0}, false), - } - kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo")}, - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, - } - - tests := []struct { - limit int64 - wkeys [][]byte - wkvs []storagepb.KeyValue - }{ - // no limit - {-1, keys, kvs}, - // no limit - {0, keys, kvs}, - {1, keys[:1], kvs[:1]}, - {2, keys[:2], kvs[:2]}, - {3, keys, kvs}, - {100, keys, kvs}, - } - for i, tt := range tests { - keys, kvs, _, err := s.RangeHistory([]byte("foo"), nil, tt.limit, 1) - if err != nil { - t.Fatalf("#%d: range error (%v)", i, err) - } - if !reflect.DeepEqual(keys, tt.wkeys) { - t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys) - } - if !reflect.DeepEqual(kvs, tt.wkvs) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) - } - } -} - func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0 := newStore(tmpPath) defer os.Remove(tmpPath) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index ebad2a64de9..1485b84685a 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -17,6 +17,7 @@ package storage import ( "fmt" "log" + "math" "sync" "time" @@ -241,57 +242,105 @@ func (s *watchableStore) syncWatchingsLoop() { } } -// syncWatchings syncs the watchings in the unsyncd map. +// syncWatchings periodically syncs unsynced watchings by: Iterate all unsynced +// watchings to get the minimum revision within its range, skipping the +// watching if its current revision is behind the compact revision of the +// store. And use this minimum revision to get all key-value pairs. Then send +// those events to watchings. func (s *watchableStore) syncWatchings() { - _, curRev, _ := s.store.Range(nil, nil, 0, 0) + s.store.mu.Lock() + defer s.store.mu.Unlock() + + if len(s.unsynced) == 0 { + return + } + + // in order to find key-value pairs from unsynced watchings, we need to + // find min revision index, and these revisions can be used to + // query the backend store of key-value pairs + minRev := int64(math.MaxInt64) + + curRev := s.store.currentRev.main + compactionRev := s.store.compactMainRev + + // TODO: change unsynced struct type same to this + keyToUnsynced := make(map[string]map[*watching]struct{}) + for w := range s.unsynced { - var end []byte - if w.prefix { - end = make([]byte, len(w.key)) - copy(end, w.key) - end[len(w.key)-1]++ + k := string(w.key) + + if w.cur > curRev { + panic("watching current revision should not exceed current revision") } - limit := cap(w.ch) - len(w.ch) - // the channel is full, try it in the next round - if limit == 0 { + + if w.cur < compactionRev { + // TODO: return error compacted to that watching instead of + // just removing it sliently from unsynced. + delete(s.unsynced, w) continue } - revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur) - if err != nil { - // TODO: send error event to watching - delete(s.unsynced, w) + + if minRev >= w.cur { + minRev = w.cur + } + + if _, ok := keyToUnsynced[k]; !ok { + keyToUnsynced[k] = make(map[*watching]struct{}) + } + keyToUnsynced[k][w] = struct{}{} + } + + minBytes, maxBytes := newRevBytes(), newRevBytes() + revToBytes(revision{main: minRev}, minBytes) + revToBytes(revision{main: curRev + 1}, maxBytes) + + // UnsafeRange returns keys and values. And in boltdb, keys are revisions. + // values are actual key-value pairs in backend. + tx := s.store.b.BatchTx() + tx.Lock() + ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) + tx.Unlock() + + for i, v := range vs { + var kv storagepb.KeyValue + if err := kv.Unmarshal(v); err != nil { + log.Panicf("storage: cannot unmarshal event: %v", err) + } + + k := string(kv.Key) + wm, ok := keyToUnsynced[k] + if !ok { continue } - // push events to the channel - for i, kv := range kvs { - var evt storagepb.Event_EventType - switch { - case isTombstone(revbs[i]): - evt = storagepb.DELETE - default: - evt = storagepb.PUT - } + var ev storagepb.Event + switch { + case isTombstone(ks[i]): + ev.Type = storagepb.DELETE + default: + ev.Type = storagepb.PUT + } + ev.Kv = &kv + + for w := range wm { + ev.WatchID = w.id - w.ch <- storagepb.Event{ - Type: evt, - Kv: &kv, - WatchID: w.id, + select { + case w.ch <- ev: + pendingEventsGauge.Inc() + default: + // TODO: handle the full unsynced watchings. + // continue to process other watchings for now, the full ones + // will be processed next time and hopefully it will not be full. + continue } - pendingEventsGauge.Inc() - } - // switch to tracking future events if needed - if nextRev > curRev { - k := string(w.key) if err := unsafeAddWatching(&s.synced, k, w); err != nil { log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) } delete(s.unsynced, w) - continue } - // put it back to try it in the next round - w.cur = nextRev } + slowWatchingGauge.Set(float64(len(s.unsynced))) }