Skip to content

Commit

Permalink
Merge pull request #790 from dotmesh-io/788-dm-clone-waits-for-commit
Browse files Browse the repository at this point in the history
#788: Before we transition to active, make sure we finish snapshotting.
  • Loading branch information
Itamar Turner-Trauring committed Jan 23, 2020
2 parents ea06e42 + d9177ce commit 54a7ced
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 21 deletions.
17 changes: 17 additions & 0 deletions pkg/fsm/fsm_discovering.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fsm

import (
"github.com/dotmesh-io/dotmesh/pkg/store"
"github.com/dotmesh-io/dotmesh/pkg/types"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -47,3 +48,19 @@ func discoveringState(f *FsMachine) StateFn {
}
}
}

// If we initiated pull or push, we're only really finished after discovery,
// otherwise in-memory state won't match on-disk state and we can get race
// conditions due to that. So this wraps discovery and then only marks transfer
// as finished when discovery is done.
func discoveringAfterTransferInitiatorState(f *FsMachine) StateFn {
result := discoveringState(f)
f.transferUpdates <- types.TransferUpdate{
Kind: types.TransferStatus,
Changes: types.TransferPollResult{
Status: "finished",
Message: "",
},
}
return result
}
12 changes: 9 additions & 3 deletions pkg/fsm/fsm_missing.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,20 @@ func missingState(f *FsMachine) StateFn {
}
return backoffState
} else {
f.innerResponses <- &types.Event{Name: "created"}
f.snapshot(&types.Event{Name: "snapshot",
responseEvent, nextState := f.snapshot(&types.Event{Name: "snapshot",
Args: &types.EventArgs{"metadata": map[string]string{
"type": "dotmesh.initial",
"message": "Initial commit",
"author": "admin",
}}})
return activeState
if responseEvent.Name == "snapshotted" {
f.innerResponses <- &types.Event{Name: "created"}
return activeState
} else {
// Error snapshotting:
f.innerResponses <- responseEvent
return nextState
}
}
} else {
f.innerResponses <- responseEvent
Expand Down
10 changes: 1 addition & 9 deletions pkg/fsm/fsm_pull_initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,10 @@ func (f *FsMachine) pull(
}, backoffState
}

f.transferUpdates <- types.TransferUpdate{
Kind: types.TransferStatus,
Changes: types.TransferPollResult{
Status: "finished",
Message: "",
},
}

log.Printf("Successfully received %s => %s for %s", fromSnapshotId, toSnapshotId, toFilesystemId)
return &types.Event{
Name: "finished-pull",
}, discoveringState
}, discoveringAfterTransferInitiatorState
}

func (f *FsMachine) retryPull(
Expand Down
10 changes: 1 addition & 9 deletions pkg/fsm/fsm_push_initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,19 +353,11 @@ func (f *FsMachine) push(

pipeReader.Close()

f.transferUpdates <- types.TransferUpdate{
Kind: types.TransferStatus,
Changes: types.TransferPollResult{
Status: "finished",
Message: "",
},
}

// TODO update the transfer record, release the peer state machines
return &types.Event{
Name: "finished-push",
Args: &types.EventArgs{},
}, discoveringState
}, discoveringAfterTransferInitiatorState
}

func stash(filesystemId, snapId string, client *dmclient.JsonRpcClient, ctx context.Context) (*types.Event, StateFn) {
Expand Down

0 comments on commit 54a7ced

Please sign in to comment.