Skip to content

Commit

Permalink
#461: begin processing of s3 transfers in state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
Charlotte committed Jun 27, 2018
1 parent 61fc20b commit 5709ddb
Showing 1 changed file with 141 additions and 1 deletion.
142 changes: 141 additions & 1 deletion cmd/dotmesh-server/pkg/main/statemachines.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"time"

"github.com/coreos/etcd/client"
"github.com/nu7hatch/gouuid"
uuid "github.com/nu7hatch/gouuid"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -1192,6 +1192,25 @@ func (f *fsMachine) attemptReceive() bool {
}
}

func s3TransferRequestify(in interface{}) (S3TransferRequest, error) {
typed, ok := in.(map[string]interface{})
if !ok {
log.Printf("[s3TransferRequestify] Unable to cast %s to map[string]interface{}", in)
return S3TransferRequest{}, fmt.Errorf(
"Unable to cast %s to map[string]interface{}", in,
)
}
return S3TransferRequest{
KeyID: typed["KeyID"].(string),
SecretKey: typed["SecretKey"].(string),
Direction: typed["Direction"].(string),
LocalNamespace: typed["LocalNamespace"].(string),
LocalName: typed["LocalName"].(string),
LocalBranchName: typed["LocalBranchName"].(string),
RemoteName: typed["RemoteName"].(string),
}, nil
}

func transferRequestify(in interface{}) (TransferRequest, error) {
typed, ok := in.(map[string]interface{})
if !ok {
Expand Down Expand Up @@ -1309,6 +1328,63 @@ func missingState(f *fsMachine) stateFn {
} else if f.lastTransferRequest.Direction == "pull" {
return pullInitiatorState
}
} else if e.Name == "s3-transfer" {
log.Printf("GOT S3 TRANSFER REQUEST (while missing) %+v", e.Args)

// TODO dedupe
transferRequest, err := s3TransferRequestify((*e.Args)["Transfer"])
if err != nil {
f.innerResponses <- &Event{
Name: "cant-cast-s3-transfer-request",
Args: &EventArgs{"err": err},
}
return backoffState
}
f.lastS3TransferRequest = transferRequest
transferRequestId, ok := (*e.Args)["RequestId"].(string)
if !ok {
f.innerResponses <- &Event{
Name: "cant-cast-s3-transfer-requestid",
Args: &EventArgs{"err": err},
}
return backoffState
}
f.lastTransferRequestId = transferRequestId

if f.lastS3TransferRequest.Direction == "push" {
// Can't push when we're missing.
f.innerResponses <- &Event{
Name: "cant-push-while-missing",
Args: &EventArgs{"request": e, "node": f.state.myNodeId},
}
return backoffState
} else if f.lastS3TransferRequest.Direction == "pull" {
log.Printf("%s %s %s", ZFS, "create", fq(f.filesystemId))
out, err := exec.Command(ZFS, "create", fq(f.filesystemId)).CombinedOutput()
if err != nil {
log.Printf("%v while trying to create %s", err, fq(f.filesystemId))
f.innerResponses <- &Event{
Name: "failed-create",
Args: &EventArgs{"err": err, "combined-output": string(out)},
}
return backoffState
}
responseEvent, _ := f.mount()
if responseEvent.Name == "mounted" {

return s3PullInitiatorState
} else {
f.innerResponses <- responseEvent
return backoffState
}
} else {
log.Printf("Unknown direction %s, going to backoff", f.lastS3TransferRequest.Direction)
f.innerResponses <- &Event{
Name: "failed-s3-transfer",
Args: &EventArgs{"unknown-direction": f.lastS3TransferRequest.Direction},
}
return backoffState
}
} else if e.Name == "peer-transfer" {
// A transfer has been registered. Try to go into the appropriate
// state.
Expand Down Expand Up @@ -2893,6 +2969,69 @@ func pushPeerState(f *fsMachine) stateFn {
}
}

func s3PullInitiatorState(f *fsMachine) stateFn {
f.transitionedTo("s3PullInitiatorState", "requesting")
containers, err := f.containersRunning()
if err != nil {
log.Printf(
"Can't pull into filesystem while we can't list whether containers are using it",
)
f.innerResponses <- &Event{
Name: "error-listing-containers-during-pull",
Args: &EventArgs{"err": err},
}
return backoffState
}
if len(containers) > 0 {
log.Printf("Can't pull into filesystem while containers are using it")
f.innerResponses <- &Event{
Name: "cannot-pull-while-containers-running",
Args: &EventArgs{"containers": containers},
}
return backoffState
}
// transferRequest := f.lastS3TransferRequest
// transferRequestId := f.lastTransferRequestId
destPath := fmt.Sprintf("%s/__default__", mnt(f.filesystemId))
// TODO: take the contents of the bucket
// dump it in destpath
if _, err := os.Stat(destPath); err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(destPath, 0777); err != nil {
log.Printf("[s3PullInitiatorState] error creating subdot %s: %+v", destPath, err)
f.innerResponses <- &Event{
Name: "error-creating-subdot",
Args: &EventArgs{"err": err, "destpath": destPath},
}
return backoffState
}
} else {
log.Printf("[s3PullInitiatorState] error statting subdot %s: %+v", destPath, err)
f.innerResponses <- &Event{
Name: "error-statting-subdot",
Args: &EventArgs{"err": err, "destpath": destPath},
}
return backoffState
}
}
log.Printf("CRG TEST PATH %s/hello-world", fmt.Sprintf("%s/hello-world", destPath))
ioutil.WriteFile(fmt.Sprintf("%s/hello-world", destPath), []byte("hello, world, s3"), 0644)

// commit it
response, _ := f.snapshot(&Event{Name: "snapshot",
Args: &EventArgs{"metadata": metadata{"message": "pull from s3"}}})
if response.Name != "snapshotted" {
log.Printf("Couldn't commit - response %#v", response)
f.innerResponses <- response
return backoffState
}
f.innerResponses <- &Event{
Name: "s3-transferred",
Args: &EventArgs{},
}
return discoveringState
}

func pullInitiatorState(f *fsMachine) stateFn {
f.transitionedTo("pullInitiatorState", "requesting")
// this is a write state. refuse to act if containers are running
Expand Down Expand Up @@ -2921,6 +3060,7 @@ func pullInitiatorState(f *fsMachine) stateFn {

transferRequest := f.lastTransferRequest
transferRequestId := f.lastTransferRequestId
// TODO work out whether it's an S3 transfer or a DM transfer

// TODO dedupe what follows wrt pushInitiatorState!
client := NewJsonRpcClient(
Expand Down

0 comments on commit 5709ddb

Please sign in to comment.