sectorstorage: Fix manager restart edge-case #4645

Merged (2 commits) on Oct 29, 2020
Changes from all commits
38 changes: 26 additions & 12 deletions extern/sector-storage/manager_calltracker.go
@@ -251,11 +251,36 @@ func (m *Manager) waitWork(ctx context.Context, wid WorkID) (interface{}, error)
 		return nil, xerrors.Errorf("something else in waiting on callRes")
 	}
 
+	done := func() {
+		delete(m.results, wid)
+
+		_, ok := m.callToWork[ws.WorkerCall]
+		if ok {
+			delete(m.callToWork, ws.WorkerCall)
+		}
+
+		err := m.work.Get(wk).End()
+		if err != nil {
+			// Not great, but not worth discarding potentially multi-hour computation over this
+			log.Errorf("marking work as done: %+v", err)
+		}
+	}
+
+	// the result can already be there if the work was running, manager restarted,
+	// and the worker has delivered the result before we entered waitWork
+	res, ok := m.results[wid]
+	if ok {
+		done()
+		m.workLk.Unlock()
+		return res.r, res.err
+	}
+
 	ch, ok := m.waitRes[wid]
 	if !ok {
 		ch = make(chan struct{})
 		m.waitRes[wid] = ch
 	}
+
 	m.workLk.Unlock()
 
 	select {
@@ -264,18 +289,7 @@ func (m *Manager) waitWork(ctx context.Context, wid WorkID) (interface{}, error)
 		defer m.workLk.Unlock()
 
 		res := m.results[wid]
-		delete(m.results, wid)
-
-		_, ok := m.callToWork[ws.WorkerCall]
-		if ok {
-			delete(m.callToWork, ws.WorkerCall)
-		}
-
-		err := m.work.Get(wk).End()
-		if err != nil {
-			// Not great, but not worth discarding potentially multi-hour computation over this
-			log.Errorf("marking work as done: %+v", err)
-		}
+		done()
 
 		return res.r, res.err
 	case <-ctx.Done():
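The fix applies a common check-then-wait idiom: before blocking on the waitRes channel, waitWork first checks, under the same lock, whether the worker already delivered a result while the manager was restarting. Below is a minimal, self-contained sketch of that idiom, using hypothetical names (tracker, deliver, wait) rather than the lotus sector-storage API:

package main

import (
	"fmt"
	"sync"
)

type tracker struct {
	lk      sync.Mutex
	results map[string]string
	waiters map[string]chan struct{}
}

func newTracker() *tracker {
	return &tracker{results: map[string]string{}, waiters: map[string]chan struct{}{}}
}

// deliver records a result and wakes any waiter; it may run before wait is ever called.
func (t *tracker) deliver(id, res string) {
	t.lk.Lock()
	defer t.lk.Unlock()
	t.results[id] = res
	if ch, ok := t.waiters[id]; ok {
		close(ch)
	}
}

// wait returns the result for id, handling the case where deliver already ran.
func (t *tracker) wait(id string) string {
	t.lk.Lock()
	if res, ok := t.results[id]; ok { // result was delivered before we started waiting
		delete(t.results, id)
		t.lk.Unlock()
		return res
	}
	ch, ok := t.waiters[id]
	if !ok {
		ch = make(chan struct{})
		t.waiters[id] = ch
	}
	t.lk.Unlock()

	<-ch // block until deliver closes the channel

	t.lk.Lock()
	defer t.lk.Unlock()
	res := t.results[id]
	delete(t.results, id)
	return res
}

func main() {
	t := newTracker()
	t.deliver("job-1", "done") // result arrives before anyone waits
	fmt.Println(t.wait("job-1"))
}

The important detail, both in this sketch and in the patch above, is that the result check and the registration of the wait channel happen under the same lock, so a delivery can never slip in between the two.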
120 changes: 72 additions & 48 deletions extern/sector-storage/manager_test.go
@@ -210,76 +210,100 @@ func TestRedoPC1(t *testing.T)
 
 // Manager restarts in the middle of a task, restarts it, it completes
 func TestRestartManager(t *testing.T) {
-	logging.SetAllLoggers(logging.LevelDebug)
+	test := func(returnBeforeCall bool) func(*testing.T) {
+		return func(t *testing.T) {
+			logging.SetAllLoggers(logging.LevelDebug)
 
-	ctx, done := context.WithCancel(context.Background())
-	defer done()
+			ctx, done := context.WithCancel(context.Background())
+			defer done()
 
-	ds := datastore.NewMapDatastore()
+			ds := datastore.NewMapDatastore()
 
-	m, lstor, _, _, cleanup := newTestMgr(ctx, t, ds)
-	defer cleanup()
+			m, lstor, _, _, cleanup := newTestMgr(ctx, t, ds)
+			defer cleanup()
 
-	localTasks := []sealtasks.TaskType{
-		sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
-	}
+			localTasks := []sealtasks.TaskType{
+				sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
+			}
 
-	tw := newTestWorker(WorkerConfig{
-		SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
-		TaskTypes: localTasks,
-	}, lstor, m)
+			tw := newTestWorker(WorkerConfig{
+				SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
+				TaskTypes: localTasks,
+			}, lstor, m)
 
-	err := m.AddWorker(ctx, tw)
-	require.NoError(t, err)
+			err := m.AddWorker(ctx, tw)
+			require.NoError(t, err)
 
-	sid := abi.SectorID{Miner: 1000, Number: 1}
+			sid := abi.SectorID{Miner: 1000, Number: 1}
 
-	pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
-	require.NoError(t, err)
-	require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
+			pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
+			require.NoError(t, err)
+			require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
 
-	piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
-	require.NoError(t, err)
-	require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)
+			piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
+			require.NoError(t, err)
+			require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)
 
-	pieces := []abi.PieceInfo{pi, piz}
+			pieces := []abi.PieceInfo{pi, piz}
 
-	ticket := abi.SealRandomness{0, 9, 9, 9, 9, 9, 9, 9}
+			ticket := abi.SealRandomness{0, 9, 9, 9, 9, 9, 9, 9}
 
-	tw.pc1lk.Lock()
-	tw.pc1wait = &sync.WaitGroup{}
-	tw.pc1wait.Add(1)
+			tw.pc1lk.Lock()
+			tw.pc1wait = &sync.WaitGroup{}
+			tw.pc1wait.Add(1)
 
-	var cwg sync.WaitGroup
-	cwg.Add(1)
+			var cwg sync.WaitGroup
+			cwg.Add(1)
 
-	var perr error
-	go func() {
-		defer cwg.Done()
-		_, perr = m.SealPreCommit1(ctx, sid, ticket, pieces)
-	}()
+			var perr error
+			go func() {
+				defer cwg.Done()
+				_, perr = m.SealPreCommit1(ctx, sid, ticket, pieces)
+			}()
 
-	tw.pc1wait.Wait()
+			tw.pc1wait.Wait()
 
-	require.NoError(t, m.Close(ctx))
-	tw.ret = nil
+			require.NoError(t, m.Close(ctx))
+			tw.ret = nil
 
-	cwg.Wait()
-	require.Error(t, perr)
+			cwg.Wait()
+			require.Error(t, perr)
 
-	m, _, _, _, cleanup2 := newTestMgr(ctx, t, ds)
-	defer cleanup2()
+			m, _, _, _, cleanup2 := newTestMgr(ctx, t, ds)
+			defer cleanup2()
 
-	tw.ret = m // simulate jsonrpc auto-reconnect
-	err = m.AddWorker(ctx, tw)
-	require.NoError(t, err)
+			tw.ret = m // simulate jsonrpc auto-reconnect
+			err = m.AddWorker(ctx, tw)
+			require.NoError(t, err)
 
-	tw.pc1lk.Unlock()
+			if returnBeforeCall {
+				tw.pc1lk.Unlock()
+				time.Sleep(100 * time.Millisecond)
 
-	_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
-	require.NoError(t, err)
+				_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
+			} else {
+				done := make(chan struct{})
+				go func() {
+					defer close(done)
+					_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
+				}()
+
+				time.Sleep(100 * time.Millisecond)
+				tw.pc1lk.Unlock()
+				<-done
+			}
+
+			require.NoError(t, err)
 
-	require.Equal(t, 1, tw.pc1s)
+			require.Equal(t, 1, tw.pc1s)
+
+			ws := m.WorkerJobs()
+			require.Empty(t, ws)
+		}
+	}
+
+	t.Run("callThenReturn", test(false))
+	t.Run("returnThenCall", test(true))
 }
 
 // Worker restarts in the middle of a task, task fails after restart
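The restructured test covers both orderings of the restart race: the worker's return arriving before the call is re-issued ("returnThenCall"), and the call being re-issued while the return is still pending ("callThenReturn"). The sketch below illustrates the subtest pattern it uses, a closure parameterized by a flag that returns a func(*testing.T) for t.Run; the package name, test name, and empty branches are hypothetical stand-ins, not the lotus test code:

package sectorstorage_example // hypothetical package name

import "testing"

func TestOrderings(t *testing.T) {
	test := func(returnBeforeCall bool) func(*testing.T) {
		return func(t *testing.T) {
			if returnBeforeCall {
				// exercise the "result returned before the follow-up call" ordering
			} else {
				// exercise the "call issued before the result is returned" ordering
			}
		}
	}

	t.Run("callThenReturn", test(false))
	t.Run("returnThenCall", test(true))
}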
47 changes: 3 additions & 44 deletions extern/sector-storage/testworker_test.go
@@ -2,14 +2,11 @@ package sectorstorage
 
 import (
 	"context"
-	"io"
 	"sync"
 
-	"github.com/google/uuid"
-	"github.com/ipfs/go-cid"
-
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/specs-storage/storage"
+	"github.com/google/uuid"
 
 	"github.com/filecoin-project/lotus/extern/sector-storage/mock"
 	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
@@ -29,6 +26,8 @@ type testWorker struct {
 	pc1wait *sync.WaitGroup
 
 	session uuid.UUID
+
+	Worker
 }
 
 func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerReturn) *testWorker {
@@ -64,18 +63,6 @@ func (t *testWorker) asyncCall(sector abi.SectorID, work func(ci storiface.CallI
 	return ci, nil
 }
 
-func (t *testWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
-	panic("implement me")
-}
-
-func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
-	panic("implement me")
-}
-
-func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
-	panic("implement me")
-}
-
 func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
 	return t.asyncCall(sector, func(ci storiface.CallID) {
 		p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
@@ -103,34 +90,6 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ti
 	})
 }
 
-func (t *testWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
-	panic("implement me")
-}
-
-func (t *testWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
-	panic("implement me")
-}
-
-func (t *testWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
-	panic("implement me")
-}
-
-func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
-	panic("implement me")
-}
-
-func (t *testWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) {
-	panic("implement me")
-}
-
-func (t *testWorker) Remove(ctx context.Context, sector abi.SectorID) (storiface.CallID, error) {
-	panic("implement me")
-}
-
-func (t *testWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
-	panic("implement me")
-}
-
 func (t *testWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
 	return t.asyncCall(sector, func(ci storiface.CallID) {
 		if err := t.ret.ReturnFetch(ctx, ci, ""); err != nil {
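The testworker change relies on Go interface embedding: embedding the Worker interface in the testWorker struct makes the struct satisfy the whole interface, so the panic("implement me") stubs can be deleted and only the methods the tests actually call need real implementations; anything else panics on the nil embedded value at call time. A minimal sketch of the idiom with hypothetical types (not the lotus Worker interface):

package main

import "fmt"

// Worker stands in for a large interface; only the shape of the trick matters here.
type Worker interface {
	AddPiece(data string) error
	Remove(id int) error
}

type testWorker struct {
	Worker // nil embedded interface: satisfies Worker, panics if an un-overridden method is called
}

// AddPiece is the only method this fake implements explicitly.
func (t *testWorker) AddPiece(data string) error {
	fmt.Println("added:", data)
	return nil
}

func main() {
	var w Worker = &testWorker{}
	_ = w.AddPiece("example") // works: explicitly overridden
	// w.Remove(1) would panic with a nil pointer dereference,
	// the same failure mode the deleted panic("implement me") stubs produced explicitly.
}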