use kv interface (WIP)
gyuho committed Dec 23, 2015
1 parent 28e2832 commit 1206087
Showing 2 changed files with 78 additions and 103 deletions.
120 changes: 47 additions & 73 deletions storage/watchable_store.go
@@ -242,6 +242,53 @@ func (s *watchableStore) syncWatchingsLoop() {
}
}

// RangeAllUnsynced ranges on all unsynced watchings.
func (s *watchableStore) RangeAllUnsynced() []storagepb.KeyValue {
	totalLimit := 0
	minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64)
	for w := range s.unsynced {
		if w.cur > 0 && w.cur <= s.store.compactMainRev {
			log.Printf("storage: %v", ErrCompacted)
			delete(s.unsynced, w)
			continue
		}
		if w.cur > s.store.currentRev.main {
			log.Printf("storage: %v", ErrFutureRev)
			delete(s.unsynced, w)
			continue
		}
		totalLimit += cap(w.ch) - len(w.ch)
		if minRev >= w.cur {
			minRev = w.cur
		}
		if maxRev <= w.cur {
			maxRev = w.cur
		}
	}

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: minRev}, min)
	revToBytes(revision{main: maxRev}, max)

	s.store.mu.Lock()
	defer s.store.mu.Unlock()

	tx := s.store.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(keyBucketName, min, max, int64(totalLimit))
	kvs := []storagepb.KeyValue{}
	for _, vi := range vs {
		var kv storagepb.KeyValue
		if err := kv.Unmarshal(vi); err != nil {
			log.Fatalf("storage: cannot unmarshal event: %v", err)
		}
		kvs = append(kvs, kv)
	}

	return kvs
}
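
The loop above collects the smallest and largest pending revision by starting from the int64 sentinels and tightening them per watching, and sizes the range limit from each watching's free channel capacity. A standalone sketch of the same min/max accumulation pattern, separate from the commit (the helper name and inputs are illustrative only):

package main

import (
	"fmt"
	"math"
)

// minMaxRev mirrors the accumulation used by RangeAllUnsynced: start at the
// int64 extremes and tighten per element. If the input is empty the sentinels
// come back unchanged, so a caller may want to special-case an empty
// unsynced map before building range bounds from the result.
func minMaxRev(revs []int64) (int64, int64) {
	minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64)
	for _, cur := range revs {
		if cur < minRev {
			minRev = cur
		}
		if cur > maxRev {
			maxRev = cur
		}
	}
	return minRev, maxRev
}

func main() {
	fmt.Println(minMaxRev([]int64{3, 9, 1})) // 1 9
}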

// syncWatchings syncs the watchings in the unsynced map.
func (s *watchableStore) syncWatchings() {
	_, curRev, _ := s.store.Range(nil, nil, 0, 0)
@@ -296,79 +343,6 @@ func (s *watchableStore) syncWatchings() {
	slowWatchingGauge.Set(float64(len(s.unsynced)))
}

// RangeAllUnsynced ranges on all unsynced watchings.
func (s *watchableStore) RangeAllUnsynced() {
	totalLimit := 0
	minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64)
	for w := range s.unsynced {
		if w.cur > 0 && w.cur <= s.store.compactMainRev {
			log.Printf("storage: %v", ErrCompacted)
			delete(s.unsynced, w)
			continue
		}
		if w.cur > s.store.currentRev.main {
			log.Printf("storage: %v", ErrFutureRev)
			delete(s.unsynced, w)
			continue
		}
		totalLimit += cap(w.ch) - len(w.ch)
		if minRev >= w.cur {
			minRev = w.cur
		}
		if maxRev <= w.cur {
			maxRev = w.cur
		}
	}

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: minRev}, min)
	revToBytes(revision{main: maxRev}, max)

	s.store.mu.Lock()
	defer s.store.mu.Unlock()

	tx := s.store.b.BatchTx()
	tx.Lock()

	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.store.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
		log.Printf("storage: restore compact to %d", s.store.compactMainRev)
	}

	keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(totalLimit))
	for i, key := range keys {
		var kv storagepb.KeyValue
		if err := kv.Unmarshal(vals[i]); err != nil {
			log.Fatalf("storage: cannot unmarshal event: %v", err)
		}

		rev := bytesToRev(key[:revBytesLen])

		// restore index
		switch {
		case isTombstone(key):
			s.store.kvindex.Tombstone(kv.Key, rev)
		default:
			s.store.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
		}

		// update revision
		s.store.currentRev = rev
	}

	tx.Unlock()

	fmt.Println("keys:", keys)
	fmt.Println("vals:", vals)
	/*
		min: foo1
		max: foo9
		keys: []
		vals: []
	*/
}

// handle handles the change of the happening event on all watchings.
func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
	s.notify(rev, ev)
61 changes: 31 additions & 30 deletions storage/watchable_store_test.go
@@ -115,6 +115,37 @@ func TestCancelUnsynced(t *testing.T) {
}
}

// TestRangeAllUnsynced ... WIP
func TestRangeAllUnsynced(t *testing.T) {
	s := &watchableStore{
		store:    newStore(tmpPath),
		unsynced: make(map[*watching]struct{}),
		synced:   make(map[string]map[*watching]struct{}),
	}

	defer func() {
		s.store.Close()
		os.Remove(tmpPath)
	}()

	watcherN := 10

	for i := 1; i < watcherN; i++ {
		key := []byte(fmt.Sprintf("foo%d", i))
		val := []byte(fmt.Sprintf("bar%d", i))

		s.Put(key, val)

		w := s.NewWatcher()

		// use non-0 to keep watchers in unsynced
		w.Watch(key, false, int64(i))
	}

	rs := s.RangeAllUnsynced()
	fmt.Println(len(rs))
}
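
Once the WIP range bounds are settled, the fmt.Println at the end of the test could become an assertion. A minimal sketch of such a check, meant to sit inside the test body; the expected count of watcherN-1 (one result per key put in the loop) is an assumption, not something this commit establishes:

	// Hypothetical assertion to replace the fmt.Println above; assumes the
	// range returned by RangeAllUnsynced covers every key written in the loop.
	if len(rs) != watcherN-1 {
		t.Errorf("len(rs) = %d, want %d", len(rs), watcherN-1)
	}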

// TestSyncWatchings populates unsynced watching map and
// tests syncWatchings method to see if it correctly sends
// events to channel of unsynced watchings and moves these
@@ -186,36 +217,6 @@ func TestSyncWatchings(t *testing.T) {
}
}

// TestRangeAllUnsynced ...
func TestRangeAllUnsynced(t *testing.T) {
	s := &watchableStore{
		store:    newStore(tmpPath),
		unsynced: make(map[*watching]struct{}),
		synced:   make(map[string]map[*watching]struct{}),
	}

	defer func() {
		s.store.Close()
		os.Remove(tmpPath)
	}()

	watcherN := 10

	for i := 1; i < watcherN; i++ {
		key := []byte(fmt.Sprintf("foo%d", i))
		val := []byte(fmt.Sprintf("bar%d", i))

		s.Put(key, val)

		w := s.NewWatcher()

		// use non-0 to keep watchers in unsynced
		w.Watch(key, false, int64(i))
	}

	s.RangeAllUnsynced()
}

func TestUnsafeAddWatching(t *testing.T) {
	s := newWatchableStore(tmpPath)
	defer func() {
