Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
fix over-wait in WaitPub
Browse files Browse the repository at this point in the history
Before, WaitPub could wait forever if a value was published at the same time as
the call to WaitPub.

This patch also avoids republishing the same value multiple times and allows
setting an initial value without reaching in and modifying internal state.

fixes #38
  • Loading branch information
Stebalien committed Jan 12, 2019
1 parent 6c00361 commit 740d058
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 96 deletions.
3 changes: 2 additions & 1 deletion ops.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mfs

import (
"context"
"fmt"
"os"
gopath "path"
Expand Down Expand Up @@ -235,6 +236,6 @@ func FlushPath(rt *Root, pth string) error {
return err
}

rt.repub.WaitPub()
rt.repub.WaitPub(context.TODO())
return nil
}
211 changes: 130 additions & 81 deletions repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mfs

import (
"context"
"sync"
"time"

cid "github.com/ipfs/go-cid"
Expand All @@ -14,15 +13,13 @@ type PubFunc func(context.Context, cid.Cid) error

// Republisher manages when to publish a given entry.
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
valueHasBeenUpdated chan struct{}
pubfunc PubFunc
immediatePublish chan chan struct{}
TimeoutLong time.Duration
TimeoutShort time.Duration
RetryTimeout time.Duration
pubfunc PubFunc

valueLock sync.Mutex
valueToPublish cid.Cid
lastValuePublished cid.Cid
update chan cid.Cid
immediatePublish chan chan struct{}

ctx context.Context
cancel func()
Expand All @@ -33,47 +30,62 @@ type Republisher struct {
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
ctx, cancel := context.WithCancel(ctx)
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
valueHasBeenUpdated: make(chan struct{}, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
TimeoutShort: tshort,
TimeoutLong: tlong,
RetryTimeout: tlong,
update: make(chan cid.Cid, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
}
}

// WaitPub waits for the current value to be published (or returns early
// if it already has).
func (rp *Republisher) WaitPub() {
func (rp *Republisher) WaitPub(ctx context.Context) error {
wait := make(chan struct{})
rp.immediatePublish <- wait
<-wait
select {
case rp.immediatePublish <- wait:
case <-ctx.Done():
return ctx.Err()
}
select {
case <-wait:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (rp *Republisher) Close() error {
err := rp.publish(rp.ctx)
// TODO(steb): Wait for `Run` to stop
err := rp.WaitPub(rp.ctx)
rp.cancel()
return err
}

// Update the `valueToPublish` and signal it in the `valueHasBeenUpdated`
// channel. Multiple consecutive updates may extend the time period before
// the next publish occurs in order to more efficiently batch updates.
// Update the current value. The value will be published after a delay but each
// consecutive call to Update may extend this delay up to TimeoutLong.
func (rp *Republisher) Update(c cid.Cid) {
rp.valueLock.Lock()
rp.valueToPublish = c
rp.valueLock.Unlock()

select {
case rp.valueHasBeenUpdated <- struct{}{}:
default:
case <-rp.update:
select {
case rp.update <- c:
default:
// Don't try again. If we hit this case, there's a
// concurrent publish and we can safely let that
// concurrent publish win.
}
case rp.update <- c:
}
}

// Run contains the core logic of the `Republisher`. It calls the user-defined
// `pubfunc` function whenever the `Cid` value is updated. The complexity comes
// from the fact that `pubfunc` may be slow so we need to batch updates.
// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The
// complexity comes from the fact that `pubfunc` may be slow so we need to batch
// updates.
//
// Algorithm:
// 1. When we receive the first update after publishing, we set a `longer` timer.
// 2. When we receive any update, we reset the `quick` timer.
Expand All @@ -83,66 +95,103 @@ func (rp *Republisher) Update(c cid.Cid) {
// The `longer` timer ensures that we delay publishing by at most
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
// it looks like there are no more updates coming down the pipe.
func (rp *Republisher) Run() {
for {
//
// Note: If a publish fails, we retry repeatedly every TimeoutRetry.
func (rp *Republisher) Run(lastPublished cid.Cid) {
quick := time.NewTimer(0)
if !quick.Stop() {
<-quick.C
}
longer := time.NewTimer(0)
if !longer.Stop() {
<-longer.C
}

var toPublish cid.Cid
for rp.ctx.Err() == nil {
var waiter chan struct{}

select {
case <-rp.ctx.Done():
return
case <-rp.valueHasBeenUpdated:
// Fast timeout, a `publish` will be issued if there are
// no more updates before it expires (restarted every time
// the `valueHasBeenUpdated` is signaled).
quick := time.After(rp.TimeoutShort)
// Long timeout that guarantees a `publish` after it expires
// even if the value keeps being updated (and `quick` is
// restarted).
longer := time.After(rp.TimeoutLong)

wait:
var valueHasBeenPublished chan struct{}
case newValue := <-rp.update:
// Skip already published values.
if lastPublished.Equals(newValue) {
// Break to the end of the switch to cleanup any
// timers.
toPublish = cid.Undef
break
}

select {
case <-rp.ctx.Done():
return
case <-rp.valueHasBeenUpdated:
// The `valueToPublish` has been updated *again* since
// the last time we checked and we still haven't published
// it, restart the `quick` timer allowing for some more
// time to see if the `valueToPublish` changes again.
quick = time.After(rp.TimeoutShort)
goto wait

case <-quick:
case <-longer:
case valueHasBeenPublished = <-rp.immediatePublish:
// If we aren't already waiting to publish something,
// reset the long timeout.
if !toPublish.Defined() {
longer.Reset(rp.TimeoutLong)
}

err := rp.publish(rp.ctx)
if valueHasBeenPublished != nil {
// The user is waiting in `WaitPub` with this channel, signal
// that the `publish` has happened.
valueHasBeenPublished <- struct{}{}
// Always reset the short timeout.
quick.Reset(rp.TimeoutShort)

// Finally, set the new value to publish.
toPublish = newValue
continue
case waiter = <-rp.immediatePublish:
// Make sure to grab the *latest* value to publish.
select {
case toPublish = <-rp.update:
default:
}
if err != nil {
log.Errorf("republishRoot error: %s", err)

// Avoid publishing duplicate values
if !lastPublished.Equals(toPublish) {
toPublish = cid.Undef
}
case <-quick.C:
case <-longer.C:
}
}
}

// Wrapper function around the user-defined `pubfunc`. It publishes
// the (last) `valueToPublish` set and registers it in `lastValuePublished`.
func (rp *Republisher) publish(ctx context.Context) error {
rp.valueLock.Lock()
topub := rp.valueToPublish
rp.valueLock.Unlock()
// Cleanup, publish, and close waiters.

// 1. Stop any timers. Don't use the `if !t.Stop() { ... }`
// idiom as these timers may not be running.

quick.Stop()
select {
case <-quick.C:
default:
}

err := rp.pubfunc(ctx, topub)
if err != nil {
return err
longer.Stop()
select {
case <-longer.C:
default:
}

// 2. If we have a value to publish, publish it now.
if toPublish.Defined() {
for {
err := rp.pubfunc(rp.ctx, toPublish)
if err == nil {
break
}
// Keep retrying until we succeed or we abort.
// TODO(steb): We could try pulling new values
// off `update` but that's not critical (and
// complicates this code a bit). We'll pull off
// a new value on the next loop through.
select {
case <-time.After(rp.RetryTimeout):
case <-rp.ctx.Done():
return
}
}
lastPublished = toPublish
toPublish = cid.Undef
}

// 3. Trigger anything waiting in `WaitPub`.
if waiter != nil {
close(waiter)
}
}
rp.valueLock.Lock()
rp.lastValuePublished = topub
rp.valueLock.Unlock()
return nil
}
22 changes: 10 additions & 12 deletions repub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ func TestRepublisher(t *testing.T) {
return nil
}

testCid1, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH")
testCid2, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVX")

tshort := time.Millisecond * 50
tlong := time.Second / 2

rp := NewRepublisher(ctx, pf, tshort, tlong)
go rp.Run()
go rp.Run(cid.Undef)

rp.Update(cid.Undef)
rp.Update(testCid1)

// should hit short timeout
select {
Expand All @@ -42,7 +45,7 @@ func TestRepublisher(t *testing.T) {

go func() {
for {
rp.Update(cid.Undef)
rp.Update(testCid2)
time.Sleep(time.Millisecond * 10)
select {
case <-cctx.Done():
Expand All @@ -65,13 +68,8 @@ func TestRepublisher(t *testing.T) {

cancel()

go func() {
err := rp.Close()
if err != nil {
t.Fatal(err)
}
}()

// final pub from closing
<-pub
err := rp.Close()
if err != nil {
t.Fatal(err)
}
}
3 changes: 1 addition & 2 deletions root.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,10 @@ func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf
if pf != nil {
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)

repub.valueToPublish = node.Cid()
// No need to take the lock here since we just created
// the `Republisher` and no one has access to it yet.

go repub.Run()
go repub.Run(node.Cid())
}

root := &Root{
Expand Down

0 comments on commit 740d058

Please sign in to comment.