diff --git a/ops.go b/ops.go index dc3da4c..031a77d 100644 --- a/ops.go +++ b/ops.go @@ -1,6 +1,7 @@ package mfs import ( + "context" "fmt" "os" gopath "path" @@ -235,6 +236,5 @@ func FlushPath(rt *Root, pth string) error { return err } - rt.repub.WaitPub() - return nil + return rt.repub.WaitPub(context.TODO()) } diff --git a/repub.go b/repub.go index 12738fa..de1a3c5 100644 --- a/repub.go +++ b/repub.go @@ -2,7 +2,6 @@ package mfs import ( "context" - "sync" "time" cid "github.com/ipfs/go-cid" @@ -14,15 +13,13 @@ type PubFunc func(context.Context, cid.Cid) error // Republisher manages when to publish a given entry. type Republisher struct { - TimeoutLong time.Duration - TimeoutShort time.Duration - valueHasBeenUpdated chan struct{} - pubfunc PubFunc - immediatePublish chan chan struct{} + TimeoutLong time.Duration + TimeoutShort time.Duration + RetryTimeout time.Duration + pubfunc PubFunc - valueLock sync.Mutex - valueToPublish cid.Cid - lastValuePublished cid.Cid + update chan cid.Cid + immediatePublish chan chan struct{} ctx context.Context cancel func() @@ -33,47 +30,62 @@ type Republisher struct { func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher { ctx, cancel := context.WithCancel(ctx) return &Republisher{ - TimeoutShort: tshort, - TimeoutLong: tlong, - valueHasBeenUpdated: make(chan struct{}, 1), - pubfunc: pf, - immediatePublish: make(chan chan struct{}), - ctx: ctx, - cancel: cancel, + TimeoutShort: tshort, + TimeoutLong: tlong, + RetryTimeout: tlong, + update: make(chan cid.Cid, 1), + pubfunc: pf, + immediatePublish: make(chan chan struct{}), + ctx: ctx, + cancel: cancel, } } // WaitPub waits for the current value to be published (or returns early // if it already has). 
-func (rp *Republisher) WaitPub() { +func (rp *Republisher) WaitPub(ctx context.Context) error { wait := make(chan struct{}) - rp.immediatePublish <- wait - <-wait + select { + case rp.immediatePublish <- wait: + case <-ctx.Done(): + return ctx.Err() + } + select { + case <-wait: + return nil + case <-ctx.Done(): + return ctx.Err() + } } func (rp *Republisher) Close() error { - err := rp.publish(rp.ctx) + // TODO(steb): Wait for `Run` to stop + err := rp.WaitPub(rp.ctx) rp.cancel() return err } -// Update the `valueToPublish` and signal it in the `valueHasBeenUpdated` -// channel. Multiple consecutive updates may extend the time period before -// the next publish occurs in order to more efficiently batch updates. +// Update the current value. The value will be published after a delay but each +// consecutive call to Update may extend this delay up to TimeoutLong. func (rp *Republisher) Update(c cid.Cid) { - rp.valueLock.Lock() - rp.valueToPublish = c - rp.valueLock.Unlock() - select { - case rp.valueHasBeenUpdated <- struct{}{}: - default: + case <-rp.update: + select { + case rp.update <- c: + default: + // Don't try again. If we hit this case, a concurrent + // call to Update refilled the channel after we drained + // it, and we can safely let that concurrent update win. + } + case rp.update <- c: } } // Run contains the core logic of the `Republisher`. It calls the user-defined -// `pubfunc` function whenever the `Cid` value is updated. The complexity comes -// from the fact that `pubfunc` may be slow so we need to batch updates. +// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The +// complexity comes from the fact that `pubfunc` may be slow so we need to batch +// updates. +// // Algorithm: // 1. When we receive the first update after publishing, we set a `longer` timer. // 2. When we receive any update, we reset the `quick` timer. 
@@ -83,66 +95,103 @@ func (rp *Republisher) Update(c cid.Cid) { // The `longer` timer ensures that we delay publishing by at most // `TimeoutLong`. The `quick` timer allows us to publish sooner if // it looks like there are no more updates coming down the pipe. -func (rp *Republisher) Run() { - for { +// +// Note: If a publish fails, we retry repeatedly every RetryTimeout. +func (rp *Republisher) Run(lastPublished cid.Cid) { + quick := time.NewTimer(0) + if !quick.Stop() { + <-quick.C + } + longer := time.NewTimer(0) + if !longer.Stop() { + <-longer.C + } + + var toPublish cid.Cid + for rp.ctx.Err() == nil { + var waiter chan struct{} + select { case <-rp.ctx.Done(): return - case <-rp.valueHasBeenUpdated: - // Fast timeout, a `publish` will be issued if there are - // no more updates before it expires (restarted every time - // the `valueHasBeenUpdated` is signaled). - quick := time.After(rp.TimeoutShort) - // Long timeout that guarantees a `publish` after it expires - // even if the value keeps being updated (and `quick` is - // restarted). - longer := time.After(rp.TimeoutLong) - - wait: - var valueHasBeenPublished chan struct{} + case newValue := <-rp.update: + // Skip already published values. + if lastPublished.Equals(newValue) { + // Break to the end of the select to cleanup any + // timers. + toPublish = cid.Undef + break + } - select { - case <-rp.ctx.Done(): - return - case <-rp.valueHasBeenUpdated: - // The `valueToPublish` has been updated *again* since - // the last time we checked and we still haven't published - // it, restart the `quick` timer allowing for some more - // time to see if the `valueToPublish` changes again. - quick = time.After(rp.TimeoutShort) - goto wait - - case <-quick: - case <-longer: - case valueHasBeenPublished = <-rp.immediatePublish: + // If we aren't already waiting to publish something, + // reset the long timeout. 
+ if !toPublish.Defined() { + longer.Reset(rp.TimeoutLong) } - err := rp.publish(rp.ctx) - if valueHasBeenPublished != nil { - // The user is waiting in `WaitPub` with this channel, signal - // that the `publish` has happened. - valueHasBeenPublished <- struct{}{} + // Always reset the short timeout. + quick.Reset(rp.TimeoutShort) + + // Finally, set the new value to publish. + toPublish = newValue + continue + case waiter = <-rp.immediatePublish: + // Make sure to grab the *latest* value to publish. + select { + case toPublish = <-rp.update: + default: } - if err != nil { - log.Errorf("republishRoot error: %s", err) + + // Avoid publishing duplicate values: drop it only if it was already published. + if lastPublished.Equals(toPublish) { + toPublish = cid.Undef + } + case <-quick.C: + case <-longer.C: } - } -} -// Wrapper function around the user-defined `pubfunc`. It publishes -// the (last) `valueToPublish` set and registers it in `lastValuePublished`. -func (rp *Republisher) publish(ctx context.Context) error { - rp.valueLock.Lock() - topub := rp.valueToPublish - rp.valueLock.Unlock() + // Cleanup, publish, and close waiters. + + // 1. Stop any timers. Don't use the `if !t.Stop() { ... }` + // idiom as these timers may not be running. + + quick.Stop() + select { + case <-quick.C: + default: + } - err := rp.pubfunc(ctx, topub) - if err != nil { - return err + longer.Stop() + select { + case <-longer.C: + default: + } + + // 2. If we have a value to publish, publish it now. + if toPublish.Defined() { + for { + err := rp.pubfunc(rp.ctx, toPublish) + if err == nil { + break + } + // Keep retrying until we succeed or we abort. + // TODO(steb): We could try pulling new values + // off `update` but that's not critical (and + // complicates this code a bit). We'll pull off + // a new value on the next loop through. + select { + case <-time.After(rp.RetryTimeout): + case <-rp.ctx.Done(): + return + } + } + lastPublished = toPublish + toPublish = cid.Undef + } + + // 3. Trigger anything waiting in `WaitPub`. 
+ if waiter != nil { + close(waiter) + } } - rp.valueLock.Lock() - rp.lastValuePublished = topub - rp.valueLock.Unlock() - return nil } diff --git a/repub_test.go b/repub_test.go index d81ffd0..3a7eaaf 100644 --- a/repub_test.go +++ b/repub_test.go @@ -23,13 +23,16 @@ func TestRepublisher(t *testing.T) { return nil } + testCid1, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH") + testCid2, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVX") + tshort := time.Millisecond * 50 tlong := time.Second / 2 rp := NewRepublisher(ctx, pf, tshort, tlong) - go rp.Run() + go rp.Run(cid.Undef) - rp.Update(cid.Undef) + rp.Update(testCid1) // should hit short timeout select { @@ -42,7 +45,7 @@ func TestRepublisher(t *testing.T) { go func() { for { - rp.Update(cid.Undef) + rp.Update(testCid2) time.Sleep(time.Millisecond * 10) select { case <-cctx.Done(): @@ -65,13 +68,8 @@ func TestRepublisher(t *testing.T) { cancel() - go func() { - err := rp.Close() - if err != nil { - t.Fatal(err) - } - }() - - // final pub from closing - <-pub + err := rp.Close() + if err != nil { + t.Fatal(err) + } } diff --git a/root.go b/root.go index ef1d9bc..026a320 100644 --- a/root.go +++ b/root.go @@ -99,11 +99,10 @@ func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf if pf != nil { repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3) - repub.valueToPublish = node.Cid() // No need to take the lock here since we just created // the `Republisher` and no one has access to it yet. - go repub.Run() + go repub.Run(node.Cid()) } root := &Root{