fix(providers): gc #325

Merged 3 commits on Apr 19, 2019
7 changes: 4 additions & 3 deletions providers/providers.go
@@ -31,7 +31,7 @@ type ProviderManager struct {
 	// all non channel fields are meant to be accessed only within
 	// the run method
 	providers *lru.LRU
-	dstore    ds.Datastore
+	dstore    *autobatch.Datastore
Contributor
Are we concerned about the lack of thread safety in the autobatching datastore?

Member Author
We're only using it from one thread here.
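
The reply leans on the convention stated in the struct comment: non-channel fields are touched only from the run method, so the datastore never sees concurrent callers. A minimal sketch of that ownership pattern follows. The `owner` type, its request types, and the plain map standing in for the datastore are all hypothetical and do not come from this PR.

```go
package main

import "fmt"

// putReq and getReq are hypothetical request types, loosely modelled on the
// newprovs/getprovs channels in the diff.
type putReq struct {
	key, val string
}

type getReq struct {
	key  string
	resp chan string
}

// owner holds a map that is NOT safe for concurrent use. Only the run
// goroutine reads or writes it, so no mutex is needed.
type owner struct {
	store map[string]string // stand-in for a non-thread-safe datastore
	puts  chan putReq
	gets  chan getReq
	done  chan struct{}
}

func newOwner() *owner {
	o := &owner{
		store: make(map[string]string),
		puts:  make(chan putReq),
		gets:  make(chan getReq),
		done:  make(chan struct{}),
	}
	go o.run()
	return o
}

// run is the only place that touches o.store.
func (o *owner) run() {
	for {
		select {
		case p := <-o.puts:
			o.store[p.key] = p.val
		case g := <-o.gets:
			g.resp <- o.store[g.key]
		case <-o.done:
			return
		}
	}
}

// Put and Get may be called from any goroutine; they only send messages.
func (o *owner) Put(k, v string) { o.puts <- putReq{k, v} }

func (o *owner) Get(k string) string {
	resp := make(chan string, 1)
	o.gets <- getReq{k, resp}
	return <-resp
}

func (o *owner) Close() { close(o.done) }

func main() {
	o := newOwner()
	defer o.Close()
	o.Put("cid-1", "peer-a")
	fmt.Println(o.Get("cid-1"))
}
```

Callers on any goroutine go through the channels, so the unsynchronized state stays confined to one goroutine. The guarantee holds only as long as nothing else keeps a reference to the underlying store.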


 	newprovs chan *addProv
 	getprovs chan *getProv
@@ -193,8 +193,7 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time)

 func (pm *ProviderManager) gc() {
 	res, err := pm.dstore.Query(dsq.Query{
-		KeysOnly: true,
Contributor
OK, that's a subtle little change. What's the difference here?

Member Author
The expiration is in the value. We had this before because we'd:

  1. Determine that all records have expired.
  2. Delete all records.

This caused us to remember a bunch of expired records indefinitely.

Contributor
OUCH!

-		Prefix:   providersKeyPrefix,
+		Prefix: providersKeyPrefix,
 	})
 	if err != nil {
 		log.Error("error garbage collecting provider records: ", err)
@@ -234,6 +233,8 @@ func (pm *ProviderManager) gc() {
 func (pm *ProviderManager) run(proc goprocess.Process) {
 	tick := time.NewTicker(pm.cleanupInterval)
 	defer tick.Stop()
+	defer pm.dstore.Flush()
+
 	for {
 		select {
 		case np := <-pm.newprovs:
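
The added `defer pm.dstore.Flush()` makes sense once writes are buffered: anything still held in memory when run returns would otherwise never reach the backing store. The sketch below shows that idea with a hypothetical `batcher` type. It is a simplified stand-in, not the autobatch implementation, and its names and buffer limit are assumptions.

```go
package main

import "fmt"

// batcher is a hypothetical write buffer. Writes are held in pending and
// reach the backing map only when the buffer fills up or Flush is called.
type batcher struct {
	backing map[string]string
	pending map[string]string
	limit   int
}

func newBatcher(backing map[string]string, limit int) *batcher {
	return &batcher{
		backing: backing,
		pending: make(map[string]string),
		limit:   limit,
	}
}

func (b *batcher) Put(k, v string) {
	b.pending[k] = v
	if len(b.pending) >= b.limit {
		b.Flush()
	}
}

// Flush moves every pending write to the backing map.
func (b *batcher) Flush() {
	for k, v := range b.pending {
		b.backing[k] = v
		delete(b.pending, k)
	}
}

// run mimics a worker loop: it consumes writes until the channel closes and
// flushes on the way out so buffered writes are not lost.
func run(b *batcher, in <-chan [2]string) {
	defer b.Flush()
	for kv := range in {
		b.Put(kv[0], kv[1])
	}
}

func main() {
	backing := make(map[string]string)
	in := make(chan [2]string, 3)
	in <- [2]string{"a", "1"}
	in <- [2]string{"b", "2"}
	in <- [2]string{"c", "3"}
	close(in)

	run(newBatcher(backing, 16), in)
	fmt.Println(len(backing))
}
```

With a limit of 16 and only three writes, nothing would reach `backing` without the deferred flush.
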
38 changes: 31 additions & 7 deletions providers/providers_test.go
@@ -129,38 +129,62 @@ func TestProvidesExpire(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

+	ds := dssync.MutexWrap(ds.NewMapDatastore())
 	mid := peer.ID("testing")
-	p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
+	p := NewProviderManager(ctx, mid, ds)

 	peers := []peer.ID{"a", "b"}
 	var cids []cid.Cid
 	for i := 0; i < 10; i++ {
 		c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
 		cids = append(cids, c)
+	}
+
+	for _, c := range cids[:5] {
 		p.AddProvider(ctx, c, peers[0])
 		p.AddProvider(ctx, c, peers[1])
 	}

-	for i := 0; i < 10; i++ {
-		out := p.GetProviders(ctx, cids[i])
+	time.Sleep(time.Second / 4)
+
+	for _, c := range cids[5:] {
+		p.AddProvider(ctx, c, peers[0])
+		p.AddProvider(ctx, c, peers[1])
+	}
+
+	for _, c := range cids {
+		out := p.GetProviders(ctx, c)
 		if len(out) != 2 {
 			t.Fatal("expected providers to still be there")
 		}
 	}

-	time.Sleep(time.Second)
-	for i := 0; i < 10; i++ {
-		out := p.GetProviders(ctx, cids[i])
+	time.Sleep(3 * time.Second / 8)
+
+	for _, c := range cids[:5] {
+		out := p.GetProviders(ctx, c)
 		if len(out) > 0 {
 			t.Fatal("expected providers to be cleaned up, got: ", out)
 		}
 	}

+	for _, c := range cids[5:] {
+		out := p.GetProviders(ctx, c)
+		if len(out) != 2 {
+			t.Fatal("expected providers to still be there")
+		}
+	}
+
+	time.Sleep(time.Second / 2)
+
+	// Stop to prevent data races
+	p.Process().Close()
+
 	if p.providers.Len() != 0 {
 		t.Fatal("providers map not cleaned up")
 	}

-	res, err := p.dstore.Query(dsq.Query{Prefix: providersKeyPrefix})
+	res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix})
 	if err != nil {
 		t.Fatal(err)
 	}