diff --git a/cmd/dotmesh-server/pkg/main/rpc.go b/cmd/dotmesh-server/pkg/main/rpc.go
index 6bc42e311..6994f921d 100644
--- a/cmd/dotmesh-server/pkg/main/rpc.go
+++ b/cmd/dotmesh-server/pkg/main/rpc.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"github.com/aws/aws-sdk-go"
 	"encoding/json"
 	"fmt"
 	"log"
@@ -11,6 +10,10 @@ import (
 	"strings"
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/coreos/etcd/client"
 	"golang.org/x/net/context"
 )
@@ -1042,69 +1045,11 @@ func (d *DotmeshRPC) GetTransfer(
 	return nil
 }
 
-// Register a transfer from an initiator (the cluster where the user initially
-// connected) to a peer (the cluster which will be the target of a push/pull).
-func (d *DotmeshRPC) RegisterTransfer(
-	r *http.Request,
-	args *TransferPollResult,
-	result *bool,
-) error {
-	log.Printf("[RegisterTransfer] called with args: %+v", args)
-	serialized, err := json.Marshal(args)
-	if err != nil {
-		return err
-	}
-	kapi, err := getEtcdKeysApi()
-	if err != nil {
-		return err
-	}
-
-	_, err = kapi.Set(
-		context.Background(),
-		fmt.Sprintf(
-			"%s/filesystems/transfers/%s", ETCD_PREFIX, args.TransferRequestId,
-		),
-		string(serialized),
-		nil,
-	)
-	if err != nil {
-		return err
-	}
-	// XXX A transfer should be able to span multiple filesystemIds, really. So
-	// tying a transfer to a filesystem id is probably wrong. except, the thing
-	// being updated is a specific branch (filesystem id), it's ok if it drags
-	// dependent snapshots along with it. 
-	responseChan, err := d.state.globalFsRequest(args.FilesystemId, &Event{
-		Name: "peer-transfer",
-		Args: &EventArgs{
-			"Transfer": args,
-		},
-	})
-	if err != nil {
-		return err
-	}
-
-	// Block until the fsmachine is ready to transfer
-	log.Printf("[RegisterTransfer:%s] waiting for ack from the fsmachine...", args.FilesystemId)
-	e := <-responseChan
-	log.Printf("[RegisterTransfer:%s] received ack from the fsmachine: %+v", args.FilesystemId, e)
-
-	if e.Name != "awaiting-transfer" {
-		// Something went wrong!
-		return fmt.Errorf("Error requesting peer transfer: %+v", e)
-	} else {
-		return nil
-	}
-
-	return nil
-}
-
 func (d *DotmeshRPC) S3Transfer(
 	r *http.Request,
 	args *S3TransferRequest,
 	result *string,
-) error
-{
+) error {
 	switch args.Direction {
 	case "push":
 		// TODO do we need this check, should it be different? Suspect s3 just requires there's no `.` etc but that's something the user will need to do on AWS anyway, unless we're allowing create-if-not-exists for pushes
@@ -1161,10 +1106,9 @@
 			return err
 		}
 		filesystemId = remoteFilesystemId
-	} 
+	}
 	// TODO: look at the logic for filesystem combinations, there was a bunch of logic for pulling/pushing fsystems that were in sync or had dirty changes. Not sure we need or can use those
 
-
 	// Now run globalFsRequest, returning the request id, to make the master of
 	// a (possibly nonexisting) filesystem start pulling or pushing it, and
 	// make it update status as it goes in a new pollable "transfers" object in
@@ -1191,6 +1135,64 @@
 	*result = requestId
 	return nil
 }
+
+// Register a transfer from an initiator (the cluster where the user initially
+// connected) to a peer (the cluster which will be the target of a push/pull). 
+func (d *DotmeshRPC) RegisterTransfer(
+	r *http.Request,
+	args *TransferPollResult,
+	result *bool,
+) error {
+	log.Printf("[RegisterTransfer] called with args: %+v", args)
+	serialized, err := json.Marshal(args)
+	if err != nil {
+		return err
+	}
+	kapi, err := getEtcdKeysApi()
+	if err != nil {
+		return err
+	}
+
+	_, err = kapi.Set(
+		context.Background(),
+		fmt.Sprintf(
+			"%s/filesystems/transfers/%s", ETCD_PREFIX, args.TransferRequestId,
+		),
+		string(serialized),
+		nil,
+	)
+	if err != nil {
+		return err
+	}
+	// XXX A transfer should be able to span multiple filesystemIds, really. So
+	// tying a transfer to a filesystem id is probably wrong. except, the thing
+	// being updated is a specific branch (filesystem id), it's ok if it drags
+	// dependent snapshots along with it.
+	responseChan, err := d.state.globalFsRequest(args.FilesystemId, &Event{
+		Name: "peer-transfer",
+		Args: &EventArgs{
+			"Transfer": args,
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	// Block until the fsmachine is ready to transfer
+	log.Printf("[RegisterTransfer:%s] waiting for ack from the fsmachine...", args.FilesystemId)
+	e := <-responseChan
+	log.Printf("[RegisterTransfer:%s] received ack from the fsmachine: %+v", args.FilesystemId, e)
+
+	if e.Name != "awaiting-transfer" {
+		// Something went wrong!
+		return fmt.Errorf("Error requesting peer transfer: %+v", e)
+	} else {
+		return nil
+	}
+
+	return nil
+}
+
 // Need both push and pull because one cluster will often be behind NAT.
 // Transfer will immediately return a transferId which can be queried until
 // completion