From c178db1960d5976bd9ce75dd2422200e8ae566f1 Mon Sep 17 00:00:00 2001 From: Antoon P Date: Mon, 13 Feb 2023 14:04:40 +0100 Subject: [PATCH] Implement CS3APIs datatx name change PullTransfer -> CreateTransfer (#3553) * Implement CS3APIs datatx message name change. * Add #PR * Bindings to make CI pass * Opaque field ShareId promoted to real message property. * Temporary reference to upcoming/to-be-merged CS3APIs change. * Normalize before url parsing if necessary. * Lint fixes. * Lint fixes. * Cleanup lines. * Revert to new go bindings. --------- Co-authored-by: Antoon P --- changelog/unreleased/rclone-tpc-cs3apis.md | 5 ++ go.mod | 2 +- go.sum | 6 +- internal/grpc/services/datatx/datatx.go | 33 +++++------ internal/grpc/services/gateway/datatx.go | 8 +-- .../grpc/services/gateway/ocmshareprovider.go | 58 ++++++++++--------- pkg/datatx/manager/rclone/rclone.go | 9 +-- 7 files changed, 63 insertions(+), 58 deletions(-) create mode 100644 changelog/unreleased/rclone-tpc-cs3apis.md diff --git a/changelog/unreleased/rclone-tpc-cs3apis.md b/changelog/unreleased/rclone-tpc-cs3apis.md new file mode 100644 index 0000000000..ba3d98f50e --- /dev/null +++ b/changelog/unreleased/rclone-tpc-cs3apis.md @@ -0,0 +1,5 @@ +Change: Rename PullTransfer to CreateTransfer + +This change implements a CS3APIs name change in the datatx module (PullTransfer to CreateTransfer) + +https://github.com/cs3org/reva/pull/3553 \ No newline at end of file diff --git a/go.mod b/go.mod index 9ce713b099..21530e23e8 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/cheggaaa/pb v1.0.29 github.com/coreos/go-oidc v2.2.1+incompatible github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e - github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7 + github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2 github.com/dgraph-io/ristretto v0.1.1 github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59 github.com/gdexlab/go-render v1.0.1 diff --git a/go.sum b/go.sum index 4618cb5f2b..ae3087cd09 100644 --- a/go.sum +++ b/go.sum @@ -283,8 +283,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJffz4pz0o1WuQxJ28+5x5JgaHD8= github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4= -github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7 h1:QShkOi9aBptnhYN4W0lueiWTlNtc7O69D6GRpYfZodg= -github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= +github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2 h1:vJGHFm3lS7LC0XwsSit8ZMqIyE55Op2+X7p1xEH4Vt0= +github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -921,6 +921,8 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redblom/go-cs3apis v0.0.0-20230130162347-1c3e4b532eac h1:GsyT76KNkNHftiTKPLY9qRkrU/Nl91G+x9uStQHBVZE= +github.com/redblom/go-cs3apis v0.0.0-20230130162347-1c3e4b532eac/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index 17aafbba5b..c2a67aee6d 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -27,7 +27,6 @@ import ( ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" - types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" txdriver "github.com/cs3org/reva/pkg/datatx" txregistry "github.com/cs3org/reva/pkg/datatx/manager/registry" "github.com/cs3org/reva/pkg/errtypes" @@ -72,7 +71,7 @@ type txShare struct { TxID string SrcTargetURI string DestTargetURI string - Opaque *types.Opaque `json:"opaque"` + ShareID string } func (c *config) init() { @@ -146,7 +145,7 @@ func (s *service) UnprotectedEndpoints() []string { return []string{} } -func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { +func (s *service) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) { txInfo, startTransferErr := s.txManager.CreateTransfer(ctx, req.SrcTargetUri, req.DestTargetUri) // we always save the transfer regardless of start transfer outcome @@ -155,7 +154,7 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ TxID: txInfo.GetId().OpaqueId, SrcTargetURI: req.SrcTargetUri, DestTargetURI: req.DestTargetUri, - Opaque: req.Opaque, + ShareID: req.GetShareId().OpaqueId, } s.txShareDriver.Lock() defer s.txShareDriver.Unlock() @@ -163,21 +162,21 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ s.txShareDriver.model.TxShares[txInfo.GetId().OpaqueId] = txShare if err := s.txShareDriver.model.saveTxShare(); err != nil { err = errors.Wrap(err, "datatx service: error saving transfer share: "+datatx.Status_STATUS_INVALID.String()) - return &datatx.PullTransferResponse{ - Status: status.NewInvalid(ctx, "error pulling transfer"), + return &datatx.CreateTransferResponse{ + Status: status.NewInvalid(ctx, "error creating transfer"), }, err } // now check start transfer outcome if startTransferErr != nil { startTransferErr = errors.Wrap(startTransferErr, "datatx service: error starting transfer job") - return &datatx.PullTransferResponse{ - Status: status.NewInvalid(ctx, "datatx service: error pulling transfer"), + return &datatx.CreateTransferResponse{ + Status: status.NewInvalid(ctx, "datatx service: error creating transfer"), TxInfo: txInfo, }, startTransferErr } - return &datatx.PullTransferResponse{ + return &datatx.CreateTransferResponse{ Status: status.NewOK(ctx), TxInfo: txInfo, }, nil @@ -198,7 +197,7 @@ func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransfer }, err } - txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} return &datatx.GetTransferStatusResponse{ Status: status.NewOK(ctx), @@ -207,14 +206,14 @@ func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransfer } func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) { - txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] + txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().OpaqueId] if !ok { return nil, errtypes.InternalError("datatx service: transfer not found") } txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId) if err != nil { - txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} err = errors.Wrap(err, "datatx service: error cancelling transfer") return &datatx.CancelTransferResponse{ Status: status.NewInternal(ctx, err, "error cancelling transfer"), @@ -222,7 +221,7 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer }, err } - txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} return &datatx.CancelTransferResponse{ Status: status.NewOK(ctx), @@ -237,15 +236,15 @@ func (s *service) ListTransfers(ctx context.Context, req *datatx.ListTransfersRe if len(filters) == 0 { txInfos = append(txInfos, &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txShare.TxID}, - ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}, + ShareId: &ocm.ShareId{OpaqueId: txShare.ShareID}, }) } else { for _, f := range filters { if f.Type == datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID { - if f.GetShareId().GetOpaqueId() == string(txShare.Opaque.Map["shareId"].Value) { + if f.GetShareId().GetOpaqueId() == txShare.ShareID { txInfos = append(txInfos, &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txShare.TxID}, - ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}, + ShareId: &ocm.ShareId{OpaqueId: txShare.ShareID}, }) } } @@ -274,7 +273,7 @@ func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRe }, err } - txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} return &datatx.RetryTransferResponse{ Status: status.NewOK(ctx), diff --git a/internal/grpc/services/gateway/datatx.go b/internal/grpc/services/gateway/datatx.go index 75d9d34810..d3b7f80156 100644 --- a/internal/grpc/services/gateway/datatx.go +++ b/internal/grpc/services/gateway/datatx.go @@ -27,18 +27,18 @@ import ( "github.com/pkg/errors" ) -func (s *svc) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { +func (s *svc) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) { c, err := pool.GetDataTxClient(pool.Endpoint(s.c.DataTxEndpoint)) if err != nil { err = errors.Wrap(err, "gateway: error calling GetDataTxClient") - return &datatx.PullTransferResponse{ + return &datatx.CreateTransferResponse{ Status: status.NewInternal(ctx, err, "error getting data transfer client"), }, nil } - res, err := c.PullTransfer(ctx, req) + res, err := c.CreateTransfer(ctx, req) if err != nil { - return nil, errors.Wrap(err, "gateway: error calling PullTransfer") + return nil, errors.Wrap(err, "gateway: error calling CreateTransfer") } return res, nil diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index ad56f222ac..312f259dd1 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -30,7 +30,6 @@ import ( ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" - types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/pkg/appctx" ctxpkg "github.com/cs3org/reva/pkg/ctx" "github.com/cs3org/reva/pkg/errtypes" @@ -285,25 +284,31 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive var srcEndpointScheme string for _, s := range meshProvider.ProviderInfo.Services { if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { - endpointURL, err := url.Parse(s.Endpoint.Path) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path) + srcWebdavEndpointURL, err := url.Parse(s.Endpoint.Path) + if err != nil || srcWebdavEndpointURL.Host == "" { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + s.Endpoint.Path + "\" into URL structure") return &ocm.UpdateReceivedOCMShareResponse{ Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } - urlServiceHostFull, err := url.Parse(s.Host) + var srcWebdavHostURLString string + if strings.Contains(s.Host, "://") { + srcWebdavHostURLString = s.Host + } else { + srcWebdavHostURLString = "http://" + s.Host + } + srcWebdavHostURL, err := url.Parse(srcWebdavHostURLString) if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host " + s.Host) + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + s.Host + "\" into URL structure") return &ocm.UpdateReceivedOCMShareResponse{ Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } - srcServiceHost = urlServiceHostFull.Host + urlServiceHostFull.Path + srcServiceHost = srcWebdavHostURL.Host + srcWebdavHostURL.Path // optional prefix must only appear in target url path: // http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/... - srcEndpointPath = strings.TrimPrefix(endpointURL.Path, urlServiceHostFull.Path) - srcEndpointScheme = endpointURL.Scheme + srcEndpointPath = strings.TrimPrefix(srcWebdavEndpointURL.Path, srcWebdavHostURL.Path) + srcEndpointScheme = srcWebdavEndpointURL.Scheme break } } @@ -345,7 +350,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive } destWebdavEndpointURL, err := url.Parse(destWebdavEndpoint) if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint) + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + destWebdavEndpoint + "\" into URL structure") return &ocm.UpdateReceivedOCMShareResponse{ Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil @@ -357,17 +362,23 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } - urlServiceHostFull, err := url.Parse(destWebdavHost) + var dstWebdavURLString string + if strings.Contains(destWebdavHost, "://") { + dstWebdavURLString = destWebdavHost + } else { + dstWebdavURLString = "http://" + destWebdavHost + } + dstWebdavHostURL, err := url.Parse(dstWebdavURLString) if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host " + destWebdavHost) + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + dstWebdavURLString + "\" into URL structure") return &ocm.UpdateReceivedOCMShareResponse{ Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } - destServiceHost := urlServiceHostFull.Host + urlServiceHostFull.Path + destServiceHost := dstWebdavHostURL.Host + dstWebdavHostURL.Path // optional prefix must only appear in target url path: // http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/... - destEndpointPath := strings.TrimPrefix(destWebdavEndpointURL.Path, urlServiceHostFull.Path) + destEndpointPath := strings.TrimPrefix(destWebdavEndpointURL.Path, dstWebdavHostURL.Path) destEndpointScheme := destWebdavEndpointURL.Scheme destToken := ctxpkg.ContextMustGetToken(ctx) homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) @@ -380,22 +391,17 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive destPath := path.Join(destEndpointPath, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name)) destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destServiceHost, destPath) - opaqueObj := &types.Opaque{ - Map: map[string]*types.OpaqueEntry{ - "shareId": { - Decoder: "plain", - Value: []byte(share.GetShare().GetId().OpaqueId), - }, - }, + shareID := &ocm.ShareId{ + OpaqueId: share.GetShare().GetId().OpaqueId, } - req := &datatx.PullTransferRequest{ + req := &datatx.CreateTransferRequest{ SrcTargetUri: srcTargetURI, DestTargetUri: destTargetURI, - Opaque: opaqueObj, + ShareId: shareID, } - res, err := s.PullTransfer(ctx, req) + res, err := s.CreateTransfer(ctx, req) if err != nil { - log.Err(err).Msg("gateway: error calling PullTransfer") + log.Err(err).Msg("gateway: error calling CreateTransfer") return &ocm.UpdateReceivedOCMShareResponse{ Status: &rpc.Status{ Code: rpc.Code_CODE_INTERNAL, @@ -403,7 +409,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive }, err } - log.Info().Msgf("gateway: PullTransfer: %v", res.TxInfo) + log.Info().Msgf("gateway: CreateTransfer: %v", res.TxInfo) // do not create an OCM reference, just return return &ocm.UpdateReceivedOCMShareResponse{ diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index ed0700f691..25df9aaeef 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -219,8 +219,6 @@ func (m *transferModel) saveTransfer(e error) error { // CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id. // Specified target URIs are of form scheme://userinfo@host:port?name={path} func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) { - logger := appctx.GetLogger(ctx) - srcEp, err := driver.extractEndpointInfo(ctx, srcTargetURI) if err != nil { return nil, err @@ -238,9 +236,6 @@ func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, d // we always set the userinfo part of the destination url for rclone tpc push support dstRemote := fmt.Sprintf("%s://%s@%s", destEp.endpointScheme, dstToken, destEp.endpoint) - logger.Debug().Msgf("destination target URI: %v", dstTargetURI) - logger.Debug().Msgf("destination remote: %v", dstRemote) - return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) } @@ -308,10 +303,8 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote type rcloneAsyncReqJSON struct { SrcFs string `json:"srcFs"` - // SrcToken string `json:"srcToken"` DstFs string `json:"dstFs"` - // DstToken string `json:"destToken"` - Async bool `json:"_async"` + Async bool `json:"_async"` } // bearer is the default authentication scheme for reva srcAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", srcToken)