Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: range all unsynced at once #4043

Merged
merged 2 commits into from
Dec 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 0 additions & 59 deletions storage/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,65 +189,6 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
return n, rev, nil
}

// RangeHistory ranges the history from key to end starting from startRev.
// If `end` is nil, the request only observes the events on key.
// If `end` is not nil, it observes the events on key range [key, range_end).
// Limit limits the number of events returned.
// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history.
//
// If the required start rev is compacted, ErrCompacted will be returned.
// If the required start rev has not happened, ErrFutureRev will be returned.
//
// RangeHistory returns revision bytes slice and key-values that satisfy the requirement (0 <= n <= limit).
// If history in the revision range has not all happened, it returns immeidately
// what is available.
// It also returns nextRev which indicates the start revision used for the following
// RangeEvents call. The nextRev could be smaller than the given endRev if the store
// has not progressed so far or it hits the event limit.
//
// TODO: return byte slices instead of keyValues to avoid meaningless encode and decode.
// This also helps to return raw (key, val) pair directly to make API consistent.
func (s *store) RangeHistory(key, end []byte, limit, startRev int64) (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) {
s.mu.Lock()
defer s.mu.Unlock()

if startRev > 0 && startRev <= s.compactMainRev {
return nil, nil, 0, ErrCompacted
}
if startRev > s.currentRev.main {
return nil, nil, 0, ErrFutureRev
}

revs := s.kvindex.RangeSince(key, end, startRev)
if len(revs) == 0 {
return nil, nil, s.currentRev.main + 1, nil
}

tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
// fetch events from the backend using revisions
for _, rev := range revs {
start, end := revBytesRange(rev)

ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0)
if len(vs) != 1 {
log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub)
}

var kv storagepb.KeyValue
if err := kv.Unmarshal(vs[0]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err)
}
revbs = append(revbs, ks[0])
kvs = append(kvs, kv)
if limit > 0 && len(kvs) >= int(limit) {
return revbs, kvs, rev.main + 1, nil
}
}
return revbs, kvs, s.currentRev.main + 1, nil
}

func (s *store) Compact(rev int64) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
263 changes: 0 additions & 263 deletions storage/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,73 +263,6 @@ func TestStoreDeleteRange(t *testing.T) {
}
}

func TestStoreRangeHistory(t *testing.T) {
key := newTestKeyBytes(revision{2, 0}, false)
kv := storagepb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 1,
ModRevision: 2,
Version: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}
currev := revision{2, 0}

tests := []struct {
idxr indexRangeEventsResp
r rangeResp
}{
{
indexRangeEventsResp{[]revision{{2, 0}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
},
{
indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
},
}
for i, tt := range tests {
s := newFakeStore()
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)

s.currentRev = currev
fi.indexRangeEventsRespc <- tt.idxr
b.tx.rangeRespc <- tt.r

keys, kvs, _, err := s.RangeHistory([]byte("foo"), []byte("goo"), 1, 1)
if err != nil {
t.Errorf("#%d: err = %v, want nil", i, err)
}
if w := [][]byte{key}; !reflect.DeepEqual(keys, w) {
t.Errorf("#%d: keys = %+v, want %+v", i, keys, w)
}
if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w)
}

