Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
Merge pull request #53 from ipfs/fix/38
Browse files Browse the repository at this point in the history
fix over-wait in WaitPub
  • Loading branch information
schomatis authored Jan 14, 2019
2 parents 6c00361 + 740d058 commit 996259a
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 96 deletions.
3 changes: 2 additions & 1 deletion ops.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mfs

import (
"context"
"fmt"
"os"
gopath "path"
Expand Down Expand Up @@ -235,6 +236,6 @@ func FlushPath(rt *Root, pth string) error {
return err
}

rt.repub.WaitPub()
rt.repub.WaitPub(context.TODO())
return nil
}
211 changes: 130 additions & 81 deletions repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mfs

import (
"context"
"sync"
"time"

cid "github.com/ipfs/go-cid"
Expand All @@ -14,15 +13,13 @@ type PubFunc func(context.Context, cid.Cid) error

// Republisher manages when to publish a given entry.
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
valueHasBeenUpdated chan struct{}
pubfunc PubFunc
immediatePublish chan chan struct{}
TimeoutLong time.Duration
TimeoutShort time.Duration
RetryTimeout time.Duration
pubfunc PubFunc

valueLock sync.Mutex
valueToPublish cid.Cid
lastValuePublished cid.Cid
update chan cid.Cid
immediatePublish chan chan struct{}

ctx context.Context
cancel func()
Expand All @@ -33,47 +30,62 @@ type Republisher struct {
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
ctx, cancel := context.WithCancel(ctx)
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
valueHasBeenUpdated: make(chan struct{}, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
TimeoutShort: tshort,
TimeoutLong: tlong,
RetryTimeout: tlong,
update: make(chan cid.Cid, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
}
}

// WaitPub waits for the current value to be published (or returns early
// if it already has).
func (rp *Republisher) WaitPub() {
func (rp *Republisher) WaitPub(ctx context.Context) error {
wait := make(chan struct{})
rp.immediatePublish <- wait
<-wait
select {
case rp.immediatePublish <- wait:
case <-ctx.Done():
return ctx.Err()
}
select {
case <-wait:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (rp *Republisher) Close() error {
err := rp.publish(rp.ctx)
// TODO(steb): Wait for `Run` to stop
err := rp.WaitPub(rp.ctx)
rp.cancel()
return err
}

// Update the `valueToPublish` and signal it in the `valueHasBeenUpdated`
// channel. Multiple consecutive updates may extend the time period before
// the next publish occurs in order to more efficiently batch updates.
// Update the current value. The value will be published after a delay but each
// consecutive call to Update may extend this delay up to TimeoutLong.
func (rp *Republisher) Update(c cid.Cid) {
rp.valueLock.Lock()
rp.valueToPublish = c
rp.valueLock.Unlock()

select {
case rp.valueHasBeenUpdated <- struct{}{}:
default:
case <-rp.update:
select {
case rp.update <- c:
default:
// Don't try again. If we hit this case, there's a
// concurrent publish and we can safely let that
// concurrent publish win.
}
case rp.update <- c:
}
}

// Run contains the core logic of the `Republisher`. It calls the user-defined
// `pubfunc` function whenever the `Cid` value is updated. The complexity comes
// from the fact that `pubfunc` may be slow so we need to batch updates.
// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The
// complexity comes from the fact that `pubfunc` may be slow so we need to batch
// updates.
//
// Algorithm:
// 1. When we receive the first update after publishing, we set a `longer` timer.
// 2. When we receive any update, we reset the `quick` timer.
Expand All @@ -83,66 +95,103 @@ func (rp *Republisher) Update(c cid.Cid) {
// The `longer` timer ensures that we delay publishing by at most
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
// it looks like there are no more updates coming down the pipe.
func (rp *Republisher) Run() {
for {
//
// Note: If a publish fails, we retry repeatedly every TimeoutRetry.
func (rp *Republisher) Run(lastPublished cid.Cid) {
quick := time.NewTimer(0)
if !quick.Stop() {
<-quick.C
}
longer := time.NewTimer(0)
if !longer.Stop() {
<-longer.C
}

var toPublish cid.Cid
for rp.ctx.Err() == nil {
var waiter chan struct{}

select {
case <-rp.ctx.Done():
return
case <-rp.valueHasBeenUpdated:
// Fast timeout, a `publish` will be issued if there are
// no more updates before it expires (restarted every time
// the `valueHasBeenUpdated` is signaled).
quick := time.After(rp.TimeoutShort)
// Long timeout that guarantees a `publish` after it expires
// even if the value keeps being updated (and `quick` is
// restarted).
longer := time.After(rp.TimeoutLong)

wait:
var valueHasBeenPublished chan struct{}
case newValue := <-rp.update:
// Skip already published values.
if lastPublished.Equals(newValue) {
// Break to the end of the switch to cleanup any
// timers.
toPublish = cid.Undef
break
}

select {
case <-rp.ctx.Done():
return
case <-rp.valueHasBeenUpdated:
// The `valueToPublish` has been updated *again* since
// the last time we checked and we still haven't published
// it, restart the `quick` timer allowing for some more
// time to see if the `valueToPublish` changes again.
quick = time.After(rp.TimeoutShort)
goto wait

case <-quick:
case <-longer:
case valueHasBeenPublished = <-rp.immediatePublish:
// If we aren't already waiting to publish something,
// reset the long timeout.
if !toPublish.Defined() {
longer.Reset(rp.TimeoutLong)
}

err := rp.publish(rp.ctx)
if valueHasBeenPublished != nil {
// The user is waiting in `WaitPub` with this channel, signal
// that the `publish` has happened.
valueHasBeenPublished <- struct{}{}
// Always reset the short timeout.
quick.Reset(rp.TimeoutShort)

// Finally, set the new value to publish.
toPublish = newValue
continue
case waiter = <-rp.immediatePublish:
// Make sure to grab the *latest* value to publish.
select {
case toPublish = <-rp.update:
default:
}
if err != nil {
log.Errorf("republishRoot error: %s", err)

// Avoid publishing duplicate values
if !lastPublished.Equals(toPublish) {
toPublish = cid.Undef
}
case <-quick.C:
case <-longer.C:
}
}
}

// Wrapper function around the user-defined `pubfunc`. It publishes
// the (last) `valueToPublish` set and registers it in `lastValuePublished`.
func (rp *Republisher) publish(ctx context.Context) error {
rp.valueLock.Lock()
topub := rp.valueToPublish
rp.valueLock.Unlock()
// Cleanup, publish, and close waiters.

// 1. Stop any timers. Don't use the `if !t.Stop() { ... }`
// idiom as these timers may not be running.

quick.Stop()
select {
case <-quick.C:
default:
}

err := rp.pubfunc(ctx, topub)
if err != nil {
return err
longer.Stop()
select {
case <-longer.C:
default:
}

// 2. If we have a value to publish, publish it now.
if toPublish.Defined() {
for {
err := rp.pubfunc(rp.ctx, toPublish)
if err == nil {
break
}
// Keep retrying until we succeed or we abort.
// TODO(steb): We could try pulling new values
// off `update` but that's not critical (and
// complicates this code a bit). We'll pull off
// a new value on the next loop through.
select {
case <-time.After(rp.RetryTimeout):
case <-rp.ctx.Done():
return
}
}
lastPublished = toPublish
toPublish = cid.Undef
}

// 3. Trigger anything waiting in `WaitPub`.
if waiter != nil {
close(waiter)
}
}
rp.valueLock.Lock()
rp.lastValuePublished = topub
rp.valueLock.Unlock()
return nil
}
22 changes: 10 additions & 12 deletions repub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ func TestRepublisher(t *testing.T) {
return nil
}

testCid1, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH")
testCid2, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVX")

tshort := time.Millisecond * 50
tlong := time.Second / 2

rp := NewRepublisher(ctx, pf, tshort, tlong)
go rp.Run()
go rp.Run(cid.Undef)

rp.Update(cid.Undef)
rp.Update(testCid1)

// should hit short timeout
select {
Expand All @@ -42,7 +45,7 @@ func TestRepublisher(t *testing.T) {

go func() {
for {
rp.Update(cid.Undef)
rp.Update(testCid2)
time.Sleep(time.Millisecond * 10)
select {
case <-cctx.Done():
Expand All @@ -65,13 +68,8 @@ func TestRepublisher(t *testing.T) {

cancel()

go func() {
err := rp.Close()
if err != nil {
t.Fatal(err)
}
}()

// final pub from closing
<-pub
err := rp.Close()
if err != nil {
t.Fatal(err)
}
}
3 changes: 1 addition & 2 deletions root.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,10 @@ func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf
if pf != nil {
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)

repub.valueToPublish = node.Cid()
// No need to take the lock here since we just created
// the `Republisher` and no one has access to it yet.

go repub.Run()
go repub.Run(node.Cid())
}

root := &Root{
Expand Down

0 comments on commit 996259a

Please sign in to comment.