// changeHandlerAPI is the set of operations the change handler relies on to
// look up the proving deadline, to start proof generation and submission, and
// to report failures.
type changeHandlerAPI interface {
	// StateMinerProvingDeadline returns the deadline info for the given miner
	// at the given tipset. changeHandler.update calls it with the key of the
	// tipset the chain advanced to.
	StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
	// startGeneratePoST begins generating proofs for the given deadline. The
	// handler keeps the returned CancelFunc and calls it to abort the proof.
	// NOTE(review): onComplete is presumably invoked exactly once, from a
	// goroutine other than the caller's — confirm against the implementation.
	startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
	// startSubmitPoST begins submitting the given proofs for the deadline. The
	// handler keeps the returned CancelFunc and calls it to abort the submit.
	startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
	// onAbort is called by the handlers after a proof generation or a submit
	// completes with an error.
	onAbort(ts *types.TipSet, deadline *dline.Info)
	// failPost is called by the handlers with the error returned from a failed
	// proof generation or submit, before onAbort.
	failPost(err error, ts *types.TipSet, deadline *dline.Info)
}
*proveHandler + submitHdlr *submitHandler +} + +func newChangeHandler(api changeHandlerAPI, actor address.Address) *changeHandler { + posts := newPostsCache() + p := newProver(api, posts) + s := newSubmitter(api, posts) + return &changeHandler{api: api, actor: actor, proveHdlr: p, submitHdlr: s} +} + +func (ch *changeHandler) start() { + go ch.proveHdlr.run() + go ch.submitHdlr.run() +} + +func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error { + // Get the current deadline period + di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key()) + if err != nil { + return err + } + + if !di.PeriodStarted() { + return nil // not proving anything yet + } + + hc := &headChange{ + ctx: ctx, + revert: revert, + advance: advance, + di: di, + } + + select { + case ch.proveHdlr.hcs <- hc: + case <-ch.proveHdlr.shutdownCtx.Done(): + case <-ctx.Done(): + } + + select { + case ch.submitHdlr.hcs <- hc: + case <-ch.submitHdlr.shutdownCtx.Done(): + case <-ctx.Done(): + } + + return nil +} + +func (ch *changeHandler) shutdown() { + ch.proveHdlr.shutdown() + ch.submitHdlr.shutdown() +} + +func (ch *changeHandler) currentTSDI() (*types.TipSet, *dline.Info) { + return ch.submitHdlr.currentTSDI() +} + +// postsCache keeps a cache of PoSTs for each proving window +type postsCache struct { + added chan *postInfo + lk sync.RWMutex + cache map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams +} + +func newPostsCache() *postsCache { + return &postsCache{ + added: make(chan *postInfo, 16), + cache: make(map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams), + } +} + +func (c *postsCache) add(di *dline.Info, posts []miner.SubmitWindowedPoStParams) { + c.lk.Lock() + defer c.lk.Unlock() + + // TODO: clear cache entries older than chain finality + c.cache[di.Open] = posts + + c.added <- &postInfo{ + di: di, + posts: posts, + } +} + +func (c *postsCache) get(di *dline.Info) ([]miner.SubmitWindowedPoStParams, bool) { + c.lk.RLock() + 
defer c.lk.RUnlock() + + posts, ok := c.cache[di.Open] + return posts, ok +} + +type headChange struct { + ctx context.Context + revert *types.TipSet + advance *types.TipSet + di *dline.Info +} + +type currentPost struct { + di *dline.Info + abort context.CancelFunc +} + +type postResult struct { + ts *types.TipSet + currPost *currentPost + posts []miner.SubmitWindowedPoStParams + err error +} + +// proveHandler generates proofs +type proveHandler struct { + api changeHandlerAPI + posts *postsCache + + postResults chan *postResult + hcs chan *headChange + + current *currentPost + + shutdownCtx context.Context + shutdown context.CancelFunc + + // Used for testing + processedHeadChanges chan *headChange + processedPostResults chan *postResult +} + +func newProver( + api changeHandlerAPI, + posts *postsCache, +) *proveHandler { + ctx, cancel := context.WithCancel(context.Background()) + return &proveHandler{ + api: api, + posts: posts, + postResults: make(chan *postResult), + hcs: make(chan *headChange), + shutdownCtx: ctx, + shutdown: cancel, + } +} + +func (p *proveHandler) run() { + // Abort proving on shutdown + defer func() { + if p.current != nil { + p.current.abort() + } + }() + + for p.shutdownCtx.Err() == nil { + select { + case <-p.shutdownCtx.Done(): + return + + case hc := <-p.hcs: + // Head changed + p.processHeadChange(hc.ctx, hc.advance, hc.di) + if p.processedHeadChanges != nil { + p.processedHeadChanges <- hc + } + + case res := <-p.postResults: + // Proof generation complete + p.processPostResult(res) + if p.processedPostResults != nil { + p.processedPostResults <- res + } + } + } +} + +func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { + // If the post window has expired, abort the current proof + if p.current != nil && newTS.Height() >= p.current.di.Close { + // Cancel the context on the current proof + p.current.abort() + + // Clear out the reference to the proof so that we can immediately + // 
start generating a new proof, without having to worry about state + // getting clobbered when the abort completes + p.current = nil + } + + // Only generate one proof at a time + if p.current != nil { + return + } + + // If the proof for the current post window has been generated, check the + // next post window + _, complete := p.posts.get(di) + for complete { + di = nextDeadline(di) + _, complete = p.posts.get(di) + } + + // Check if the chain is above the Challenge height for the post window + if newTS.Height() < di.Challenge { + return + } + + p.current = ¤tPost{di: di} + curr := p.current + p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) { + p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err} + }) +} + +func (p *proveHandler) processPostResult(res *postResult) { + di := res.currPost.di + if res.err != nil { + // Proving failed so inform the API + p.api.failPost(res.err, res.ts, di) + log.Warnf("Aborted window post Proving (Deadline: %+v)", di) + p.api.onAbort(res.ts, di) + + // Check if the current post has already been aborted + if p.current == res.currPost { + // If the current post was not already aborted, setting it to nil + // marks it as complete so that a new post can be started + p.current = nil + } + return + } + + // Completed processing this proving window + p.current = nil + + // Add the proofs to the cache + p.posts.add(di, res.posts) +} + +type submitResult struct { + pw *postWindow + err error +} + +type SubmitState string + +const ( + SubmitStateStart SubmitState = "SubmitStateStart" + SubmitStateSubmitting SubmitState = "SubmitStateSubmitting" + SubmitStateComplete SubmitState = "SubmitStateComplete" +) + +type postWindow struct { + ts *types.TipSet + di *dline.Info + submitState SubmitState + abort context.CancelFunc +} + +type postInfo struct { + di *dline.Info + posts []miner.SubmitWindowedPoStParams +} + +// submitHandler submits proofs on-chain 
+type submitHandler struct { + api changeHandlerAPI + posts *postsCache + + submitResults chan *submitResult + hcs chan *headChange + + postWindows map[abi.ChainEpoch]*postWindow + getPostWindowReqs chan *getPWReq + + shutdownCtx context.Context + shutdown context.CancelFunc + + currentCtx context.Context + currentTS *types.TipSet + currentDI *dline.Info + getTSDIReq chan chan *tsdi + + // Used for testing + processedHeadChanges chan *headChange + processedSubmitResults chan *submitResult + processedPostReady chan *postInfo +} + +func newSubmitter( + api changeHandlerAPI, + posts *postsCache, +) *submitHandler { + ctx, cancel := context.WithCancel(context.Background()) + return &submitHandler{ + api: api, + posts: posts, + submitResults: make(chan *submitResult), + hcs: make(chan *headChange), + postWindows: make(map[abi.ChainEpoch]*postWindow), + getPostWindowReqs: make(chan *getPWReq), + getTSDIReq: make(chan chan *tsdi), + shutdownCtx: ctx, + shutdown: cancel, + } +} + +func (s *submitHandler) run() { + // On shutdown, abort in-progress submits + defer func() { + for _, pw := range s.postWindows { + if pw.abort != nil { + pw.abort() + } + } + }() + + for s.shutdownCtx.Err() == nil { + select { + case <-s.shutdownCtx.Done(): + return + + case hc := <-s.hcs: + // Head change + s.processHeadChange(hc.ctx, hc.revert, hc.advance, hc.di) + if s.processedHeadChanges != nil { + s.processedHeadChanges <- hc + } + + case pi := <-s.posts.added: + // Proof generated + s.processPostReady(pi) + if s.processedPostReady != nil { + s.processedPostReady <- pi + } + + case res := <-s.submitResults: + // Submit complete + s.processSubmitResult(res) + if s.processedSubmitResults != nil { + s.processedSubmitResults <- res + } + + case pwreq := <-s.getPostWindowReqs: + // used by getPostWindow() to sync with run loop + pwreq.out <- s.postWindows[pwreq.di.Open] + + case out := <-s.getTSDIReq: + // used by currentTSDI() to sync with run loop + out <- &tsdi{ts: s.currentTS, di: 
s.currentDI} + } + } +} + +// processHeadChange is called when the chain head changes +func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) { + s.currentCtx = ctx + s.currentTS = advance + s.currentDI = di + + // Start tracking the current post window if we're not already + // TODO: clear post windows older than chain finality + if _, ok := s.postWindows[di.Open]; !ok { + s.postWindows[di.Open] = &postWindow{ + di: di, + ts: advance, + submitState: SubmitStateStart, + } + } + + // Apply the change to all post windows + for _, pw := range s.postWindows { + s.processHeadChangeForPW(ctx, revert, advance, pw) + } +} + +func (s *submitHandler) processHeadChangeForPW(ctx context.Context, revert *types.TipSet, advance *types.TipSet, pw *postWindow) { + revertedToPrevDL := revert != nil && revert.Height() < pw.di.Open + expired := advance.Height() >= pw.di.Close + + // If the chain was reverted back to the previous deadline, or if the post + // window has expired, abort submit + if pw.submitState == SubmitStateSubmitting && (revertedToPrevDL || expired) { + // Replace the aborted postWindow with a new one so that we can + // submit again at any time without the state getting clobbered + // when the abort completes + abort := pw.abort + if abort != nil { + pw = &postWindow{ + di: pw.di, + ts: advance, + submitState: SubmitStateStart, + } + s.postWindows[pw.di.Open] = pw + + // Abort the current submit + abort() + } + } else if pw.submitState == SubmitStateComplete && revertedToPrevDL { + // If submit for this deadline has completed, but the chain was + // reverted back to the previous deadline, reset the submit state to the + // starting state, so that it can be resubmitted + pw.submitState = SubmitStateStart + } + + // Submit the proof to chain if the proof has been generated and the chain + // height is above confidence + s.submitIfReady(ctx, advance, pw) +} + +// processPostReady is called when a 
proof generation completes +func (s *submitHandler) processPostReady(pi *postInfo) { + pw, ok := s.postWindows[pi.di.Open] + if ok { + s.submitIfReady(s.currentCtx, s.currentTS, pw) + } +} + +// submitIfReady submits a proof if the chain is high enough and the proof +// has been generated for this deadline +func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet, pw *postWindow) { + // If the window has expired, there's nothing more to do. + if advance.Height() >= pw.di.Close { + return + } + + // Check if we're already submitting, or already completed submit + if pw.submitState != SubmitStateStart { + return + } + + // Check if we've reached the confidence height to submit + if advance.Height() < pw.di.Open+SubmitConfidence { + return + } + + // Check if the proofs have been generated for this deadline + posts, ok := s.posts.get(pw.di) + if !ok { + return + } + + // If there was nothing to prove, move straight to the complete state + if len(posts) == 0 { + pw.submitState = SubmitStateComplete + return + } + + // Start submitting post + pw.submitState = SubmitStateSubmitting + pw.abort = s.api.startSubmitPoST(ctx, advance, pw.di, posts, func(err error) { + s.submitResults <- &submitResult{pw: pw, err: err} + }) +} + +// processSubmitResult is called with the response to a submit +func (s *submitHandler) processSubmitResult(res *submitResult) { + if res.err != nil { + // Submit failed so inform the API and go back to the start state + s.api.failPost(res.err, res.pw.ts, res.pw.di) + log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di) + s.api.onAbort(res.pw.ts, res.pw.di) + + res.pw.submitState = SubmitStateStart + return + } + + // Submit succeeded so move to complete state + res.pw.submitState = SubmitStateComplete +} + +type tsdi struct { + ts *types.TipSet + di *dline.Info +} + +func (s *submitHandler) currentTSDI() (*types.TipSet, *dline.Info) { + out := make(chan *tsdi) + s.getTSDIReq <- out + res := <-out + return 
res.ts, res.di +} + +type getPWReq struct { + di *dline.Info + out chan *postWindow +} + +func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow { + out := make(chan *postWindow) + s.getPostWindowReqs <- &getPWReq{di: di, out: out} + return <-out +} + +// nextDeadline gets deadline info for the subsequent deadline +func nextDeadline(currentDeadline *dline.Info) *dline.Info { + periodStart := currentDeadline.PeriodStart + newDeadline := currentDeadline.Index + 1 + if newDeadline == miner.WPoStPeriodDeadlines { + newDeadline = 0 + periodStart = periodStart + miner.WPoStProvingPeriod + } + + return miner.NewDeadlineInfo(periodStart, newDeadline, currentDeadline.CurrentEpoch) +} diff --git a/storage/wdpost_changehandler_test.go b/storage/wdpost_changehandler_test.go new file mode 100644 index 00000000000..d2a4779e60a --- /dev/null +++ b/storage/wdpost_changehandler_test.go @@ -0,0 +1,1173 @@ +package storage + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + tutils "github.com/filecoin-project/specs-actors/support/testing" + + "github.com/filecoin-project/go-state-types/crypto" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" +) + +var dummyCid cid.Cid + +func init() { + dummyCid, _ = cid.Parse("bafkqaaa") +} + +type proveRes struct { + posts []miner.SubmitWindowedPoStParams + err error +} + +type postStatus string + +const ( + postStatusStart postStatus = "postStatusStart" + postStatusProving postStatus = "postStatusProving" + postStatusComplete postStatus = "postStatusComplete" +) + +type mockAPI struct { + ch *changeHandler + deadline *dline.Info + proveResult chan *proveRes + submitResult chan error + onStateChange chan struct{} + + tsLock sync.RWMutex + ts 
map[types.TipSetKey]*types.TipSet + + abortCalledLock sync.RWMutex + abortCalled bool + + statesLk sync.RWMutex + postStates map[abi.ChainEpoch]postStatus +} + +func newMockAPI() *mockAPI { + return &mockAPI{ + proveResult: make(chan *proveRes), + onStateChange: make(chan struct{}), + submitResult: make(chan error), + postStates: make(map[abi.ChainEpoch]postStatus), + ts: make(map[types.TipSetKey]*types.TipSet), + } +} + +func (m *mockAPI) makeTs(t *testing.T, h abi.ChainEpoch) *types.TipSet { + m.tsLock.Lock() + defer m.tsLock.Unlock() + + ts := makeTs(t, h) + m.ts[ts.Key()] = ts + return ts +} + +func (m *mockAPI) setDeadline(di *dline.Info) { + m.tsLock.Lock() + defer m.tsLock.Unlock() + + m.deadline = di +} + +func (m *mockAPI) getDeadline(currentEpoch abi.ChainEpoch) *dline.Info { + close := miner.WPoStChallengeWindow - 1 + dlIdx := uint64(0) + for close < currentEpoch { + close += miner.WPoStChallengeWindow + dlIdx++ + } + return miner.NewDeadlineInfo(0, dlIdx, currentEpoch) +} + +func (m *mockAPI) StateMinerProvingDeadline(ctx context.Context, address address.Address, key types.TipSetKey) (*dline.Info, error) { + m.tsLock.RLock() + defer m.tsLock.RUnlock() + + ts, ok := m.ts[key] + if !ok { + panic(fmt.Sprintf("unexpected tipset key %s", key)) + } + + if m.deadline != nil { + m.deadline.CurrentEpoch = ts.Height() + return m.deadline, nil + } + + return m.getDeadline(ts.Height()), nil +} + +func (m *mockAPI) startGeneratePoST( + ctx context.Context, + ts *types.TipSet, + deadline *dline.Info, + completeGeneratePoST CompleteGeneratePoSTCb, +) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + + m.statesLk.Lock() + defer m.statesLk.Unlock() + m.postStates[deadline.Open] = postStatusProving + + go func() { + defer cancel() + + select { + case psRes := <-m.proveResult: + m.statesLk.Lock() + { + if psRes.err == nil { + m.postStates[deadline.Open] = postStatusComplete + } else { + m.postStates[deadline.Open] = postStatusStart + } + } + 
m.statesLk.Unlock() + completeGeneratePoST(psRes.posts, psRes.err) + case <-ctx.Done(): + completeGeneratePoST(nil, ctx.Err()) + } + }() + + return cancel +} + +func (m *mockAPI) getPostStatus(di *dline.Info) postStatus { + m.statesLk.RLock() + defer m.statesLk.RUnlock() + + status, ok := m.postStates[di.Open] + if ok { + return status + } + return postStatusStart +} + +func (m *mockAPI) startSubmitPoST( + ctx context.Context, + ts *types.TipSet, + deadline *dline.Info, + posts []miner.SubmitWindowedPoStParams, + completeSubmitPoST CompleteSubmitPoSTCb, +) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + + go func() { + defer cancel() + + select { + case err := <-m.submitResult: + completeSubmitPoST(err) + case <-ctx.Done(): + completeSubmitPoST(ctx.Err()) + } + }() + + return cancel +} + +func (m *mockAPI) onAbort(ts *types.TipSet, deadline *dline.Info) { + m.abortCalledLock.Lock() + defer m.abortCalledLock.Unlock() + m.abortCalled = true +} + +func (m *mockAPI) wasAbortCalled() bool { + m.abortCalledLock.RLock() + defer m.abortCalledLock.RUnlock() + return m.abortCalled +} + +func (m *mockAPI) failPost(err error, ts *types.TipSet, deadline *dline.Info) { +} + +func (m *mockAPI) setChangeHandler(ch *changeHandler) { + m.ch = ch +} + +// TestChangeHandlerBasic verifies we can generate a proof and submit it +func TestChangeHandlerBasic(t *testing.T) { + s := makeScaffolding(t) + mock := s.mock + + defer s.ch.shutdown() + s.ch.start() + + // Trigger a head change + currentEpoch := abi.ChainEpoch(1) + go triggerHeadAdvance(t, s, currentEpoch) + + // Should start proving + <-s.ch.proveHdlr.processedHeadChanges + di := mock.getDeadline(currentEpoch) + require.Equal(t, postStatusProving, s.mock.getPostStatus(di)) + + // Submitter doesn't have anything to do yet + <-s.ch.submitHdlr.processedHeadChanges + require.Equal(t, SubmitStateStart, s.submitState(di)) + + // Send a response to the call to generate proofs + posts := 
// TestChangeHandlerFromProvingToSubmittingNoHeadChange tests that when the
// chain is already advanced past the confidence interval, we should move from
// proving to submitting without a head change in between.
//
// Each receive from a processed* channel below waits until the corresponding
// handler has finished processing an event, so the order of the receives
// matters.
func TestChangeHandlerFromProvingToSubmittingNoHeadChange(t *testing.T) {
	s := makeScaffolding(t)
	mock := s.mock

	// Monitor submit handler's processing of incoming postInfo
	s.ch.submitHdlr.processedPostReady = make(chan *postInfo)

	defer s.ch.shutdown()
	s.ch.start()

	// Trigger a head change (in a goroutine, as the handlers block until the
	// test reads from their processed* channels)
	currentEpoch := abi.ChainEpoch(1)
	go triggerHeadAdvance(t, s, currentEpoch)

	// Should start proving
	<-s.ch.proveHdlr.processedHeadChanges
	di := mock.getDeadline(currentEpoch)
	require.Equal(t, postStatusProving, s.mock.getPostStatus(di))

	// Submitter doesn't have anything to do yet
	<-s.ch.submitHdlr.processedHeadChanges
	require.Equal(t, SubmitStateStart, s.submitState(di))

	// Trigger a head change that advances the chain beyond the submit
	// confidence
	currentEpoch = 1 + SubmitConfidence
	go triggerHeadAdvance(t, s, currentEpoch)

	// Should be no change to state yet: the proof is still being generated,
	// so the submitter has nothing to submit
	<-s.ch.proveHdlr.processedHeadChanges
	require.Equal(t, postStatusProving, s.mock.getPostStatus(di))
	<-s.ch.submitHdlr.processedHeadChanges
	require.Equal(t, SubmitStateStart, s.submitState(di))

	// Send a response to the call to generate proofs
	posts := []miner.SubmitWindowedPoStParams{{Deadline: di.Index}}
	mock.proveResult <- &proveRes{posts: posts}

	// Should move to proving complete
	<-s.ch.proveHdlr.processedPostResults
	di = mock.getDeadline(currentEpoch)
	require.Equal(t, postStatusComplete, s.mock.getPostStatus(di))

	// Should move directly to submitting state with no further head changes
	<-s.ch.submitHdlr.processedPostReady
	require.Equal(t, SubmitStateSubmitting, s.submitState(di))
}
// TestChangeHandlerDontStartUntilProvingPeriod tests that the handler
// ignores updates until the proving period has been reached.
func TestChangeHandlerDontStartUntilProvingPeriod(t *testing.T) {
	s := makeScaffolding(t)
	mock := s.mock

	// Pin the mock's deadline to one whose proving period starts well after
	// the current epoch
	periodStart := miner.WPoStProvingPeriod
	dlIdx := uint64(1)
	currentEpoch := abi.ChainEpoch(10)
	di := miner.NewDeadlineInfo(periodStart, dlIdx, currentEpoch)
	mock.setDeadline(di)

	defer s.ch.shutdown()
	s.ch.start()

	// Trigger a head change
	go triggerHeadAdvance(t, s, currentEpoch)

	// Nothing should happen because the proving period has not started.
	// The short timeout is how the test observes that neither handler
	// received the head change.
	select {
	case <-s.ch.proveHdlr.processedHeadChanges:
		require.Fail(t, "unexpected prove change")
	case <-s.ch.submitHdlr.processedHeadChanges:
		require.Fail(t, "unexpected submit change")
	case <-time.After(10 * time.Millisecond):
	}

	// Advance the head to one challenge window past the start of the proving
	// period
	currentEpoch = periodStart + miner.WPoStChallengeWindow
	di = miner.NewDeadlineInfo(periodStart, dlIdx, currentEpoch)
	mock.setDeadline(di)
	go triggerHeadAdvance(t, s, currentEpoch)

	// Should start proving
	<-s.ch.proveHdlr.processedHeadChanges
	require.Equal(t, postStatusProving, s.mock.getPostStatus(di))
}
// TestChangeHandlerProvingRounds verifies we can generate several rounds of
// proofs as the chain head advances.
//
// The chain is advanced one epoch at a time across five challenge windows.
// Within each window the proof is completed completeProofIndex epochs after
// the window opens, and the submit is completed one epoch after that.
func TestChangeHandlerProvingRounds(t *testing.T) {
	s := makeScaffolding(t)
	mock := s.mock

	defer s.ch.shutdown()
	s.ch.start()

	// Offset from a window's Open epoch at which its proof is completed
	completeProofIndex := abi.ChainEpoch(10)
	for currentEpoch := abi.ChainEpoch(1); currentEpoch < miner.WPoStChallengeWindow*5; currentEpoch++ {
		// Trigger a head change
		di := mock.getDeadline(currentEpoch)
		go triggerHeadAdvance(t, s, currentEpoch)

		// Wait for prover to process head change
		<-s.ch.proveHdlr.processedHeadChanges

		completeProofEpoch := di.Open + completeProofIndex
		next := nextDeadline(di)
		//fmt.Println("epoch", currentEpoch, s.mock.getPostStatus(di), "next", s.mock.getPostStatus(next))
		if currentEpoch >= next.Challenge {
			require.Equal(t, postStatusComplete, s.mock.getPostStatus(di))
			// At the next deadline's challenge epoch, should start proving
			// for that epoch
			require.Equal(t, postStatusProving, s.mock.getPostStatus(next))
		} else if currentEpoch > completeProofEpoch {
			// After proving for the round is complete, should be in complete state
			require.Equal(t, postStatusComplete, s.mock.getPostStatus(di))
			require.Equal(t, postStatusStart, s.mock.getPostStatus(next))
		} else {
			// Until proving completes, should be in the proving state
			require.Equal(t, postStatusProving, s.mock.getPostStatus(di))
			require.Equal(t, postStatusStart, s.mock.getPostStatus(next))
		}

		// Wait for submitter to process head change
		<-s.ch.submitHdlr.processedHeadChanges

		// The submit is started by the head change after the proof completes
		// and its response is sent at completeSubmitEpoch
		completeSubmitEpoch := completeProofEpoch + 1
		//fmt.Println("epoch", currentEpoch, s.submitState(di))
		if currentEpoch > completeSubmitEpoch {
			require.Equal(t, SubmitStateComplete, s.submitState(di))
		} else if currentEpoch > completeProofEpoch {
			require.Equal(t, SubmitStateSubmitting, s.submitState(di))
		} else {
			require.Equal(t, SubmitStateStart, s.submitState(di))
		}

		if currentEpoch == completeProofEpoch {
			// Send a response to the call to generate proofs
			posts := []miner.SubmitWindowedPoStParams{{Deadline: di.Index}}
			mock.proveResult <- &proveRes{posts: posts}

			// Should move to proving complete
			<-s.ch.proveHdlr.processedPostResults
			require.Equal(t, postStatusComplete, s.mock.getPostStatus(di))
		}

		if currentEpoch == completeSubmitEpoch {
			// Send a response to the submit call
			mock.submitResult <- nil

			// Should move to the complete state
			<-s.ch.submitHdlr.processedSubmitResults
			require.Equal(t, SubmitStateComplete, s.submitState(di))
		}
	}
}
// TestChangeHandlerSubmitErrorRecovery verifies that the submit handler
// recovers correctly from an error: a failed submit returns the window to the
// start state, and the next head change submits again.
func TestChangeHandlerSubmitErrorRecovery(t *testing.T) {
	s := makeScaffolding(t)
	mock := s.mock

	defer s.ch.shutdown()
	s.ch.start()

	// Trigger a head change
	currentEpoch := abi.ChainEpoch(1)
	go triggerHeadAdvance(t, s, currentEpoch)

	// Should start proving
	<-s.ch.proveHdlr.processedHeadChanges
	di := mock.getDeadline(currentEpoch)
	require.Equal(t, postStatusProving, s.mock.getPostStatus(di))

	// Submitter doesn't have anything to do yet
	<-s.ch.submitHdlr.processedHeadChanges
	require.Equal(t, SubmitStateStart, s.submitState(di))

	// Send a response to the call to generate proofs
	posts := []miner.SubmitWindowedPoStParams{{Deadline: di.Index}}
	mock.proveResult <- &proveRes{posts: posts}

	// Should move to proving complete
	<-s.ch.proveHdlr.processedPostResults
	require.Equal(t, postStatusComplete, s.mock.getPostStatus(di))

	// Move to the correct height to submit the proof
	currentEpoch = 1 + SubmitConfidence
	go triggerHeadAdvance(t, s, currentEpoch)

	// Read from prover incoming channel (so as not to block)
	<-s.ch.proveHdlr.processedHeadChanges

	// Should move to submitting state
	<-s.ch.submitHdlr.processedHeadChanges
	di = mock.getDeadline(currentEpoch)
	require.Equal(t, SubmitStateSubmitting, s.submitState(di))

	// Send an error response to the call to submit
	mock.submitResult <- fmt.Errorf("err")

	// Should abort and then move back to the start state
	<-s.ch.submitHdlr.processedSubmitResults
	require.Equal(t, SubmitStateStart, s.submitState(di))
	require.True(t, mock.wasAbortCalled())

	// Trigger another head change at the same epoch, which gives the
	// submitter the opportunity to retry
	go triggerHeadAdvance(t, s, currentEpoch)

	// Read from prover incoming channel (so as not to block)
	<-s.ch.proveHdlr.processedHeadChanges

	// Should move to submitting state
	<-s.ch.submitHdlr.processedHeadChanges
	di = mock.getDeadline(currentEpoch)
	require.Equal(t, SubmitStateSubmitting, s.submitState(di))

	// Send a response to the submit call
	mock.submitResult <- nil

	// Should move to the complete state
	<-s.ch.submitHdlr.processedSubmitResults
	require.Equal(t, SubmitStateComplete, s.submitState(di))
}
require.Equal(t, postStatusProving, s.mock.getPostStatus(di)) + + // Move to a height that expires the current proof + currentEpoch = miner.WPoStChallengeWindow + di = mock.getDeadline(currentEpoch) + go triggerHeadAdvance(t, s, currentEpoch) + + // Should trigger an abort and start proving for the new deadline + <-s.ch.proveHdlr.processedHeadChanges + require.Equal(t, postStatusProving, s.mock.getPostStatus(di)) + <-s.ch.proveHdlr.processedPostResults + require.True(t, mock.wasAbortCalled()) + + // Send a response to the call to generate proofs + posts := []miner.SubmitWindowedPoStParams{{Deadline: di.Index}} + mock.proveResult <- &proveRes{posts: posts} + + // Should move to proving complete + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(di)) +} + +// TestChangeHandlerSubmitExpiry verifies that the submit handler +// behaves correctly on expiry +func TestChangeHandlerSubmitExpiry(t *testing.T) { + s := makeScaffolding(t) + mock := s.mock + + // Ignore prove handler head change processing for this test + s.ch.proveHdlr.processedHeadChanges = nil + + defer s.ch.shutdown() + s.ch.start() + + // Trigger a head change + currentEpoch := abi.ChainEpoch(1) + go triggerHeadAdvance(t, s, currentEpoch) + + // Submitter doesn't have anything to do yet + <-s.ch.submitHdlr.processedHeadChanges + di := mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateStart, s.submitState(di)) + + // Send a response to the call to generate proofs + posts := []miner.SubmitWindowedPoStParams{{Deadline: di.Index}} + mock.proveResult <- &proveRes{posts: posts} + + // Should move to proving complete + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(di)) + + // Move to the correct height to submit the proof + currentEpoch = 1 + SubmitConfidence + go triggerHeadAdvance(t, s, currentEpoch) + + // Should move to submitting state + <-s.ch.submitHdlr.processedHeadChanges + di = 
mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateSubmitting, s.submitState(di)) + + // Move to a height that expires the submit + currentEpoch = miner.WPoStChallengeWindow + di = mock.getDeadline(currentEpoch) + go triggerHeadAdvance(t, s, currentEpoch) + + // Should trigger an abort and move back to start state + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + + <-s.ch.submitHdlr.processedSubmitResults + require.True(t, mock.wasAbortCalled()) + }() + + go func() { + defer wg.Done() + + <-s.ch.submitHdlr.processedHeadChanges + require.Equal(t, SubmitStateStart, s.submitState(di)) + }() + + wg.Wait() +} + +// TestChangeHandlerProveRevert verifies that the prove handler +// behaves correctly on revert +func TestChangeHandlerProveRevert(t *testing.T) { + s := makeScaffolding(t) + mock := s.mock + + defer s.ch.shutdown() + s.ch.start() + + // Trigger a head change + currentEpoch := miner.WPoStChallengeWindow + go triggerHeadAdvance(t, s, currentEpoch) + + // Should start proving + <-s.ch.proveHdlr.processedHeadChanges + di := mock.getDeadline(currentEpoch) + require.Equal(t, postStatusProving, s.mock.getPostStatus(di)) + + // Trigger a revert to the previous epoch + revertEpoch := di.Open - 5 + go triggerHeadChange(t, s, revertEpoch, currentEpoch) + + // Should be no change + <-s.ch.proveHdlr.processedHeadChanges + require.Equal(t, postStatusProving, s.mock.getPostStatus(di)) + + // Send a response to the call to generate proofs + posts := []miner.SubmitWindowedPoStParams{{Deadline: di.Index}} + mock.proveResult <- &proveRes{posts: posts} + + // Should move to proving complete + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(di)) + require.False(t, mock.wasAbortCalled()) +} + +// TestChangeHandlerSubmittingRevert verifies that the submit handler +// behaves correctly when there's a revert from the submitting state +func TestChangeHandlerSubmittingRevert(t *testing.T) { + s := 
makeScaffolding(t) + mock := s.mock + + // Ignore prove handler head change processing for this test + s.ch.proveHdlr.processedHeadChanges = nil + + defer s.ch.shutdown() + s.ch.start() + + // Trigger a head change + currentEpoch := miner.WPoStChallengeWindow + go triggerHeadAdvance(t, s, currentEpoch) + + // Submitter doesn't have anything to do yet + <-s.ch.submitHdlr.processedHeadChanges + di := mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateStart, s.submitState(di)) + + // Send a response to the call to generate proofs + posts := []miner.SubmitWindowedPoStParams{{Deadline: di.Index}} + mock.proveResult <- &proveRes{posts: posts} + + // Should move to proving complete + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(di)) + + // Move to the correct height to submit the proof + currentEpoch = currentEpoch + 1 + SubmitConfidence + go triggerHeadAdvance(t, s, currentEpoch) + + // Should move to submitting state + <-s.ch.submitHdlr.processedHeadChanges + di = mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateSubmitting, s.submitState(di)) + + // Trigger a revert to the previous epoch + revertEpoch := di.Open - 5 + go triggerHeadChange(t, s, revertEpoch, currentEpoch) + + var wg sync.WaitGroup + wg.Add(2) + + // Should trigger an abort + go func() { + defer wg.Done() + + <-s.ch.submitHdlr.processedSubmitResults + require.True(t, mock.wasAbortCalled()) + }() + + // Should resubmit current epoch + go func() { + defer wg.Done() + + <-s.ch.submitHdlr.processedHeadChanges + require.Equal(t, SubmitStateSubmitting, s.submitState(di)) + }() + + wg.Wait() + + // Send a response to the resubmit call + mock.submitResult <- nil + + // Should move to the complete state + <-s.ch.submitHdlr.processedSubmitResults + require.Equal(t, SubmitStateComplete, s.submitState(di)) +} + +// TestChangeHandlerSubmitCompleteRevert verifies that the submit handler +// behaves correctly when there's a revert from the submit 
complete state +func TestChangeHandlerSubmitCompleteRevert(t *testing.T) { + s := makeScaffolding(t) + mock := s.mock + + // Ignore prove handler head change processing for this test + s.ch.proveHdlr.processedHeadChanges = nil + + defer s.ch.shutdown() + s.ch.start() + + // Trigger a head change + currentEpoch := miner.WPoStChallengeWindow + go triggerHeadAdvance(t, s, currentEpoch) + + // Submitter doesn't have anything to do yet + <-s.ch.submitHdlr.processedHeadChanges + di := mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateStart, s.submitState(di)) + + // Send a response to the call to generate proofs + posts := []miner.SubmitWindowedPoStParams{{Deadline: di.Index}} + mock.proveResult <- &proveRes{posts: posts} + + // Should move to proving complete + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(di)) + + // Move to the correct height to submit the proof + currentEpoch = currentEpoch + 1 + SubmitConfidence + go triggerHeadAdvance(t, s, currentEpoch) + + // Should move to submitting state + <-s.ch.submitHdlr.processedHeadChanges + di = mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateSubmitting, s.submitState(di)) + + // Send a response to the resubmit call + mock.submitResult <- nil + + // Should move to the complete state + <-s.ch.submitHdlr.processedSubmitResults + require.Equal(t, SubmitStateComplete, s.submitState(di)) + + // Trigger a revert to the previous epoch + revertEpoch := di.Open - 5 + go triggerHeadChange(t, s, revertEpoch, currentEpoch) + + // Should resubmit current epoch + <-s.ch.submitHdlr.processedHeadChanges + require.Equal(t, SubmitStateSubmitting, s.submitState(di)) + + // Send a response to the resubmit call + mock.submitResult <- nil + + // Should move to the complete state + <-s.ch.submitHdlr.processedSubmitResults + require.Equal(t, SubmitStateComplete, s.submitState(di)) +} + +// TestChangeHandlerSubmitRevertTwoEpochs verifies that the submit handler +// 
behaves correctly when the revert is two epochs deep +func TestChangeHandlerSubmitRevertTwoEpochs(t *testing.T) { + s := makeScaffolding(t) + mock := s.mock + + // Ignore prove handler head change processing for this test + s.ch.proveHdlr.processedHeadChanges = nil + + defer s.ch.shutdown() + s.ch.start() + + // Trigger a head change + currentEpoch := miner.WPoStChallengeWindow + go triggerHeadAdvance(t, s, currentEpoch) + + // Submitter doesn't have anything to do yet + <-s.ch.submitHdlr.processedHeadChanges + diE1 := mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateStart, s.submitState(diE1)) + + // Send a response to the call to generate proofs + posts := []miner.SubmitWindowedPoStParams{{Deadline: diE1.Index}} + mock.proveResult <- &proveRes{posts: posts} + + // Should move to proving complete + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1)) + + // Move to the challenge epoch for the next deadline + diE2 := nextDeadline(diE1) + currentEpoch = diE2.Challenge + go triggerHeadAdvance(t, s, currentEpoch) + + // Should move to submitting state for epoch 1 + <-s.ch.submitHdlr.processedHeadChanges + diE1 = mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateSubmitting, s.submitState(diE1)) + + // Send a response to the submit call for epoch 1 + mock.submitResult <- nil + + // Should move to the complete state for epoch 1 + <-s.ch.submitHdlr.processedSubmitResults + require.Equal(t, SubmitStateComplete, s.submitState(diE1)) + + // Should start proving epoch 2 + // Send a response to the call to generate proofs + postsE2 := []miner.SubmitWindowedPoStParams{{Deadline: diE2.Index}} + mock.proveResult <- &proveRes{posts: postsE2} + + // Should move to proving complete for epoch 2 + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE2)) + + // Move to the correct height to submit the proof for epoch 2 + currentEpoch = diE2.Open + 1 + 
SubmitConfidence + go triggerHeadAdvance(t, s, currentEpoch) + + // Should move to submitting state for epoch 2 + <-s.ch.submitHdlr.processedHeadChanges + diE2 = mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateSubmitting, s.submitState(diE2)) + + // Trigger a revert through two epochs (from epoch 2 to epoch 0) + revertEpoch := diE1.Open - 5 + go triggerHeadChange(t, s, revertEpoch, currentEpoch) + + var wg sync.WaitGroup + wg.Add(2) + + // Should trigger an abort + go func() { + defer wg.Done() + + <-s.ch.submitHdlr.processedSubmitResults + require.True(t, mock.wasAbortCalled()) + }() + + go func() { + defer wg.Done() + + <-s.ch.submitHdlr.processedHeadChanges + + // Should reset epoch 1 (that is expired) to start state + require.Equal(t, SubmitStateStart, s.submitState(diE1)) + // Should resubmit epoch 2 + require.Equal(t, SubmitStateSubmitting, s.submitState(diE2)) + }() + + wg.Wait() + + // Send a response to the resubmit call for epoch 2 + mock.submitResult <- nil + + // Should move to the complete state for epoch 2 + <-s.ch.submitHdlr.processedSubmitResults + require.Equal(t, SubmitStateComplete, s.submitState(diE2)) +} + +// TestChangeHandlerSubmitRevertAdvanceLess verifies that the submit handler +// behaves correctly when the revert is two epochs deep and the advance is +// to a lower height than before +func TestChangeHandlerSubmitRevertAdvanceLess(t *testing.T) { + s := makeScaffolding(t) + mock := s.mock + + // Ignore prove handler head change processing for this test + s.ch.proveHdlr.processedHeadChanges = nil + + defer s.ch.shutdown() + s.ch.start() + + // Trigger a head change + currentEpoch := miner.WPoStChallengeWindow + go triggerHeadAdvance(t, s, currentEpoch) + + // Submitter doesn't have anything to do yet + <-s.ch.submitHdlr.processedHeadChanges + diE1 := mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateStart, s.submitState(diE1)) + + // Send a response to the call to generate proofs + posts := 
[]miner.SubmitWindowedPoStParams{{Deadline: diE1.Index}} + mock.proveResult <- &proveRes{posts: posts} + + // Should move to proving complete + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1)) + + // Move to the challenge epoch for the next deadline + diE2 := nextDeadline(diE1) + currentEpoch = diE2.Challenge + go triggerHeadAdvance(t, s, currentEpoch) + + // Should move to submitting state for epoch 1 + <-s.ch.submitHdlr.processedHeadChanges + diE1 = mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateSubmitting, s.submitState(diE1)) + + // Send a response to the submit call for epoch 1 + mock.submitResult <- nil + + // Should move to the complete state for epoch 1 + <-s.ch.submitHdlr.processedSubmitResults + require.Equal(t, SubmitStateComplete, s.submitState(diE1)) + + // Should start proving epoch 2 + // Send a response to the call to generate proofs + postsE2 := []miner.SubmitWindowedPoStParams{{Deadline: diE2.Index}} + mock.proveResult <- &proveRes{posts: postsE2} + + // Should move to proving complete for epoch 2 + <-s.ch.proveHdlr.processedPostResults + require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE2)) + + // Move to the correct height to submit the proof for epoch 2 + currentEpoch = diE2.Open + 1 + SubmitConfidence + go triggerHeadAdvance(t, s, currentEpoch) + + // Should move to submitting state for epoch 2 + <-s.ch.submitHdlr.processedHeadChanges + diE2 = mock.getDeadline(currentEpoch) + require.Equal(t, SubmitStateSubmitting, s.submitState(diE2)) + + // Trigger a revert through two epochs (from epoch 2 to epoch 0) + // then advance to the previous epoch (to epoch 1) + revertEpoch := diE1.Open - 5 + currentEpoch = diE2.Open - 1 + go triggerHeadChange(t, s, revertEpoch, currentEpoch) + + var wg sync.WaitGroup + wg.Add(2) + + // Should trigger an abort + go func() { + defer wg.Done() + + <-s.ch.submitHdlr.processedSubmitResults + require.True(t, mock.wasAbortCalled()) + }() + + 
go func() { + defer wg.Done() + + <-s.ch.submitHdlr.processedHeadChanges + + // Should resubmit epoch 1 + require.Equal(t, SubmitStateSubmitting, s.submitState(diE1)) + // Should reset epoch 2 to start state + require.Equal(t, SubmitStateStart, s.submitState(diE2)) + }() + + wg.Wait() + + // Send a response to the resubmit call for epoch 1 + mock.submitResult <- nil + + // Should move to the complete state for epoch 1 + <-s.ch.submitHdlr.processedSubmitResults + require.Equal(t, SubmitStateComplete, s.submitState(diE1)) +} + +type smScaffolding struct { + ctx context.Context + mock *mockAPI + ch *changeHandler +} + +func makeScaffolding(t *testing.T) *smScaffolding { + ctx := context.Background() + actor := tutils.NewActorAddr(t, "actor") + mock := newMockAPI() + ch := newChangeHandler(mock, actor) + mock.setChangeHandler(ch) + + ch.proveHdlr.processedHeadChanges = make(chan *headChange) + ch.proveHdlr.processedPostResults = make(chan *postResult) + + ch.submitHdlr.processedHeadChanges = make(chan *headChange) + ch.submitHdlr.processedSubmitResults = make(chan *submitResult) + + return &smScaffolding{ + ctx: ctx, + mock: mock, + ch: ch, + } +} + +func triggerHeadAdvance(t *testing.T, s *smScaffolding, height abi.ChainEpoch) { + ts := s.mock.makeTs(t, height) + err := s.ch.update(s.ctx, nil, ts) + require.NoError(t, err) +} + +func triggerHeadChange(t *testing.T, s *smScaffolding, revertHeight, advanceHeight abi.ChainEpoch) { + tsRev := s.mock.makeTs(t, revertHeight) + tsAdv := s.mock.makeTs(t, advanceHeight) + err := s.ch.update(s.ctx, tsRev, tsAdv) + require.NoError(t, err) +} + +func (s *smScaffolding) submitState(di *dline.Info) SubmitState { + return s.ch.submitHdlr.getPostWindow(di).submitState +} + +func makeTs(t *testing.T, h abi.ChainEpoch) *types.TipSet { + var parents []cid.Cid + msgcid := dummyCid + + a, _ := address.NewFromString("t00") + b, _ := address.NewFromString("t02") + var ts, err = types.NewTipSet([]*types.BlockHeader{ + { + Height: h, + Miner: 
a, + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, + + ParentStateRoot: dummyCid, + Messages: msgcid, + ParentMessageReceipts: dummyCid, + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + { + Height: h, + Miner: b, + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}}, + + ParentStateRoot: dummyCid, + Messages: msgcid, + ParentMessageReceipts: dummyCid, + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + }) + + require.NoError(t, err) + + return ts +} diff --git a/storage/wdpost_nextdl_test.go b/storage/wdpost_nextdl_test.go new file mode 100644 index 00000000000..ad4b1fdeb3d --- /dev/null +++ b/storage/wdpost_nextdl_test.go @@ -0,0 +1,38 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" +) + +func TestNextDeadline(t *testing.T) { + periodStart := abi.ChainEpoch(0) + deadlineIdx := 0 + currentEpoch := abi.ChainEpoch(10) + + di := miner.NewDeadlineInfo(periodStart, uint64(deadlineIdx), currentEpoch) + require.EqualValues(t, 0, di.Index) + require.EqualValues(t, 0, di.PeriodStart) + require.EqualValues(t, -20, di.Challenge) + require.EqualValues(t, 0, di.Open) + require.EqualValues(t, 60, di.Close) + + for i := 1; i < 1+int(miner.WPoStPeriodDeadlines)*2; i++ { + di = nextDeadline(di) + deadlineIdx = i % int(miner.WPoStPeriodDeadlines) + expPeriodStart := int(miner.WPoStProvingPeriod) * (i / int(miner.WPoStPeriodDeadlines)) + expOpen := expPeriodStart + deadlineIdx*int(miner.WPoStChallengeWindow) + expClose := expOpen + int(miner.WPoStChallengeWindow) + expChallenge := expOpen - int(miner.WPoStChallengeLookback) + //fmt.Printf("%d: %d@%d %d-%d (%d)\n", i, expPeriodStart, deadlineIdx, expOpen, expClose, 
expChallenge) + require.EqualValues(t, deadlineIdx, di.Index) + require.EqualValues(t, expPeriodStart, di.PeriodStart) + require.EqualValues(t, expOpen, di.Open) + require.EqualValues(t, expClose, di.Close) + require.EqualValues(t, expChallenge, di.Challenge) + } +} diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 9a497f87915..35fdfc6d13a 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -29,15 +29,21 @@ import ( "github.com/filecoin-project/lotus/journal" ) -func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) { +func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { + c := evtCommon{Error: err} + if ts != nil { + c.Deadline = deadline + c.Height = ts.Height() + c.TipSet = ts.Cids() + } return WdPoStSchedulerEvt{ - evtCommon: s.getEvtCommon(err), + evtCommon: c, State: SchedulerStateFaulted, } }) - log.Errorf("TODO") + log.Errorf("Got err %+v - TODO handle errors", err) /*s.failLk.Lock() if eps > s.failed { s.failed = eps @@ -45,67 +51,134 @@ func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) { s.failLk.Unlock()*/ } -func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *dline.Info, ts *types.TipSet) { +// recordProofsEvent records a successful proofs_processed event in the +// journal, even if it was a noop (no partitions). 
+func (s *WindowPoStScheduler) recordProofsEvent(partitions []miner.PoStPartition, mcid cid.Cid) { + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} { + return &WdPoStProofsProcessedEvt{ + evtCommon: s.getEvtCommon(nil), + Partitions: partitions, + MessageCID: mcid, + } + }) +} + +// startGeneratePoST kicks off the process of generating a PoST +func (s *WindowPoStScheduler) startGeneratePoST( + ctx context.Context, + ts *types.TipSet, + deadline *dline.Info, + completeGeneratePoST CompleteGeneratePoSTCb, +) context.CancelFunc { ctx, abort := context.WithCancel(ctx) + go func() { + defer abort() - s.abort = abort - s.activeDeadline = deadline + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { + return WdPoStSchedulerEvt{ + evtCommon: s.getEvtCommon(nil), + State: SchedulerStateStarted, + } + }) - journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { - return WdPoStSchedulerEvt{ - evtCommon: s.getEvtCommon(nil), - State: SchedulerStateStarted, - } - }) + posts, err := s.runGeneratePoST(ctx, ts, deadline) + completeGeneratePoST(posts, err) + }() + return abort +} + +// runGeneratePoST generates the PoST +func (s *WindowPoStScheduler) runGeneratePoST( + ctx context.Context, + ts *types.TipSet, + deadline *dline.Info, +) ([]miner.SubmitWindowedPoStParams, error) { + ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.generatePoST") + defer span.End() + + posts, err := s.runPost(ctx, *deadline, ts) + if err != nil { + log.Errorf("runPost failed: %+v", err) + return nil, err + } + + if len(posts) == 0 { + s.recordProofsEvent(nil, cid.Undef) + } + + return posts, nil +} + +// startSubmitPoST kicks off the process of submitting PoST +func (s *WindowPoStScheduler) startSubmitPoST( + ctx context.Context, + ts *types.TipSet, + deadline *dline.Info, + posts []miner.SubmitWindowedPoStParams, + completeSubmitPoST CompleteSubmitPoSTCb, +) context.CancelFunc { + + ctx, abort := 
context.WithCancel(ctx) go func() { defer abort() - ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.doPost") - defer span.End() - - // recordProofsEvent records a successful proofs_processed event in the - // journal, even if it was a noop (no partitions). - recordProofsEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) { - journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} { - return &WdPoStProofsProcessedEvt{ - evtCommon: s.getEvtCommon(nil), - Partitions: partitions, - MessageCID: mcid, + err := s.runSubmitPoST(ctx, ts, deadline, posts) + if err == nil { + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { + return WdPoStSchedulerEvt{ + evtCommon: s.getEvtCommon(nil), + State: SchedulerStateSucceeded, } }) } + completeSubmitPoST(err) + }() - posts, err := s.runPost(ctx, *deadline, ts) - if err != nil { - log.Errorf("run window post failed: %+v", err) - s.failPost(err, deadline) - return - } + return abort +} - if len(posts) == 0 { - recordProofsEvent(nil, cid.Undef) - return - } +// runSubmitPoST submits PoST +func (s *WindowPoStScheduler) runSubmitPoST( + ctx context.Context, + ts *types.TipSet, + deadline *dline.Info, + posts []miner.SubmitWindowedPoStParams, +) error { + if len(posts) == 0 { + return nil + } - for i := range posts { - post := &posts[i] - sm, err := s.submitPost(ctx, post) - if err != nil { - log.Errorf("submit window post failed: %+v", err) - s.failPost(err, deadline) - } else { - recordProofsEvent(post.Partitions, sm.Cid()) - } + ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.submitPoST") + defer span.End() + + // Get randomness from tickets + commEpoch := deadline.Open + commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil) + if err != nil { + err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err) + 
log.Errorf("submitPost failed: %+v", err) + + return err + } + + var submitErr error + for i := range posts { + // Add randomness to PoST + post := &posts[i] + post.ChainCommitEpoch = commEpoch + post.ChainCommitRand = commRand + + // Submit PoST + sm, err := s.submitPost(ctx, post) + if err != nil { + log.Errorf("submit window post failed: %+v", err) + // Record the failure so it is returned to the caller + submitErr = err + } else { + s.recordProofsEvent(post.Partitions, sm.Cid()) } + } - journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { - return WdPoStSchedulerEvt{ - evtCommon: s.getEvtCommon(nil), - State: SchedulerStateSucceeded, - } - }) - }() + return submitErr } func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.BitField) (bitfield.BitField, error) { @@ -392,7 +465,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty rand, err := s.api.ChainGetRandomnessFromBeacon(ctx, ts.Key(), crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes()) if err != nil { - return nil, xerrors.Errorf("failed to get chain randomness for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err) + return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err) } // Get the partitions for the given deadline @@ -536,19 +609,6 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty posts = append(posts, params) } - // Compute randomness after generating proofs so as to reduce the impact - // of chain reorgs (which change randomness) - commEpoch := di.Open - commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil) - if err != nil { - return nil, xerrors.Errorf("failed to get chain randomness for window post (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err) - } - - for i := range posts { - posts[i].ChainCommitEpoch = commEpoch - posts[i].ChainCommitRand 
= commRand - } - return posts, nil } @@ -589,6 +649,7 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition) ([][]a } batches = append(batches, partitions[i:end]) } + return batches, nil } diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go index 10be2fbcd90..09b9aee5cb5 100644 --- a/storage/wdpost_run_test.go +++ b/storage/wdpost_run_test.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" @@ -177,7 +178,10 @@ func TestWDPostDoPost(t *testing.T) { FaultDeclarationCutoff: miner0.FaultDeclarationCutoff, } ts := mockTipSet(t) - scheduler.doPost(ctx, di, ts) + + scheduler.startGeneratePoST(ctx, ts, di, func(posts []miner.SubmitWindowedPoStParams, err error) { + scheduler.startSubmitPoST(ctx, ts, di, posts, func(err error) {}) + }) // Read the window PoST messages for i := 0; i < expectedMsgCount; i++ { diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 7e60fd9eebd..3a76a219fd1 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -22,8 +22,6 @@ import ( "go.opencensus.io/trace" ) -const StartConfidence = 4 // TODO: config - type WindowPoStScheduler struct { api storageMinerApi feeCfg config.MinerFeeConfig @@ -31,16 +29,11 @@ type WindowPoStScheduler struct { faultTracker sectorstorage.FaultTracker proofType abi.RegisteredPoStProof partitionSectors uint64 + ch *changeHandler actor address.Address worker address.Address - cur *types.TipSet - - // if a post is in progress, this indicates for which ElectionPeriodStart - activeDeadline *dline.Info - abort context.CancelFunc - evtTypes [4]journal.EventType // failed abi.ChainEpoch // eps @@ -77,16 +70,17 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb }, nil } -func deadlineEquals(a, b *dline.Info) bool { - 
if a == nil || b == nil { - return b == a - } - - return a.PeriodStart == b.PeriodStart && a.Index == b.Index && a.Challenge == b.Challenge +type changeHandlerAPIImpl struct { + storageMinerApi + *WindowPoStScheduler } func (s *WindowPoStScheduler) Run(ctx context.Context) { - defer s.abortActivePoSt() + // Initialize change handler + chImpl := &changeHandlerAPIImpl{storageMinerApi: s.api, WindowPoStScheduler: s} + s.ch = newChangeHandler(chImpl, s.actor) + defer s.ch.shutdown() + s.ch.start() var notifs <-chan []*api.HeadChange var err error @@ -125,9 +119,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { continue } - if err := s.update(ctx, chg.Val); err != nil { - log.Errorf("%+v", err) - } + s.update(ctx, nil, chg.Val) gotCur = true continue @@ -135,7 +127,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.headChange") - var lowest, highest *types.TipSet = s.cur, nil + var lowest, highest *types.TipSet = nil, nil for _, change := range changes { if change.Val == nil { @@ -149,12 +141,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { } } - if err := s.revert(ctx, lowest); err != nil { - log.Error("handling head reverts in window post sched: %+v", err) - } - if err := s.update(ctx, highest); err != nil { - log.Error("handling head updates in window post sched: %+v", err) - } + s.update(ctx, lowest, highest) span.End() case <-ctx.Done(): @@ -163,95 +150,40 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { } } -func (s *WindowPoStScheduler) revert(ctx context.Context, newLowest *types.TipSet) error { - if s.cur == newLowest { - return nil +func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.TipSet) { + if apply == nil { + log.Error("no new tipset in window post WindowPoStScheduler.update") + return } - s.cur = newLowest - - newDeadline, err := s.api.StateMinerProvingDeadline(ctx, s.actor, newLowest.Key()) + err := s.ch.update(ctx, revert, 
apply) if err != nil { - return err - } - - if !deadlineEquals(s.activeDeadline, newDeadline) { - s.abortActivePoSt() + log.Errorf("handling head updates in window post sched: %+v", err) } - - return nil } -func (s *WindowPoStScheduler) update(ctx context.Context, new *types.TipSet) error { - if new == nil { - return xerrors.Errorf("no new tipset in window post sched update") - } - - di, err := s.api.StateMinerProvingDeadline(ctx, s.actor, new.Key()) - if err != nil { - return err - } - - if deadlineEquals(s.activeDeadline, di) { - return nil // already working on this deadline - } - - if !di.PeriodStarted() { - return nil // not proving anything yet - } - - s.abortActivePoSt() - - // TODO: wait for di.Challenge here, will give us ~10min more to compute windowpost - // (Need to get correct deadline above, which is tricky) - - if di.Open+StartConfidence >= new.Height() { - log.Info("not starting window post yet, waiting for startconfidence", di.Open, di.Open+StartConfidence, new.Height()) - return nil - } - - /*s.failLk.Lock() - if s.failed > 0 { - s.failed = 0 - s.activeEPS = 0 - } - s.failLk.Unlock()*/ - log.Infof("at %d, do window post for P %d, dd %d", new.Height(), di.PeriodStart, di.Index) - - s.doPost(ctx, di, new) - - return nil -} - -func (s *WindowPoStScheduler) abortActivePoSt() { - if s.activeDeadline == nil { - return // noop - } - - if s.abort != nil { - s.abort() - - journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { - return WdPoStSchedulerEvt{ - evtCommon: s.getEvtCommon(nil), - State: SchedulerStateAborted, - } - }) - - log.Warnf("Aborting window post (Deadline: %+v)", s.activeDeadline) - } - - s.activeDeadline = nil - s.abort = nil +// onAbort is called when generating proofs or submitting proofs is aborted +func (s *WindowPoStScheduler) onAbort(ts *types.TipSet, deadline *dline.Info) { + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { + c := evtCommon{} + if ts != nil { + c.Deadline = 
deadline + c.Height = ts.Height() + c.TipSet = ts.Cids() + } + return WdPoStSchedulerEvt{ + evtCommon: c, + State: SchedulerStateAborted, + } + }) } -// getEvtCommon populates and returns common attributes from state, for a -// WdPoSt journal event. func (s *WindowPoStScheduler) getEvtCommon(err error) evtCommon { c := evtCommon{Error: err} - if s.cur != nil { - c.Deadline = s.activeDeadline - c.Height = s.cur.Height() - c.TipSet = s.cur.Cids() + currentTS, currentDeadline := s.ch.currentTSDI() + if currentTS != nil { + c.Deadline = currentDeadline + c.Height = currentTS.Height() + c.TipSet = currentTS.Cids() } return c }