wact := []testutil.Action{
{"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
wstart, wend := revBytesRange(tt.idxr.revs[0])
wact = []testutil.Action{
{"range", []interface{}{keyBucketName, wstart, wend, int64(0)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
if s.currentRev != currev {
t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
}
}
}

func TestStoreCompact(t *testing.T) {
s := newFakeStore()
b := s.b.(*fakeBackend)
Expand Down Expand Up @@ -421,202 +354,6 @@ func TestStoreRestore(t *testing.T) {
}
}

// tests end parameter works well
func TestStoreRangeHistoryEnd(t *testing.T) {
s := newStore(tmpPath)
defer cleanup(s, tmpPath)

s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
keys := [][]byte{
newTestKeyBytes(revision{1, 0}, false),
newTestKeyBytes(revision{2, 0}, false),
newTestKeyBytes(revision{3, 0}, false),
}
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
}

tests := []struct {
key, end []byte
wkeys [][]byte
wkvs []storagepb.KeyValue
}{
// get no keys
{
[]byte("doo"), []byte("foo"),
nil, nil,
},
// get no keys when key == end
{
[]byte("foo"), []byte("foo"),
nil, nil,
},
// get no keys when ranging single key
{
[]byte("doo"), nil,
nil, nil,
},
// get all keys
{
[]byte("foo"), []byte("foo3"),
keys, kvs,
},
// get partial keys
{
[]byte("foo"), []byte("foo1"),
keys[:1], kvs[:1],
},
// get single key
{
[]byte("foo"), nil,
keys[:1], kvs[:1],
},
}

for i, tt := range tests {
keys, kvs, rev, err := s.RangeHistory(tt.key, tt.end, 0, 1)
if err != nil {
t.Fatal(err)
}
if rev != 4 {
t.Errorf("#%d: rev = %d, want %d", i, rev, 4)
}
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: actions = %+v, want %+v", i, keys, tt.wkeys)
}
if !reflect.DeepEqual(kvs, tt.wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
}
}
}

func TestStoreRangeHistoryRev(t *testing.T) {
s := newStore(tmpPath)
defer cleanup(s, tmpPath)

s.Put([]byte("foo"), []byte("bar"))
s.DeleteRange([]byte("foo"), nil)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("unrelated"), []byte("unrelated"))
keys := [][]byte{
newTestKeyBytes(revision{1, 0}, false),
newTestKeyBytes(revision{2, 0}, true),
newTestKeyBytes(revision{3, 0}, false),
}
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo")},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
}

tests := []struct {
start int64

wkeys [][]byte
wkvs []storagepb.KeyValue
wnext int64
}{
{0, keys, kvs, 5},
{1, keys, kvs, 5},
{3, keys[2:], kvs[2:], 5},
}

for i, tt := range tests {
keys, kvs, next, err := s.RangeHistory([]byte("foo"), nil, 0, tt.start)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys)
}
if !reflect.DeepEqual(kvs, tt.wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
}
if next != tt.wnext {
t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext)
}
}
}

func TestStoreRangeHistoryBad(t *testing.T) {
s := newStore(tmpPath)
defer cleanup(s, tmpPath)

s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo"), []byte("bar1"))
s.Put([]byte("foo"), []byte("bar2"))
if err := s.Compact(3); err != nil {
t.Fatalf("compact error (%v)", err)
}

tests := []struct {
rev int64
werr error
}{
{1, ErrCompacted},
{2, ErrCompacted},
{3, ErrCompacted},
{4, ErrFutureRev},
{10, ErrFutureRev},
}
for i, tt := range tests {
_, _, _, err := s.RangeHistory([]byte("foo"), nil, 0, tt.rev)
if err != tt.werr {
t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
}
}
}

func TestStoreRangeHistoryLimit(t *testing.T) {
s := newStore(tmpPath)
defer cleanup(s, tmpPath)

s.Put([]byte("foo"), []byte("bar"))
s.DeleteRange([]byte("foo"), nil)
s.Put([]byte("foo"), []byte("bar"))
keys := [][]byte{
newTestKeyBytes(revision{1, 0}, false),
newTestKeyBytes(revision{2, 0}, true),
newTestKeyBytes(revision{3, 0}, false),
}
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo")},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
}

tests := []struct {
limit int64
wkeys [][]byte
wkvs []storagepb.KeyValue
}{
// no limit
{-1, keys, kvs},
// no limit
{0, keys, kvs},
{1, keys[:1], kvs[:1]},
{2, keys[:2], kvs[:2]},
{3, keys, kvs},
{100, keys, kvs},
}
for i, tt := range tests {
keys, kvs, _, err := s.RangeHistory([]byte("foo"), nil, tt.limit, 1)
if err != nil {
t.Fatalf("#%d: range error (%v)", i, err)
}
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys)
}
if !reflect.DeepEqual(kvs, tt.wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
}
}
}

func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
s0 := newStore(tmpPath)
defer os.Remove(tmpPath)
Expand Down
Loading