Skip to content

Commit

Permalink
CancelTransfer implemented.
Browse files Browse the repository at this point in the history
  • Loading branch information
Antoon Prins committed Nov 4, 2020
1 parent 7b92247 commit 7e1d7b1
Showing 1 changed file with 109 additions and 50 deletions.
159 changes: 109 additions & 50 deletions pkg/datatx/driver/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,61 @@ func (driver *Rclone) DoTransfer(srcRemote string, srcPath string, srcToken stri
// 1. prepare config: add src/dest remotes

// 2. request for an async rclone call
jobID, err := driver.rcloneTransfer(srcRemote, srcPath, srcToken, destRemote, destPath, destToken)
fmt.Printf("calling %v\n", driver.SenderEndpoint)
// convert remote names to rclone type fs ('remotename:')
rcloneReq := &rcloneAsyncReqJSON{
SrcRemote: strings.Trim(srcRemote, ":") + ":",
SrcPath: srcPath,
DstRemote: strings.Trim(destRemote, ":") + ":",
DstPath: destPath,
Async: true,
}

data, err := json.Marshal(rcloneReq)
if err != nil {
return nil, errors.Wrap(err, "error marshalling rclone req data")
}

// TODO handle directory transfers
transferFileMethod := "/operations/copyfile"
url := strings.Trim(driver.SenderEndpoint, "/") + transferFileMethod
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
if err != nil {
return nil, errors.Wrap(err, "json: error framing post request")
}
req.Header.Set("Content-Type", "application/json")

// TODO if we want to use basic auth, should be configurable.
req.SetBasicAuth("rclone", "secret")

// TODO insecure should be configurable
client := rhttp.GetHTTPClient(rhttp.Insecure(true))
res, err := client.Do(req)
if err != nil {
err = errors.Wrap(err, "json: error sending post request")
return nil, err
}

defer res.Body.Close()

if res.StatusCode != http.StatusOK {
resBody, err := ioutil.ReadAll(res.Body)
if err != nil {
err = errors.Wrap(err, "json: error reading response body")
return nil, err
}
err = errors.Wrap(errors.New(fmt.Sprintf("%s: %s", res.Status, string(resBody))), "json: rclone request responded with error")
return nil, err
}

var resData rcloneAsyncResJSON
if err = json.NewDecoder(res.Body).Decode(&resData); err != nil {
err = errors.Wrap(err, "json: error decoding response data")
return nil, err
}

// create new Job from jobID
newJob := txdriver.Job{
JobID: jobID,
JobID: resData.JobID,
SrcRemote: txdriver.Remote{OpaqueName: srcRemote},
DestRemote: txdriver.Remote{OpaqueName: destRemote},
}
Expand Down Expand Up @@ -120,27 +166,26 @@ func (driver *Rclone) GetTransferStatus(job *txdriver.Job) (string, error) {

data, err := json.Marshal(rcloneStatusReq)
if err != nil {
return txdriver.StatusTransferFailed, errors.Wrap(err, "error marshalling rclone req data")
return txdriver.StatusInvalid, errors.Wrap(err, "error marshalling rclone req data")
}

// TODO handle directory transfers
transferFileMethod := "/job/status"
url := strings.Trim(driver.SenderEndpoint, "/") + transferFileMethod
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
if err != nil {
return txdriver.StatusTransferFailed, errors.Wrap(err, "json: error framing post request")
return txdriver.StatusInvalid, errors.Wrap(err, "json: error framing post request")
}
req.Header.Set("Content-Type", "application/json")

// TODO Do we want to use this? Also should be configurable.
// TODO if we want to use basic auth, should be configurable.
req.SetBasicAuth("rclone", "secret")

// TODO insecure should be configurable
client := rhttp.GetHTTPClient(rhttp.Insecure(true))
res, err := client.Do(req)
if err != nil {
err = errors.Wrap(err, "json: error sending post request")
return txdriver.StatusTransferFailed, err
return txdriver.StatusInvalid, err
}

defer res.Body.Close()
Expand All @@ -152,19 +197,19 @@ func (driver *Rclone) GetTransferStatus(job *txdriver.Job) (string, error) {
resBody, e := ioutil.ReadAll(res.Body)
if e != nil {
e = errors.Wrap(e, "json: error reading response body")
return txdriver.StatusTransferFailed, e
return txdriver.StatusInvalid, e
}
err = errors.Wrap(errors.New(fmt.Sprintf("%s: %s", res.Status, string(resBody))), "json: rclone request responded with error")
return txdriver.StatusTransferFailed, err
return txdriver.StatusInvalid, err
}

var resData rcloneStatusResJSON
if err = json.NewDecoder(res.Body).Decode(&resData); err != nil {
return txdriver.StatusTransferFailed, errors.Wrap(err, "json: error decoding response data")
return txdriver.StatusInvalid, errors.Wrap(err, "json: error decoding response data")
}

if resData.Error != "" {
return txdriver.StatusTransferFailed, errors.New(resData.Error)
return txdriver.StatusInvalid, errors.New(resData.Error)
}

if resData.Finished && resData.Success {
Expand All @@ -179,80 +224,94 @@ func (driver *Rclone) GetTransferStatus(job *txdriver.Job) (string, error) {
return txdriver.StatusInvalid, nil
}

// CancelTransfer cancels the transfer defined by the specified job
func (driver *Rclone) CancelTransfer(job *txdriver.Job) (bool, error) {
return true, nil
}

// rcloneAsyncReqJSON the rclone operations/filecopy async method json request
type rcloneAsyncReqJSON struct {
SrcRemote string `json:"srcFs"`
SrcPath string `json:"srcRemote"`
// SrcToken string `json:"srcToken"`
DstRemote string `json:"dstFs"`
DstPath string `json:"dstRemote"`
// DstToken string `json:"srcToken"`
Async bool `json:"_async"`
// rcloneCancelTransferReqJSON the rclone job/stop method json request
type rcloneCancelTransferReqJSON struct {
JobID int64 `json:"jobid"`
}

// rcloneAsyncResJSON the rclone operations/copyfile async method json response
type rcloneAsyncResJSON struct {
JobID int64 `json:"jobid"`
// rcloneCancelTransferResJSON the rclone job/stop method json response
type rcloneCancelTransferResJSON struct {
Finished bool `json:"finished"`
Success bool `json:"success"`
ID int64 `json:"id"`
Error string `json:"error"`
Group string `json:"group"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Duration float64 `json:"duration"`
// think we don't need this
// "output": {} // output of the job as would have been returned if called synchronously
}

func (driver *Rclone) rcloneTransfer(srcRemote string, srcPath string, srcToken string, dstRemote string, dstPath string, dstToken string) (int64, error) {
// convert remote names to rclone type fs ('remotename:')
rcloneReq := &rcloneAsyncReqJSON{
SrcRemote: strings.Trim(srcRemote, ":") + ":",
SrcPath: srcPath,
DstRemote: strings.Trim(dstRemote, ":") + ":",
DstPath: dstPath,
Async: true,
// CancelTransfer cancels the transfer defined by the specified job
func (driver *Rclone) CancelTransfer(job *txdriver.Job) (string, error) {
rcloneCancelTransferReq := &rcloneCancelTransferReqJSON{
JobID: job.JobID,
}

data, err := json.Marshal(rcloneReq)
data, err := json.Marshal(rcloneCancelTransferReq)
if err != nil {
return -1, errors.Wrap(err, "error marshalling rclone req data")
return txdriver.StatusInvalid, errors.Wrap(err, "error marshalling rclone req data")
}

// TODO handle directory transfers
transferFileMethod := "/operations/copyfile"
transferFileMethod := "/job/stop"
url := strings.Trim(driver.SenderEndpoint, "/") + transferFileMethod
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
if err != nil {
return -1, errors.Wrap(err, "json: error framing post request")
return txdriver.StatusInvalid, errors.Wrap(err, "json: error framing post request")
}
req.Header.Set("Content-Type", "application/json")

// TODO Do we want to use this? Also should be configurable.
// TODO if we want to use basic auth, should be configurable.
req.SetBasicAuth("rclone", "secret")

// TODO insecure should be configurable
client := rhttp.GetHTTPClient(rhttp.Insecure(true))
res, err := client.Do(req)
if err != nil {
err = errors.Wrap(err, "json: error sending post request")
return -1, err
return txdriver.StatusInvalid, err
}

defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// TODO "job not found" also gives a 500
// Should that return STATUS_INVALID ??
// at the minimum the returned error message should be the rclone error message
resBody, e := ioutil.ReadAll(res.Body)
if e != nil {
e = errors.Wrap(e, "json: error reading response body")
return -1, e
return txdriver.StatusInvalid, e
}
err = errors.Wrap(errors.New(fmt.Sprintf("%s: %s", res.Status, string(resBody))), "json: rclone request responded with error")
return -1, err
return txdriver.StatusInvalid, err
}

var resData rcloneAsyncResJSON
var resData rcloneCancelTransferResJSON
if err = json.NewDecoder(res.Body).Decode(&resData); err != nil {
err = errors.Wrap(err, "json: error decoding response data")
return -1, err
return txdriver.StatusInvalid, errors.Wrap(err, "json: error decoding response data")
}

if resData.Error != "" {
return txdriver.StatusTransferCancelFailed, errors.New(resData.Error)
}
// an empty response means success
return txdriver.StatusTransferCancelled, nil
}

return resData.JobID, nil
// rcloneAsyncReqJSON the rclone operations/filecopy async method json request
type rcloneAsyncReqJSON struct {
SrcRemote string `json:"srcFs"`
SrcPath string `json:"srcRemote"`
// SrcToken string `json:"srcToken"`
DstRemote string `json:"dstFs"`
DstPath string `json:"dstRemote"`
// DstToken string `json:"srcToken"`
Async bool `json:"_async"`
}

// rcloneAsyncResJSON the rclone operations/copyfile async method json response
type rcloneAsyncResJSON struct {
JobID int64 `json:"jobid"`
}

0 comments on commit 7e1d7b1

Please sign in to comment.