Skip to content

Commit

Permalink
Datatx pull model (#2052)
Browse files Browse the repository at this point in the history
  • Loading branch information
redblom authored Mar 16, 2022
1 parent dcb9b01 commit 559fccb
Show file tree
Hide file tree
Showing 20 changed files with 1,652 additions and 29 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/pull-transfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: New CS3API datatx methods

CS3 datatx pull model methods: PullTransfer, RetryTransfer, ListTransfers
Method CreateTransfer removed.

https://github.com/cs3org/reva/pull/2052
2 changes: 2 additions & 0 deletions cmd/reva/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ var (
transferCreateCommand(),
transferGetStatusCommand(),
transferCancelCommand(),
transferListCommand(),
transferRetryCommand(),
appTokensListCommand(),
appTokensRemoveCommand(),
appTokensCreateCommand(),
Expand Down
28 changes: 25 additions & 3 deletions cmd/reva/transfer-cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@
package main

import (
"encoding/gob"
"errors"
"io"
"os"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
"github.com/jedib0t/go-pretty/table"
)

func transferCancelCommand() *command {
cmd := newCommand("transfer-cancel")
cmd.Description = func() string { return "cancel a running transfer" }
cmd.Usage = func() string { return "Usage: transfer-cancel [-flags]" }
txID := cmd.String("txID", "", "the transfer identifier")
txID := cmd.String("txId", "", "the transfer identifier")

cmd.Action = func(w ...io.Writer) error {
// validate flags
if *txID == "" {
return errors.New("txID must be specified: use -txID flag\n" + cmd.Usage())
return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage())
}

ctx := getAuthContext()
Expand All @@ -44,7 +48,9 @@ func transferCancelCommand() *command {
return err
}

cancelRequest := &datatx.CancelTransferRequest{}
cancelRequest := &datatx.CancelTransferRequest{
TxId: &datatx.TxId{OpaqueId: *txID},
}

cancelResponse, err := client.CancelTransfer(ctx, cancelRequest)
if err != nil {
Expand All @@ -54,6 +60,22 @@ func transferCancelCommand() *command {
return formatError(cancelResponse.Status)
}

if len(w) == 0 {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"})
cTime := time.Unix(int64(cancelResponse.TxInfo.Ctime.Seconds), int64(cancelResponse.TxInfo.Ctime.Nanos))
t.AppendRows([]table.Row{
{cancelResponse.TxInfo.ShareId.OpaqueId, cancelResponse.TxInfo.Id.OpaqueId, cancelResponse.TxInfo.Status, cTime.Format("Mon Jan 2 15:04:05 -0700 MST 2006")},
})
t.Render()
} else {
enc := gob.NewEncoder(w[0])
if err := enc.Encode(cancelResponse.TxInfo); err != nil {
return err
}
}

return nil
}
return cmd
Expand Down
4 changes: 2 additions & 2 deletions cmd/reva/transfer-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func transferCreateCommand() *command {
cmd.Description = func() string { return "create transfer between 2 sites" }
cmd.Usage = func() string { return "Usage: transfer-create [-flags] <path>" }
grantee := cmd.String("grantee", "", "the grantee, receiver of the transfer")
granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group")
idp := cmd.String("idp", "", "the idp of the grantee, default to same idp as the user triggering the action")
granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group (defaults to user)")
idp := cmd.String("idp", "", "the idp of the grantee")
userType := cmd.String("user-type", "primary", "the type of user account, defaults to primary")

cmd.Action = func(w ...io.Writer) error {
Expand Down
28 changes: 25 additions & 3 deletions cmd/reva/transfer-get-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@
package main

import (
"encoding/gob"
"errors"
"io"
"os"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
"github.com/jedib0t/go-pretty/table"
)

func transferGetStatusCommand() *command {
cmd := newCommand("transfer-get-status")
cmd.Description = func() string { return "get the status of a transfer" }
cmd.Usage = func() string { return "Usage: transfer-get-status [-flags]" }
txID := cmd.String("txID", "", "the transfer identifier")
txID := cmd.String("txId", "", "the transfer identifier")

cmd.Action = func(w ...io.Writer) error {
// validate flags
if *txID == "" {
return errors.New("txID must be specified: use -txID flag\n" + cmd.Usage())
return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage())
}

ctx := getAuthContext()
Expand All @@ -44,7 +48,9 @@ func transferGetStatusCommand() *command {
return err
}

getStatusRequest := &datatx.GetTransferStatusRequest{}
getStatusRequest := &datatx.GetTransferStatusRequest{
TxId: &datatx.TxId{OpaqueId: *txID},
}

getStatusResponse, err := client.GetTransferStatus(ctx, getStatusRequest)
if err != nil {
Expand All @@ -54,6 +60,22 @@ func transferGetStatusCommand() *command {
return formatError(getStatusResponse.Status)
}

if len(w) == 0 {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"})
cTime := time.Unix(int64(getStatusResponse.TxInfo.Ctime.Seconds), int64(getStatusResponse.TxInfo.Ctime.Nanos))
t.AppendRows([]table.Row{
{getStatusResponse.TxInfo.ShareId.OpaqueId, getStatusResponse.TxInfo.Id.OpaqueId, getStatusResponse.TxInfo.Status, cTime.Format("Mon Jan 2 15:04:05 -0700 MST 2006")},
})
t.Render()
} else {
enc := gob.NewEncoder(w[0])
if err := enc.Encode(getStatusResponse.TxInfo); err != nil {
return err
}
}

return nil
}
return cmd
Expand Down
91 changes: 91 additions & 0 deletions cmd/reva/transfer-list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package main

import (
"encoding/gob"
"io"
"os"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
"github.com/jedib0t/go-pretty/table"
)

func transferListCommand() *command {
cmd := newCommand("transfer-list")
cmd.Description = func() string { return "get a list of transfers" }
cmd.Usage = func() string { return "Usage: transfer-list [-flags]" }
filterShareID := cmd.String("shareId", "", "share ID filter (optional)")

cmd.Action = func(w ...io.Writer) error {
ctx := getAuthContext()
client, err := getClient()
if err != nil {
return err
}

// validate flags
var filters []*datatx.ListTransfersRequest_Filter
if *filterShareID != "" {
filters = append(filters, &datatx.ListTransfersRequest_Filter{
Type: datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID,
Term: &datatx.ListTransfersRequest_Filter_ShareId{
ShareId: &ocm.ShareId{
OpaqueId: *filterShareID,
},
},
})
}

transferslistRequest := &datatx.ListTransfersRequest{
Filters: filters,
}

listTransfersResponse, err := client.ListTransfers(ctx, transferslistRequest)
if err != nil {
return err
}
if listTransfersResponse.Status.Code != rpc.Code_CODE_OK {
return formatError(listTransfersResponse.Status)
}

if len(w) == 0 {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId"})

for _, s := range listTransfersResponse.Transfers {
t.AppendRows([]table.Row{
{s.ShareId.OpaqueId, s.Id.OpaqueId},
})
}
t.Render()
} else {
enc := gob.NewEncoder(w[0])
if err := enc.Encode(listTransfersResponse.Transfers); err != nil {
return err
}
}

return nil
}
return cmd
}
84 changes: 84 additions & 0 deletions cmd/reva/transfer-retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package main

import (
"encoding/gob"
"errors"
"io"
"os"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
"github.com/jedib0t/go-pretty/table"
)

func transferRetryCommand() *command {
cmd := newCommand("transfer-retry")
cmd.Description = func() string { return "retry a transfer" }
cmd.Usage = func() string { return "Usage: transfer-retry [-flags]" }
txID := cmd.String("txId", "", "the transfer identifier")

cmd.Action = func(w ...io.Writer) error {
// validate flags
if *txID == "" {
return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage())
}

ctx := getAuthContext()
client, err := getClient()
if err != nil {
return err
}

retryRequest := &datatx.RetryTransferRequest{
TxId: &datatx.TxId{
OpaqueId: *txID,
},
}

retryResponse, err := client.RetryTransfer(ctx, retryRequest)
if err != nil {
return err
}
if retryResponse.Status.Code != rpc.Code_CODE_OK {
return formatError(retryResponse.Status)
}

if len(w) == 0 {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"})
cTime := time.Unix(int64(retryResponse.TxInfo.Ctime.Seconds), int64(retryResponse.TxInfo.Ctime.Nanos))
t.AppendRows([]table.Row{
{retryResponse.TxInfo.ShareId.OpaqueId, retryResponse.TxInfo.Id.OpaqueId, retryResponse.TxInfo.Status, cTime},
})
t.Render()
} else {
enc := gob.NewEncoder(w[0])
if err := enc.Encode(retryResponse.TxInfo); err != nil {
return err
}
}

return nil
}
return cmd
}
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
_ "github.com/cs3org/reva/pkg/auth/manager/loader"
_ "github.com/cs3org/reva/pkg/auth/registry/loader"
_ "github.com/cs3org/reva/pkg/cbox/loader"
_ "github.com/cs3org/reva/pkg/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/group/manager/loader"
_ "github.com/cs3org/reva/pkg/metrics/driver/loader"
_ "github.com/cs3org/reva/pkg/ocm/invite/manager/loader"
Expand Down
22 changes: 22 additions & 0 deletions examples/datatx/datatx.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# example data transfer service configuration
[grpc.services.datatx]
# rclone is the default data transfer driver
txdriver = "rclone"
# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
tx_shares_file = ""
# base folder of the data transfers (default: /home/DataTransfers)
data_transfers_folder = ""

# rclone data transfer driver
[grpc.services.datatx.txdrivers.rclone]
# rclone endpoint
endpoint = "http://..."
# basic auth is used
auth_user = "...rcloneuser"
auth_pass = "...rcloneusersecret"
# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
file = ""
# check status job interval in milliseconds
job_status_check_interval = 2000
# the job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654
github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8
github.com/dgraph-io/ristretto v0.1.0
github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJff
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654 h1:ha5tiuuFyDrwKUrVEc3TrRDFgTKVQ9NGDRmEP0PRPno=
github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65 h1:cee0dhBsF8KofV2TM52T41eOo1QLSgtgEZsjYmC5dhU=
github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
Expand Down
Loading

0 comments on commit 559fccb

Please sign in to comment.