Skip to content

Commit

Permalink
#461: syntax error
Browse files Browse the repository at this point in the history
  • Loading branch information
Charlotte committed Jun 22, 2018
1 parent 34e0bec commit 69cd88a
Showing 1 changed file with 64 additions and 62 deletions.
126 changes: 64 additions & 62 deletions cmd/dotmesh-server/pkg/main/rpc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"github.com/aws/aws-sdk-go"
"encoding/json"
"fmt"
"log"
Expand All @@ -11,6 +10,10 @@ import (
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/coreos/etcd/client"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -1042,69 +1045,11 @@ func (d *DotmeshRPC) GetTransfer(
return nil
}

// Register a transfer from an initiator (the cluster where the user initially
// connected) to a peer (the cluster which will be the target of a push/pull).
func (d *DotmeshRPC) RegisterTransfer(
r *http.Request,
args *TransferPollResult,
result *bool,
) error {
log.Printf("[RegisterTransfer] called with args: %+v", args)
serialized, err := json.Marshal(args)
if err != nil {
return err
}
kapi, err := getEtcdKeysApi()
if err != nil {
return err
}

_, err = kapi.Set(
context.Background(),
fmt.Sprintf(
"%s/filesystems/transfers/%s", ETCD_PREFIX, args.TransferRequestId,
),
string(serialized),
nil,
)
if err != nil {
return err
}
// XXX A transfer should be able to span multiple filesystemIds, really. So
// tying a transfer to a filesystem id is probably wrong. except, the thing
// being updated is a specific branch (filesystem id), it's ok if it drags
// dependent snapshots along with it.
responseChan, err := d.state.globalFsRequest(args.FilesystemId, &Event{
Name: "peer-transfer",
Args: &EventArgs{
"Transfer": args,
},
})
if err != nil {
return err
}

// Block until the fsmachine is ready to transfer
log.Printf("[RegisterTransfer:%s] waiting for ack from the fsmachine...", args.FilesystemId)
e := <-responseChan
log.Printf("[RegisterTransfer:%s] received ack from the fsmachine: %+v", args.FilesystemId, e)

if e.Name != "awaiting-transfer" {
// Something went wrong!
return fmt.Errorf("Error requesting peer transfer: %+v", e)
} else {
return nil
}

return nil
}

func (d *DotmeshRPC) S3Transfer(
r *http.Request,
args *S3TransferRequest,
result *string,
) error
{
) error {
switch args.Direction {
case "push":
// TODO do we need this check, should it be different? Suspect s3 just requires there's no `.` etc but that's something the user will need to do on AWS anyway, unless we're allowing create-if-not-exists for pushes
Expand Down Expand Up @@ -1161,10 +1106,9 @@ func (d *DotmeshRPC) S3Transfer(
return err
}
filesystemId = remoteFilesystemId
}
}
// TODO: look at the logic for filesystem combinations, there was a bunch of logic for pulling/pushing fsystems that were in sync or had dirty changes. Not sure we need or can use those


// Now run globalFsRequest, returning the request id, to make the master of
// a (possibly nonexisting) filesystem start pulling or pushing it, and
// make it update status as it goes in a new pollable "transfers" object in
Expand All @@ -1191,6 +1135,64 @@ func (d *DotmeshRPC) S3Transfer(
*result = requestId
return nil
}

// Register a transfer from an initiator (the cluster where the user initially
// connected) to a peer (the cluster which will be the target of a push/pull).
func (d *DotmeshRPC) RegisterTransfer(
r *http.Request,
args *TransferPollResult,
result *bool,
) error {
log.Printf("[RegisterTransfer] called with args: %+v", args)
serialized, err := json.Marshal(args)
if err != nil {
return err
}
kapi, err := getEtcdKeysApi()
if err != nil {
return err
}

_, err = kapi.Set(
context.Background(),
fmt.Sprintf(
"%s/filesystems/transfers/%s", ETCD_PREFIX, args.TransferRequestId,
),
string(serialized),
nil,
)
if err != nil {
return err
}
// XXX A transfer should be able to span multiple filesystemIds, really. So
// tying a transfer to a filesystem id is probably wrong. except, the thing
// being updated is a specific branch (filesystem id), it's ok if it drags
// dependent snapshots along with it.
responseChan, err := d.state.globalFsRequest(args.FilesystemId, &Event{
Name: "peer-transfer",
Args: &EventArgs{
"Transfer": args,
},
})
if err != nil {
return err
}

// Block until the fsmachine is ready to transfer
log.Printf("[RegisterTransfer:%s] waiting for ack from the fsmachine...", args.FilesystemId)
e := <-responseChan
log.Printf("[RegisterTransfer:%s] received ack from the fsmachine: %+v", args.FilesystemId, e)

if e.Name != "awaiting-transfer" {
// Something went wrong!
return fmt.Errorf("Error requesting peer transfer: %+v", e)
} else {
return nil
}

return nil
}

// Need both push and pull because one cluster will often be behind NAT.
// Transfer will immediately return a transferId which can be queried until
// completion
Expand Down

0 comments on commit 69cd88a

Please sign in to comment.