diff --git a/changelog/unreleased/uploads-refactor.md b/changelog/unreleased/uploads-refactor.md new file mode 100644 index 0000000000..f3ebb8e6e9 --- /dev/null +++ b/changelog/unreleased/uploads-refactor.md @@ -0,0 +1,8 @@ +Enhancement: Refactor the uploading files workflow from various clients + +Previously, we were implementing the tus client logic in the ocdav service, +leading to restricting the whole of tus logic to the internal services. This PR +refactors that workflow to accept incoming requests following the tus protocol +while using simpler transmission internally. + +https://github.com/cs3org/reva/pull/1285 diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 7457670e2f..957491920d 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -57,7 +57,6 @@ type config struct { TmpFolder string `mapstructure:"tmp_folder" docs:"/var/tmp;Path to temporary folder."` DataServerURL string `mapstructure:"data_server_url" docs:"http://localhost/data;The URL for the data server."` ExposeDataServer bool `mapstructure:"expose_data_server" docs:"false;Whether to expose data server."` // if true the client will be able to upload/download directly to it - DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."` AvailableXS map[string]uint32 `mapstructure:"available_checksums" docs:"nil;List of available checksums."` MimeTypes map[string]string `mapstructure:"mimetypes" docs:"nil;List of supported mime types and corresponding file extensions."` } @@ -267,17 +266,17 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia // For example, https://data-server.example.org/home/docs/myfile.txt // or ownclouds://data-server.example.org/home/docs/myfile.txt log := appctx.GetLogger(ctx) - url := *s.dataServerURL + u := *s.dataServerURL newRef, err := 
s.unwrap(ctx, req.Ref) if err != nil { return &provider.InitiateFileDownloadResponse{ Status: status.NewInternal(ctx, err, "error unwrapping path"), }, nil } - url.Path = path.Join("/", url.Path, newRef.GetPath()) - log.Info().Str("data-server", url.String()).Str("fn", req.Ref.GetPath()).Msg("file download") + u.Path = path.Join(u.Path, newRef.GetPath()) + log.Info().Str("data-server", u.String()).Str("fn", req.Ref.GetPath()).Msg("file download") res := &provider.InitiateFileDownloadResponse{ - DownloadEndpoint: url.String(), + DownloadEndpoint: u.String(), Status: status.NewOK(ctx), Expose: s.conf.ExposeDataServer, } @@ -298,50 +297,53 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate Status: status.NewInternal(ctx, errors.New("can't upload to mount path"), "can't upload to mount path"), }, nil } - url := *s.dataServerURL - if s.conf.DisableTus { - url.Path = path.Join("/", url.Path, newRef.GetPath()) - } else { - metadata := map[string]string{} - var uploadLength int64 - if req.Opaque != nil && req.Opaque.Map != nil { - if req.Opaque.Map["Upload-Length"] != nil { - var err error - uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64) - if err != nil { - return &provider.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "error parsing upload length"), - }, nil - } - } - if req.Opaque.Map["X-OC-Mtime"] != nil { - metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value) + + metadata := map[string]string{} + var uploadLength int64 + if req.Opaque != nil && req.Opaque.Map != nil { + if req.Opaque.Map["Upload-Length"] != nil { + var err error + uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64) + if err != nil { + return &provider.InitiateFileUploadResponse{ + Status: status.NewInternal(ctx, err, "error parsing upload length"), + }, nil } } - uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength, metadata) - if err != nil 
{ - var st *rpc.Status - switch err.(type) { - case errtypes.IsNotFound: - st = status.NewNotFound(ctx, "path not found when initiating upload") - case errtypes.PermissionDenied: - st = status.NewPermissionDenied(ctx, err, "permission denied") - default: - st = status.NewInternal(ctx, err, "error getting upload id: "+req.Ref.String()) - } - return &provider.InitiateFileUploadResponse{ - Status: st, - }, nil + if req.Opaque.Map["X-OC-Mtime"] != nil { + metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value) + } + } + uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength, metadata) + if err != nil { + var st *rpc.Status + switch err.(type) { + case errtypes.IsNotFound: + st = status.NewNotFound(ctx, "path not found when initiating upload") + case errtypes.PermissionDenied: + st = status.NewPermissionDenied(ctx, err, "permission denied") + default: + st = status.NewInternal(ctx, err, "error getting upload id: "+req.Ref.String()) } - url.Path = path.Join("/", url.Path, uploadID) + return &provider.InitiateFileUploadResponse{ + Status: st, + }, nil + } + + u := *s.dataServerURL + u.Path = path.Join(u.Path, uploadID) + if err != nil { + return &provider.InitiateFileUploadResponse{ + Status: status.NewInternal(ctx, err, "error parsing data server URL"), + }, nil } - log.Info().Str("data-server", url.String()). + log.Info().Str("data-server", u.String()). Str("fn", req.Ref.GetPath()). Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)). 
Msg("file upload") res := &provider.InitiateFileUploadResponse{ - UploadEndpoint: url.String(), + UploadEndpoint: u.String(), Status: status.NewOK(ctx), AvailableChecksums: s.availableXS, Expose: s.conf.ExposeDataServer, diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 7b62ea64d9..fea3b918df 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -36,28 +36,20 @@ func init() { } type config struct { - Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` - Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` - Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` - Timeout int64 `mapstructure:"timeout"` - Insecure bool `mapstructure:"insecure"` - DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."` - TempDirectory string `mapstructure:"temp_directory"` + Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` + Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` + Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` + Timeout int64 `mapstructure:"timeout"` + Insecure bool `mapstructure:"insecure"` } func (c *config) init() { if c.Prefix == "" { c.Prefix = "data" } - if c.Driver == "" { c.Driver = "localhome" } - - if c.TempDirectory == "" { - c.TempDirectory = "/var/tmp/reva/tmp" - } - } type svc struct { @@ -100,10 +92,7 @@ func (s *svc) Unprotected() []string { // Create a new DataStore instance which is responsible for // storing the uploaded file on disk in the specified directory. 
-// This path _must_ exist before tusd will store uploads in it. -// If you want to save them on a different medium, for example -// a remote FTP server, you can implement your own storage backend -// by implementing the tusd.DataStore interface. +// This path _must_ exist before we store uploads in it. func getFS(c *config) (storage.FS, error) { if f, ok := registry.NewFuncs[c.Driver]; ok { return f(c.Drivers[c.Driver]) @@ -119,14 +108,70 @@ func (s *svc) Handler() http.Handler { return s.handler } -// Composable is the interface that a struct needs to implement to be composable by this composer -type Composable interface { +func (s *svc) setHandler() error { + + tusHandler := s.getTusHandler() + + s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log := appctx.GetLogger(r.Context()) + log.Info().Msgf("dataprovider routing: path=%s", r.URL.Path) + + method := r.Method + // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override + if r.Header.Get("X-HTTP-Method-Override") != "" { + method = r.Header.Get("X-HTTP-Method-Override") + } + + switch method { + // old fashioned download. + // GET is not part of the tus.io protocol + // TODO allow range based get requests? 
that end before the current offset + case "GET": + s.doGet(w, r) + case "PUT": + s.doPut(w, r) + case "HEAD": + w.WriteHeader(http.StatusOK) + + // tus.io based uploads + // uploads are initiated using the CS3 APIs Initiate Upload call + case "POST": + if tusHandler != nil { + tusHandler.PostFile(w, r) + } else { + w.WriteHeader(http.StatusNotImplemented) + } + case "PATCH": + if tusHandler != nil { + tusHandler.PatchFile(w, r) + } else { + w.WriteHeader(http.StatusNotImplemented) + } + // TODO Only attach the DELETE handler if the Terminate() method is provided + case "DELETE": + if tusHandler != nil { + tusHandler.DelFile(w, r) + } else { + w.WriteHeader(http.StatusNotImplemented) + } + default: + w.WriteHeader(http.StatusNotImplemented) + return + } + }) + + return nil +} + +// Composable is the interface that a struct needs to implement +// to be composable, so that it can support the TUS methods +type composable interface { UseIn(composer *tusd.StoreComposer) } -func (s *svc) setHandler() (err error) { - composable, ok := s.storage.(Composable) - if ok && !s.conf.DisableTus { +func (s *svc) getTusHandler() *tusd.UnroutedHandler { + composable, ok := s.storage.(composable) + if ok { // A storage backend for tusd may consist of multiple different parts which // handle upload creation, locking, termination and so on. The composer is a // place where all those separated pieces are joined together. 
In this example @@ -144,71 +189,9 @@ func (s *svc) setHandler() (err error) { handler, err := tusd.NewUnroutedHandler(config) if err != nil { - return err + return nil } - - s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - log := appctx.GetLogger(r.Context()) - log.Info().Msgf("tusd routing: path=%s", r.URL.Path) - - method := r.Method - // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override - if r.Header.Get("X-HTTP-Method-Override") != "" { - method = r.Header.Get("X-HTTP-Method-Override") - } - - switch method { - // old fashioned download. - - // GET is not part of the tus.io protocol - // currently there is no way to GET an upload that is in progress - // TODO allow range based get requests? that end before the current offset - case "GET": - s.doGet(w, r) - - // tus.io based upload - - // uploads are initiated using the CS3 APIs Initiate Download call - case "POST": - handler.PostFile(w, r) - case "HEAD": - handler.HeadFile(w, r) - case "PATCH": - handler.PatchFile(w, r) - // PUT provides a wrapper around the POST call, to save the caller from - // the trouble of configuring the tus client. 
- case "PUT": - s.doTusPut(w, r) - // TODO Only attach the DELETE handler if the Terminate() method is provided - case "DELETE": - handler.DelFile(w, r) - } - })) - } else { - s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - method := r.Method - // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override - if r.Header.Get("X-HTTP-Method-Override") != "" { - method = r.Header.Get("X-HTTP-Method-Override") - } - - switch method { - case "HEAD": - w.WriteHeader(http.StatusOK) - return - case "GET": - s.doGet(w, r) - return - case "PUT": - s.doPut(w, r) - return - default: - w.WriteHeader(http.StatusNotImplemented) - return - } - }) + return handler } - - return err + return nil } diff --git a/internal/http/services/dataprovider/put.go b/internal/http/services/dataprovider/put.go index 13e957688e..b9b1fe9d4d 100644 --- a/internal/http/services/dataprovider/put.go +++ b/internal/http/services/dataprovider/put.go @@ -19,115 +19,30 @@ package dataprovider import ( - "fmt" - "io" - "io/ioutil" "net/http" - "os" - "path" - "strconv" "strings" - "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" - "github.com/cs3org/reva/pkg/rhttp" - "github.com/cs3org/reva/pkg/token" - "github.com/eventials/go-tus" - "github.com/eventials/go-tus/memorystore" + "github.com/cs3org/reva/pkg/errtypes" ) func (s *svc) doPut(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := appctx.GetLogger(ctx) fn := r.URL.Path + defer r.Body.Close() fsfn := strings.TrimPrefix(fn, s.conf.Prefix) ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} err := s.storage.Upload(ctx, ref, r.Body) if err != nil { - log.Error().Err(err).Msg("error uploading file") - w.WriteHeader(http.StatusInternalServerError) - return - } - - r.Body.Close() - w.WriteHeader(http.StatusOK) -} - -func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) { - ctx := 
r.Context() - log := appctx.GetLogger(ctx) - - fp := r.Header.Get("File-Path") - if fp == "" { - w.WriteHeader(http.StatusBadRequest) - return - } - - length, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64) - if err != nil { - // Fallback to Upload-Length - length, err = strconv.ParseInt(r.Header.Get("Upload-Length"), 10, 64) - if err != nil { - w.WriteHeader(http.StatusBadRequest) + if _, ok := err.(errtypes.IsPartialContent); ok { + w.WriteHeader(http.StatusPartialContent) return } - } - - fd, err := ioutil.TempFile(fmt.Sprintf("/%s", s.conf.TempDirectory), "") - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - defer os.RemoveAll(fd.Name()) - defer fd.Close() - if _, err := io.Copy(fd, r.Body); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - dataServerURL := fmt.Sprintf("http://%s%s", r.Host, r.RequestURI) - - // create the tus client. - c := tus.DefaultConfig() - c.Resume = true - c.HttpClient = rhttp.GetHTTPClient( - rhttp.Context(ctx), - rhttp.Timeout(time.Duration(s.conf.Timeout*int64(time.Second))), - rhttp.Insecure(s.conf.Insecure), - ) - c.Store, err = memorystore.NewMemoryStore() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - c.Header.Set(token.TokenHeader, token.ContextMustGetToken(ctx)) - - tusc, err := tus.NewClient(dataServerURL, c) - if err != nil { - log.Error().Err(err).Msg("error starting TUS client") - w.WriteHeader(http.StatusInternalServerError) - return - } - - metadata := map[string]string{ - "filename": path.Base(fp), - "dir": path.Dir(fp), - } - - upload := tus.NewUpload(fd, length, metadata, "") - defer r.Body.Close() - - // create the uploader. - c.Store.Set(upload.Fingerprint, dataServerURL) - uploader := tus.NewUploader(tusc, dataServerURL, upload, 0) - - // start the uploading process. 
- err = uploader.Upload() - if err != nil { - log.Error().Err(err).Msg("Could not start TUS upload") + log.Error().Err(err).Msg("error uploading file") w.WriteHeader(http.StatusInternalServerError) return } diff --git a/internal/http/services/owncloud/ocdav/copy.go b/internal/http/services/owncloud/ocdav/copy.go index 631d002120..54606421d3 100644 --- a/internal/http/services/owncloud/ocdav/copy.go +++ b/internal/http/services/owncloud/ocdav/copy.go @@ -21,9 +21,7 @@ package ocdav import ( "context" "fmt" - "io" "net/http" - "os" "path" "strings" @@ -34,9 +32,6 @@ import ( "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/rhttp" - tokenpkg "github.com/cs3org/reva/pkg/token" - "github.com/eventials/go-tus" - "github.com/eventials/go-tus/memorystore" ) func (s *svc) handleCopy(w http.ResponseWriter, r *http.Request, ns string) { @@ -135,7 +130,7 @@ func (s *svc) handleCopy(w http.ResponseWriter, r *http.Request, ns string) { return } - var successCode int + successCode := http.StatusCreated // 201 if new resource was created, see https://tools.ietf.org/html/rfc4918#section-9.8.5 if dstStatRes.Status.Code == rpc.Code_CODE_OK { successCode = http.StatusNoContent // 204 if target already existed, see https://tools.ietf.org/html/rfc4918#section-9.8.5 @@ -146,8 +141,6 @@ func (s *svc) handleCopy(w http.ResponseWriter, r *http.Request, ns string) { } } else { - successCode = http.StatusCreated // 201 if new resource was created, see https://tools.ietf.org/html/rfc4918#section-9.8.5 - // check if an intermediate path / the parent exists intermediateDir := path.Dir(dst) ref = &provider.Reference{ @@ -233,6 +226,7 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src // copy file // 1. 
get download url + dReq := &provider.InitiateFileDownloadRequest{ Ref: &provider.Reference{ Spec: &provider.Reference_Path{Path: src.Path}, @@ -282,84 +276,33 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src } httpDownloadReq.Header.Set(datagateway.TokenTransportHeader, dRes.Token) - httpDownloadClient := s.client - - httpDownloadRes, err := httpDownloadClient.Do(httpDownloadReq) + httpDownloadRes, err := s.client.Do(httpDownloadReq) if err != nil { return err } defer httpDownloadRes.Body.Close() - if httpDownloadRes.StatusCode != http.StatusOK { return fmt.Errorf("status code %d", httpDownloadRes.StatusCode) } - fileName, fd, err := s.createChunkTempFile() - if err != nil { - return err - } - defer os.RemoveAll(fileName) - defer fd.Close() - if _, err := io.Copy(fd, httpDownloadRes.Body); err != nil { - return err - } - - // do upload - err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, fd, int64(src.GetSize())) - if err != nil { - return err - } - } - return nil -} - -func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.ReadSeeker, length int64) error { - var err error - log := appctx.GetLogger(ctx) - - // create the tus client. - c := tus.DefaultConfig() - c.Resume = true - c.HttpClient = s.client - c.Store, err = memorystore.NewMemoryStore() - if err != nil { - return err - } - - log.Debug(). - Str("header", tokenpkg.TokenHeader). - Str("token", tokenpkg.ContextMustGetToken(ctx)). 
- Msg("adding token to header") - c.Header.Set(tokenpkg.TokenHeader, tokenpkg.ContextMustGetToken(ctx)) - c.Header.Set(datagateway.TokenTransportHeader, transferToken) - - tusc, err := tus.NewClient(dataServerURL, c) - if err != nil { - return nil - } - - // TODO: also copy properties: https://tools.ietf.org/html/rfc4918#section-9.8.2 - metadata := map[string]string{ - "filename": path.Base(fn), - "dir": path.Dir(fn), - //"checksum": fmt.Sprintf("%s %s", storageprovider.GRPC2PKGXS(xsType).String(), xs), - } - log.Debug(). - Str("length", fmt.Sprintf("%d", length)). - Str("filename", path.Base(fn)). - Str("dir", path.Dir(fn)). - Msg("tus.NewUpload") + // 4. do upload - upload := tus.NewUpload(body, length, metadata, "") - - // create the uploader. - c.Store.Set(upload.Fingerprint, dataServerURL) - uploader := tus.NewUploader(tusc, dataServerURL, upload, 0) + if src.GetSize() > 0 { + httpUploadReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, httpDownloadRes.Body) + if err != nil { + return err + } + httpUploadReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) - // start the uploading process. 
- err = uploader.Upload() - if err != nil { - return err + httpUploadRes, err := s.client.Do(httpUploadReq) + if err != nil { + return err + } + defer httpUploadRes.Body.Close() + if httpUploadRes.StatusCode != http.StatusOK { + return err + } + } } return nil } diff --git a/internal/http/services/owncloud/ocdav/move.go b/internal/http/services/owncloud/ocdav/move.go index 4c98d8e625..cc94cdd285 100644 --- a/internal/http/services/owncloud/ocdav/move.go +++ b/internal/http/services/owncloud/ocdav/move.go @@ -114,7 +114,7 @@ func (s *svc) handleMove(w http.ResponseWriter, r *http.Request, ns string) { return } - var successCode int + successCode := http.StatusCreated // 201 if new resource was created, see https://tools.ietf.org/html/rfc4918#section-9.9.4 if dstStatRes.Status.Code == rpc.Code_CODE_OK { successCode = http.StatusNoContent // 204 if target already existed, see https://tools.ietf.org/html/rfc4918#section-9.9.4 @@ -145,8 +145,6 @@ func (s *svc) handleMove(w http.ResponseWriter, r *http.Request, ns string) { return } } else { - successCode = http.StatusCreated // 201 if new resource was created, see https://tools.ietf.org/html/rfc4918#section-9.9.4 - // check if an intermediate path / the parent exists intermediateDir := path.Dir(dst) ref2 := &provider.Reference{ diff --git a/internal/http/services/owncloud/ocdav/ocdav.go b/internal/http/services/owncloud/ocdav/ocdav.go index f34e37a340..291d4811fa 100644 --- a/internal/http/services/owncloud/ocdav/ocdav.go +++ b/internal/http/services/owncloud/ocdav/ocdav.go @@ -24,7 +24,6 @@ import ( "fmt" "net/http" "net/url" - "os" "path" "strings" "time" @@ -67,22 +66,14 @@ type Config struct { // and received path is /docs the internal path will be: // /users///docs WebdavNamespace string `mapstructure:"webdav_namespace"` - ChunkFolder string `mapstructure:"chunk_folder"` GatewaySvc string `mapstructure:"gatewaysvc"` Timeout int64 `mapstructure:"timeout"` Insecure bool `mapstructure:"insecure"` - DisableTus bool 
`mapstructure:"disable_tus"` } func (c *Config) init() { // note: default c.Prefix is an empty string - c.GatewaySvc = sharedconf.GetGatewaySVC(c.GatewaySvc) - - if c.ChunkFolder == "" { - c.ChunkFolder = "/var/tmp/reva/tmp/davchunks" - } - } type svc struct { @@ -101,10 +92,6 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) conf.init() - if err := os.MkdirAll(conf.ChunkFolder, 0755); err != nil { - return nil, err - } - s := &svc{ c: conf, webDavHandler: new(WebDavHandler), diff --git a/internal/http/services/owncloud/ocdav/options.go b/internal/http/services/owncloud/ocdav/options.go index b3eb4e929a..3d95863aa0 100644 --- a/internal/http/services/owncloud/ocdav/options.go +++ b/internal/http/services/owncloud/ocdav/options.go @@ -34,7 +34,7 @@ func (s *svc) handleOptions(w http.ResponseWriter, r *http.Request, ns string) { w.Header().Set("Allow", allow) w.Header().Set("DAV", "1, 2") w.Header().Set("MS-Author-Via", "DAV") - if !s.c.DisableTus && !isPublic { + if !isPublic { w.Header().Add("Access-Control-Allow-Headers", "Tus-Resumable") w.Header().Add("Access-Control-Expose-Headers", "Tus-Resumable, Tus-Version, Tus-Extension") w.Header().Set("Tus-Resumable", "1.0.0") // TODO(jfd): only for dirs? 
diff --git a/internal/http/services/owncloud/ocdav/propfind.go b/internal/http/services/owncloud/ocdav/propfind.go index f570315087..e6fbdd591f 100644 --- a/internal/http/services/owncloud/ocdav/propfind.go +++ b/internal/http/services/owncloud/ocdav/propfind.go @@ -194,7 +194,7 @@ func (s *svc) handlePropfind(w http.ResponseWriter, r *http.Request, ns string) w.Header().Set("DAV", "1, 3, extended-mkcol") w.Header().Set("Content-Type", "application/xml; charset=utf-8") // let clients know this collection supports tus.io POST requests to start uploads - if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER && !s.c.DisableTus { + if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER { w.Header().Add("Access-Control-Expose-Headers", "Tus-Resumable, Tus-Version, Tus-Extension") w.Header().Set("Tus-Resumable", "1.0.0") w.Header().Set("Tus-Version", "1.0.0") @@ -344,7 +344,7 @@ func (s *svc) mdToPropResponse(ctx context.Context, pf *propfindXML, md *provide ) } } - // Finder needs the the getLastModified property to work. + // Finder needs the getLastModified property to work. 
t := utils.TSToTime(md.Mtime).UTC() lastModifiedString := t.Format(time.RFC1123Z) response.Propstat[0].Prop = append(response.Propstat[0].Prop, s.newProp("d:getlastmodified", lastModifiedString)) diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 9ac9c40878..61c4875b28 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -21,24 +21,20 @@ package ocdav import ( "io" "net/http" - "os" "path" - "regexp" "strconv" "time" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/rhttp" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/utils" ) -func isChunked(fn string) (bool, error) { - // FIXME: also need to check whether the OC-Chunked header is set - return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn) -} - func sufferMacOSFinder(r *http.Request) bool { return r.Header.Get("X-Expected-Entity-Length") != "" } @@ -109,7 +105,6 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) { log := appctx.GetLogger(ctx) ns = applyLayout(ctx, ns) - fn := path.Join(ns, r.URL.Path) if r.Body == nil { @@ -118,27 +113,6 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) { return } - ok, err := isChunked(fn) - if err != nil { - log.Error().Err(err).Msg("error checking if request is chunked or not") - w.WriteHeader(http.StatusInternalServerError) - return - } - - if ok { - // TODO: disable if chunking capability is turned off in config - /** - if s.c.Capabilities.Dav.Chunking == "1.0" { - s.handlePutChunked(w, r) - } else { - log.Error().Err(err).Msg("chunking 1.0 is not enabled") - w.WriteHeader(http.StatusBadRequest) - } - */ - 
s.handlePutChunked(w, r, ns) - return - } - if isContentRange(r) { log.Warn().Msg("Content-Range not supported for PUT") w.WriteHeader(http.StatusNotImplemented) @@ -163,22 +137,11 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) { return } } - fileName, fd, err := s.createChunkTempFile() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - defer os.RemoveAll(fileName) - defer fd.Close() - if _, err := io.Copy(fd, r.Body); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - s.handlePutHelper(w, r, fd, fn, length) + s.handlePutHelper(w, r, r.Body, fn, length) } -func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, fn string, length int64) { +func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io.Reader, fn string, length int64) { ctx := r.Context() log := appctx.GetLogger(ctx) @@ -199,7 +162,6 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io w.WriteHeader(http.StatusInternalServerError) return } - if sRes.Status.Code != rpc.Code_CODE_OK && sRes.Status.Code != rpc.Code_CODE_NOT_FOUND { switch sRes.Status.Code { case rpc.Code_CODE_PERMISSION_DENIED: @@ -213,13 +175,12 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io } info := sRes.Info - if info != nil && info.Type != provider.ResourceType_RESOURCE_TYPE_FILE { - log.Warn().Msg("resource is not a file") - w.WriteHeader(http.StatusConflict) - return - } - if info != nil { + if info.Type != provider.ResourceType_RESOURCE_TYPE_FILE { + log.Warn().Msg("resource is not a file") + w.WriteHeader(http.StatusConflict) + return + } clientETag := r.Header.Get("If-Match") serverETag := info.Etag if clientETag != "" { @@ -238,8 +199,7 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io }, } - mtime := r.Header.Get("X-OC-Mtime") - if mtime != "" { + if mtime := r.Header.Get("X-OC-Mtime"); 
mtime != "" { opaqueMap["X-OC-Mtime"] = &typespb.OpaqueEntry{ Decoder: "plain", Value: []byte(mtime), @@ -250,10 +210,8 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io } uReq := &provider.InitiateFileUploadRequest{ - Ref: ref, - Opaque: &typespb.Opaque{ - Map: opaqueMap, - }, + Ref: ref, + Opaque: &typespb.Opaque{Map: opaqueMap}, } // where to upload the file? @@ -279,11 +237,51 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io return } - if err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, fn, content, length); err != nil { - log.Error().Err(err).Msg("TUS upload failed") + if length > 0 { + httpReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, content) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + httpReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) + + httpRes, err := s.client.Do(httpReq) + if err != nil { + log.Err(err).Msg("error doing PUT request to data service") + w.WriteHeader(http.StatusInternalServerError) + return + } + defer httpRes.Body.Close() + if httpRes.StatusCode != http.StatusOK { + if httpRes.StatusCode == http.StatusPartialContent { + w.WriteHeader(http.StatusPartialContent) + return + } + log.Err(err).Msg("PUT request to data server failed") + w.WriteHeader(http.StatusInternalServerError) + return + } + } + + ok, err := chunking.IsChunked(fn) + if err != nil { w.WriteHeader(http.StatusInternalServerError) return } + if ok { + chunk, err := chunking.GetChunkBLOBInfo(fn) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + sReq = &provider.StatRequest{ + Ref: &provider.Reference{ + Spec: &provider.Reference_Path{ + Path: chunk.Path, + }, + }, + } + } // stat again to check the new file's metadata sRes, err = client.Stat(ctx, sReq) @@ -308,13 +306,13 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io return } - info2 := sRes.Info + newInfo := sRes.Info - 
w.Header().Add("Content-Type", info2.MimeType) - w.Header().Set("ETag", info2.Etag) - w.Header().Set("OC-FileId", wrapResourceID(info2.Id)) - w.Header().Set("OC-ETag", info2.Etag) - t := utils.TSToTime(info2.Mtime) + w.Header().Add("Content-Type", newInfo.MimeType) + w.Header().Set("ETag", newInfo.Etag) + w.Header().Set("OC-FileId", wrapResourceID(newInfo.Id)) + w.Header().Set("OC-ETag", newInfo.Etag) + t := utils.TSToTime(newInfo.Mtime) lastModifiedString := t.Format(time.RFC1123Z) w.Header().Set("Last-Modified", lastModifiedString) diff --git a/internal/http/services/owncloud/ocdav/tus.go b/internal/http/services/owncloud/ocdav/tus.go index d9319cc051..0dd2490f2f 100644 --- a/internal/http/services/owncloud/ocdav/tus.go +++ b/internal/http/services/owncloud/ocdav/tus.go @@ -193,7 +193,6 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) { var httpRes *http.Response if length != 0 { - httpClient := s.client httpReq, err := rhttp.NewRequest(ctx, "PATCH", uRes.UploadEndpoint, r.Body) if err != nil { log.Err(err).Msg("wrong request") @@ -210,7 +209,7 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) { } httpReq.Header.Set("Tus-Resumable", r.Header.Get("Tus-Resumable")) - httpRes, err = httpClient.Do(httpReq) + httpRes, err = s.client.Do(httpReq) if err != nil { log.Err(err).Msg("error doing GET request to data service") w.WriteHeader(http.StatusInternalServerError) diff --git a/internal/http/services/owncloud/ocdav/webdav.go b/internal/http/services/owncloud/ocdav/webdav.go index 22d05d784d..5c7e5be7c1 100644 --- a/internal/http/services/owncloud/ocdav/webdav.go +++ b/internal/http/services/owncloud/ocdav/webdav.go @@ -58,11 +58,7 @@ func (h *WebDavHandler) Handler(s *svc) http.Handler { case http.MethodPut: s.handlePut(w, r, h.namespace) case http.MethodPost: - if !s.c.DisableTus { - s.handleTusPost(w, r, h.namespace) - } else { - w.WriteHeader(http.StatusNotFound) - } + s.handleTusPost(w, r, 
h.namespace) case http.MethodOptions: s.handleOptions(w, r, h.namespace) case http.MethodHead: diff --git a/pkg/errtypes/errtypes.go b/pkg/errtypes/errtypes.go index a01a4679fb..dbd6c351a1 100644 --- a/pkg/errtypes/errtypes.go +++ b/pkg/errtypes/errtypes.go @@ -35,6 +35,9 @@ type InternalError string func (e InternalError) Error() string { return "internal error: " + string(e) } +// IsInternalError implements the IsInternalError interface. +func (e InternalError) IsInternalError() {} + // PermissionDenied is the error to use when a resource cannot be access because of missing permissions. type PermissionDenied string @@ -75,6 +78,14 @@ func (e NotSupported) Error() string { return "error: not supported: " + string( // IsNotSupported implements the IsNotSupported interface. func (e NotSupported) IsNotSupported() {} +// PartialContent is the error to use when the client request has partial data. +type PartialContent string + +func (e PartialContent) Error() string { return "error: partial content: " + string(e) } + +// IsPartialContent implements the IsPartialContent interface. +func (e PartialContent) IsPartialContent() {} + // IsNotFound is the interface to implement // to specify that an a resource is not found. type IsNotFound interface { @@ -112,7 +123,13 @@ type IsNotSupported interface { } // IsPermissionDenied is the interface to implement -// to specify that an action is not supported. +// to specify that an action is denied. type IsPermissionDenied interface { IsPermissionDenied() } + +// IsPartialContent is the interface to implement +// to specify that the client request has partial data. 
+type IsPartialContent interface { + IsPartialContent() +} diff --git a/pkg/storage/fs/ocis/ocis.go b/pkg/storage/fs/ocis/ocis.go index 8fc1da1d75..59f092997b 100644 --- a/pkg/storage/fs/ocis/ocis.go +++ b/pkg/storage/fs/ocis/ocis.go @@ -33,6 +33,7 @@ import ( "github.com/cs3org/reva/pkg/logger" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/mitchellh/mapstructure" @@ -160,18 +161,20 @@ func New(m map[string]interface{}) (storage.FS, error) { } return &ocisfs{ - tp: tp, - lu: lu, - o: o, - p: &Permissions{lu: lu}, + tp: tp, + lu: lu, + o: o, + p: &Permissions{lu: lu}, + chunkHandler: chunking.NewChunkHandler(filepath.Join(o.Root, "uploads")), }, nil } type ocisfs struct { - tp TreePersistence - lu *Lookup - o *Options - p *Permissions + tp TreePersistence + lu *Lookup + o *Options + p *Permissions + chunkHandler *chunking.ChunkHandler } func (fs *ocisfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index b6cf86373c..3b8f0e9563 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -32,120 +32,55 @@ import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/logger" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" "github.com/pkg/errors" - "github.com/rs/zerolog/log" tusd "github.com/tus/tusd/pkg/handler" ) var defaultFilePerm = os.FileMode(0664) -// TODO deprecated ... 
use tus - func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) (err error) { - - var n *Node - if n, err = fs.lu.NodeFromResource(ctx, ref); err != nil { - return - } - - // check permissions - var ok bool - if n.Exists { - // check permissions of file to be overwritten - ok, err = fs.p.HasPermission(ctx, n, func(rp *provider.ResourcePermissions) bool { - return rp.InitiateFileUpload - }) - } else { - // check permissions of parent - p, perr := n.Parent() - if perr != nil { - return errors.Wrap(perr, "ocisfs: error getting parent "+n.ParentID) - } - - ok, err = fs.p.HasPermission(ctx, p, func(rp *provider.ResourcePermissions) bool { - return rp.InitiateFileUpload - }) - } - switch { - case err != nil: - return errtypes.InternalError(err.Error()) - case !ok: - return errtypes.PermissionDenied(filepath.Join(n.ParentID, n.Name)) - } - - if n.ID == "" { - n.ID = uuid.New().String() - } - - nodePath := fs.lu.toInternalPath(n.ID) - - var tmp *os.File - tmp, err = ioutil.TempFile(filepath.Dir(nodePath), "._reva_atomic_upload") + upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { - return errors.Wrap(err, "ocisfs: error creating tmp fn at "+nodePath) + return errors.Wrap(err, "ocisfs: error retrieving upload") } - _, err = io.Copy(tmp, r) - r.Close() - tmp.Close() - if err != nil { - return errors.Wrap(err, "ocisfs: error writing to tmp file "+tmp.Name()) - } - - // TODO move old content to version - //_ = os.RemoveAll(path.Join(nodePath, "content")) - appctx.GetLogger(ctx).Warn().Msg("TODO move old content to version") + uploadInfo := upload.(*fileUpload) - appctx.GetLogger(ctx).Debug().Str("tmp", tmp.Name()).Str("ipath", nodePath).Msg("moving tmp to content") - if err = os.Rename(tmp.Name(), nodePath); err != nil { - return - } - - // who will become the owner? 
- u, ok := user.ContextGetUser(ctx) - switch { - case ok: - err = n.writeMetadata(u.Id) - case fs.o.EnableHome: - log := appctx.GetLogger(ctx) - log.Error().Msg("home support enabled but no user in context") - err = errors.Wrap(errtypes.UserRequired("userrequired"), "error getting user from ctx") - case fs.o.Owner != "": - err = n.writeMetadata(&userpb.UserId{ - OpaqueId: fs.o.Owner, - }) - default: - // fallback to parent owner? - err = n.writeMetadata(nil) - } + p := uploadInfo.info.Storage["NodeName"] + ok, err := chunking.IsChunked(p) if err != nil { - return + return errors.Wrap(err, "ocfs: error checking path") } - - // link child name to parent if it is new - childNameLink := filepath.Join(fs.lu.toInternalPath(n.ParentID), n.Name) - var link string - link, err = os.Readlink(childNameLink) - if err == nil && link != "../"+n.ID { - log.Err(err). - Interface("node", n). - Str("childNameLink", childNameLink). - Str("link", link). - Msg("ocisfs: child name link has wrong target id, repairing") - - if err = os.Remove(childNameLink); err != nil { - return errors.Wrap(err, "ocisfs: could not remove symlink child entry") + if ok { + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err } - } - if os.IsNotExist(err) || link != "../"+n.ID { - if err = os.Symlink("../"+n.ID, childNameLink); err != nil { - return errors.Wrap(err, "ocisfs: could not symlink child entry") + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) } + uploadInfo.info.Storage["NodeName"] = p + fd, err := os.Open(assembledFile) + if err != nil { + return errors.Wrap(err, "eos: error opening assembled file") + } + defer fd.Close() + defer os.RemoveAll(assembledFile) + r = fd + } + + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { + return errors.Wrap(err, "ocisfs: error writing to binary file") } - 
return fs.tp.Propagate(ctx, n) + return uploadInfo.FinishUpload(ctx) } // InitiateUpload returns an upload id that can be used for uploads with tus @@ -423,6 +358,7 @@ func (upload *fileUpload) writeInfo() error { // FinishUpload finishes an upload and moves the file to the internal destination func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { + log := appctx.GetLogger(upload.ctx) n := &Node{ lu: upload.fs.lu, @@ -443,7 +379,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { versionsPath := upload.fs.lu.toInternalPath(n.ID + ".REV." + fi.ModTime().UTC().Format(time.RFC3339Nano)) if err = os.Rename(targetPath, versionsPath); err != nil { - log := appctx.GetLogger(upload.ctx) log.Err(err).Interface("info", upload.info). Str("binPath", upload.binPath). Str("targetPath", targetPath). diff --git a/pkg/storage/fs/owncloud/owncloud.go b/pkg/storage/fs/owncloud/owncloud.go index ab32fe1f42..6e08cf4047 100644 --- a/pkg/storage/fs/owncloud/owncloud.go +++ b/pkg/storage/fs/owncloud/owncloud.go @@ -44,6 +44,7 @@ import ( "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" "github.com/cs3org/reva/pkg/storage/utils/ace" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/gofrs/uuid" @@ -198,12 +199,17 @@ func New(m map[string]interface{}) (storage.FS, error) { }, } - return &ocfs{c: c, pool: pool}, nil + return &ocfs{ + c: c, + pool: pool, + chunkHandler: chunking.NewChunkHandler(c.UploadInfoDir), + }, nil } type ocfs struct { - c *config - pool *redis.Pool + c *config + pool *redis.Pool + chunkHandler *chunking.ChunkHandler } func (fs *ocfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index 99c8c2958d..d8e166ac42 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -31,6 +31,7 @@ 
import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/logger" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" @@ -40,65 +41,46 @@ import ( var defaultFilePerm = os.FileMode(0664) -// TODO deprecated ... use tus func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { - ip, err := fs.resolve(ctx, ref) + upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { - return errors.Wrap(err, "ocfs: error resolving reference") + return errors.Wrap(err, "ocfs: error retrieving upload") } - var perm *provider.ResourcePermissions - var perr error - // if destination exists - if _, err := os.Stat(ip); err == nil { - // check permissions of file to be overwritten - perm, perr = fs.readPermissions(ctx, ip) - } else { - // check permissions - perm, perr = fs.readPermissions(ctx, filepath.Dir(ip)) - } - if perr == nil { - if !perm.InitiateFileUpload { - return errtypes.PermissionDenied("") - } - } else { - if isNotFound(perr) { - return errtypes.NotFound(fs.toStoragePath(ctx, filepath.Dir(ip))) - } - return errors.Wrap(perr, "ocfs: error reading permissions") - } + uploadInfo := upload.(*fileUpload) - // we cannot rely on /tmp as it can live in another partition and we can - // hit invalid cross-device link errors, so we create the tmp file in the same directory - // the file is supposed to be written. 
- tmp, err := ioutil.TempFile(filepath.Dir(ip), "._reva_atomic_upload") + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) if err != nil { - return errors.Wrap(err, "ocfs: error creating tmp file at "+filepath.Dir(ip)) + return errors.Wrap(err, "ocfs: error checking path") } - - _, err = io.Copy(tmp, r) - if err != nil { - return errors.Wrap(err, "ocfs: error writing to tmp file "+tmp.Name()) - } - - // if destination exists - if _, err := os.Stat(ip); err == nil { - // copy attributes of existing file to tmp file - if err := fs.copyMD(ip, tmp.Name()); err != nil { - return errors.Wrap(err, "ocfs: error copying metadata from "+ip+" to "+tmp.Name()) - } - // create revision - if err := fs.archiveRevision(ctx, fs.getVersionsPath(ctx, ip), ip); err != nil { + if ok { + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { return err } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + fd, err := os.Open(assembledFile) + if err != nil { + return errors.Wrap(err, "eos: error opening assembled file") + } + defer fd.Close() + defer os.RemoveAll(assembledFile) + r = fd } - // TODO(jfd): make sure rename is atomic, missing fsync ... 
- if err := os.Rename(tmp.Name(), ip); err != nil { - return errors.Wrap(err, "ocfs: error renaming from "+tmp.Name()+" to "+ip) + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { + return errors.Wrap(err, "ocfs: error writing to binary file") } - return nil + return uploadInfo.FinishUpload(ctx) } // InitiateUpload returns an upload id that can be used for uploads with tus diff --git a/internal/http/services/owncloud/ocdav/putchunked.go b/pkg/storage/utils/chunking/chunking.go similarity index 63% rename from internal/http/services/owncloud/ocdav/putchunked.go rename to pkg/storage/utils/chunking/chunking.go index 2708643d5f..57508a376e 100644 --- a/internal/http/services/owncloud/ocdav/putchunked.go +++ b/pkg/storage/utils/chunking/chunking.go @@ -16,47 +16,50 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -package ocdav +package chunking import ( - "context" "fmt" "io" "io/ioutil" - "net/http" "os" - "path" "path/filepath" + "regexp" "strconv" "strings" - - "github.com/cs3org/reva/pkg/appctx" ) -type chunkBLOBInfo struct { - path string - transferID string - totalChunks int64 - currentChunk int64 +// IsChunked checks if a given path refers to a chunk or not +func IsChunked(fn string) (bool, error) { + // FIXME: also need to check whether the OC-Chunked header is set + return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn) +} + +// ChunkBLOBInfo stores info about a particular chunk +type ChunkBLOBInfo struct { + Path string + TransferID string + TotalChunks int + CurrentChunk int } -// not using the resource path in the chunk folder name allows uploading -// to the same folder after a move without having to restart the chunk -// upload -func (c *chunkBLOBInfo) uploadID() string { - return fmt.Sprintf("chunking-%s-%d", c.transferID, c.totalChunks) +// Not using the resource path in the chunk folder name allows uploading to +// the same folder after a move without having to 
restart the chunk upload +func (c *ChunkBLOBInfo) uploadID() string { + return fmt.Sprintf("chunking-%s-%d", c.TransferID, c.TotalChunks) } -func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) { +// GetChunkBLOBInfo decodes a chunk name to retrieve info about it. +func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) { parts := strings.Split(path, "-chunking-") tail := strings.Split(parts[1], "-") - totalChunks, err := strconv.ParseInt(tail[1], 10, 64) + totalChunks, err := strconv.Atoi(tail[1]) if err != nil { return nil, err } - currentChunk, err := strconv.ParseInt(tail[2], 10, 64) + currentChunk, err := strconv.Atoi(tail[2]) if err != nil { return nil, err } @@ -64,16 +67,27 @@ func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) { return nil, fmt.Errorf("current chunk:%d exceeds total number of chunks:%d", currentChunk, totalChunks) } - return &chunkBLOBInfo{ - path: parts[0], - transferID: tail[0], - totalChunks: totalChunks, - currentChunk: currentChunk, + return &ChunkBLOBInfo{ + Path: parts[0], + TransferID: tail[0], + TotalChunks: totalChunks, + CurrentChunk: currentChunk, }, nil } -func (s *svc) createChunkTempFile() (string, *os.File, error) { - file, err := ioutil.TempFile(fmt.Sprintf("/%s", s.c.ChunkFolder), "") +// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory +// until it gets the final chunk which is then returned. +type ChunkHandler struct { + ChunkFolder string `mapstructure:"chunk_folder"` +} + +// NewChunkHandler creates a handler for chunked uploads. 
+func NewChunkHandler(chunkFolder string) *ChunkHandler { + return &ChunkHandler{chunkFolder} +} + +func (c *ChunkHandler) createChunkTempFile() (string, *os.File, error) { + file, err := ioutil.TempFile(fmt.Sprintf("/%s", c.ChunkFolder), "") if err != nil { return "", nil, err } @@ -81,60 +95,48 @@ func (s *svc) createChunkTempFile() (string, *os.File, error) { return file.Name(), file, nil } -func (s *svc) getChunkFolderName(i *chunkBLOBInfo) (string, error) { - path := "/" + s.c.ChunkFolder + filepath.Clean("/"+i.uploadID()) +func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (string, error) { + path := "/" + c.ChunkFolder + filepath.Clean("/"+i.uploadID()) if err := os.MkdirAll(path, 0755); err != nil { return "", err } return path, nil } -func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool, string, error) { - log := appctx.GetLogger(ctx) - chunkInfo, err := getChunkBLOBInfo(path) +func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, error) { + chunkInfo, err := GetChunkBLOBInfo(path) if err != nil { err := fmt.Errorf("error getting chunk info from path: %s", path) - //c.logger.Error().Log("error", err) return false, "", err } - //c.logger.Info().Log("chunknum", chunkInfo.currentChunk, "chunks", chunkInfo.totalChunks, - //"transferid", chunkInfo.transferID, "uploadid", chunkInfo.uploadID()) - - chunkTempFilename, chunkTempFile, err := s.createChunkTempFile() + chunkTempFilename, chunkTempFile, err := c.createChunkTempFile() if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } defer chunkTempFile.Close() if _, err := io.Copy(chunkTempFile, r); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } // force close of the file here because if it is the last chunk to // assemble the big file we must have all the chunks already closed. 
if err = chunkTempFile.Close(); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } - chunksFolderName, err := s.getChunkFolderName(chunkInfo) + chunksFolderName, err := c.getChunkFolderName(chunkInfo) if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } //c.logger.Info().Log("chunkfolder", chunksFolderName) - chunkTarget := chunksFolderName + "/" + fmt.Sprintf("%d", chunkInfo.currentChunk) + chunkTarget := chunksFolderName + "/" + fmt.Sprintf("%d", chunkInfo.CurrentChunk) if err = os.Rename(chunkTempFilename, chunkTarget); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } - //c.logger.Info().Log("chunktarget", chunkTarget) - // Check that all chunks are uploaded. // This is very inefficient, the server has to check that it has all the // chunks after each uploaded chunk. @@ -142,7 +144,6 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool // assembly the chunks when the client asks for it. chunksFolder, err := os.Open(chunksFolderName) if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } defer chunksFolder.Close() @@ -150,50 +151,41 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool // read all the chunks inside the chunk folder; -1 == all chunks, err := chunksFolder.Readdir(-1) if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } - //c.logger.Info().Log("msg", "chunkfolder readed", "nchunks", len(chunks)) - // there is still some chunks to be uploaded. + // there are still some chunks to be uploaded. // we return CodeUploadIsPartial to notify upper layers that the upload is still // not complete and requires more actions. // This code is needed to notify the owncloud webservice that the upload has not yet been // completed and needs to continue uploading chunks. 
- if len(chunks) < int(chunkInfo.totalChunks) { + if len(chunks) < chunkInfo.TotalChunks { return false, "", nil } - assembledFileName, assembledFile, err := s.createChunkTempFile() + assembledFileName, assembledFile, err := c.createChunkTempFile() if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } defer assembledFile.Close() - //c.logger.Info().Log("assembledfile", assembledFileName) - // walk all chunks and append to assembled file for i := range chunks { target := chunksFolderName + "/" + fmt.Sprintf("%d", i) chunk, err := os.Open(target) if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } defer chunk.Close() if _, err = io.Copy(assembledFile, chunk); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } - //c.logger.Debug().Log("msg", "chunk appended to assembledfile") // we close the chunk here because if the assembled file contains hundreds of chunks // we will end up with hundreds of open file descriptors if err = chunk.Close(); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } @@ -201,64 +193,29 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool // at this point the assembled file is complete // so we free space removing the chunks folder - defer func() { - if err = os.RemoveAll(chunksFolderName); err != nil { - log.Warn().Err(err).Msg("error deleting chunk folder, remove folder manually/cron to not leak storage space") - } - }() - - // when writing to the assembled file the write pointer points to the end of the file - // so we need to seek it to the beginning - if _, err = assembledFile.Seek(0, 0); err != nil { - //c.logger.Error().Log("error", err) - return false, "", err - } + defer os.RemoveAll(chunksFolderName) return true, assembledFileName, nil } -func (s *svc) handlePutChunked(w http.ResponseWriter, r *http.Request, ns string) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - - fn := r.URL.Path - if r.Body == 
nil { - log.Warn().Msg("body is nil") - w.WriteHeader(http.StatusBadRequest) - return - } - - finish, chunk, err := s.saveChunk(ctx, fn, r.Body) +// WriteChunk saves an intermediate chunk temporarily and assembles all chunks +// once the final one is received. +func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, error) { + finish, chunk, err := c.saveChunk(fn, r) if err != nil { - log.Error().Err(err).Msg("error saving chunk") - w.WriteHeader(http.StatusInternalServerError) - return + return "", "", err } if !finish { - w.WriteHeader(http.StatusPartialContent) - return + return "", "", nil } - fd, err := os.Open(chunk) + chunkInfo, err := GetChunkBLOBInfo(fn) if err != nil { - log.Error().Err(err).Msg("error opening chunk") - w.WriteHeader(http.StatusInternalServerError) - return - } - defer fd.Close() - - md, err := fd.Stat() - if err != nil { - log.Error().Err(err).Msg("error statting chunk") - w.WriteHeader(http.StatusInternalServerError) - return + return "", "", err } - chunkInfo, _ := getChunkBLOBInfo(fn) - fn = path.Join(applyLayout(ctx, ns), chunkInfo.path) - - s.handlePutHelper(w, r, fd, fn, md.Size()) + return chunkInfo.Path, chunk, nil // TODO(labkode): implement old chunking diff --git a/pkg/storage/utils/eosfs/eosfs.go b/pkg/storage/utils/eosfs/eosfs.go index b2865c2411..087f1e696f 100644 --- a/pkg/storage/utils/eosfs/eosfs.go +++ b/pkg/storage/utils/eosfs/eosfs.go @@ -42,6 +42,7 @@ import ( "github.com/cs3org/reva/pkg/sharedconf" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/utils/acl" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/grants" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" @@ -175,6 +176,7 @@ func (c *Config) init() { type eosfs struct { c *eosclient.Client conf *Config + chunkHandler *chunking.ChunkHandler singleUserUID string singleUserGID string } @@ -208,8 +210,9 @@ func NewEOSFS(c *Config) 
(storage.FS, error) { eosClient := eosclient.New(eosClientOpts) eosfs := &eosfs{ - c: eosClient, - conf: c, + c: eosClient, + conf: c, + chunkHandler: chunking.NewChunkHandler(c.CacheDirectory), } return eosfs, nil diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index 39a6da9501..e60c539680 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -20,24 +20,15 @@ package eosfs import ( "context" - "encoding/json" "io" - "io/ioutil" "os" - "path/filepath" - "strings" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" - "github.com/google/uuid" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/pkg/errors" - tusd "github.com/tus/tusd/pkg/handler" ) -var defaultFilePerm = os.FileMode(0664) - -// TODO deprecated ... use tus func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { u, err := getUser(ctx) if err != nil { @@ -57,301 +48,32 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC return errtypes.PermissionDenied("eos: cannot upload under the virtual share folder") } - fn := fs.wrap(ctx, p) - - return fs.c.Write(ctx, uid, gid, fn, r) -} - -// InitiateUpload returns an upload id that can be used for uploads with tus -// TODO read optional content for small files in this request -func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { - u, err := getUser(ctx) + ok, err := chunking.IsChunked(p) if err != nil { - return "", errors.Wrap(err, "eos: no user in ctx") - } - - np, err := fs.resolve(ctx, u, ref) - if err != nil { - return "", errors.Wrap(err, "eos: error resolving reference") - } - - p := fs.wrap(ctx, np) - - info := tusd.FileInfo{ - MetaData: tusd.MetaData{ - "filename": filepath.Base(p), - "dir": filepath.Dir(p), 
- }, - Size: uploadLength, - } - - if metadata != nil && metadata["mtime"] != "" { - info.MetaData["mtime"] = metadata["mtime"] - } - - upload, err := fs.NewUpload(ctx, info) - if err != nil { - return "", err - } - - info, _ = upload.GetInfo(ctx) - - return info.ID, nil -} - -// UseIn tells the tus upload middleware which extensions it supports. -func (fs *eosfs) UseIn(composer *tusd.StoreComposer) { - composer.UseCore(fs) - composer.UseTerminater(fs) -} - -// NewUpload creates a new upload using the size as the file's length. To determine where to write the binary data -// the Fileinfo metadata must contain a dir and a filename. -// returns a unique id which is used to identify the upload. The properties Size and MetaData will be filled. -func (fs *eosfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { - - log := appctx.GetLogger(ctx) - log.Debug().Interface("info", info).Msg("eos: NewUpload") - - fn := info.MetaData["filename"] - if fn == "" { - return nil, errors.New("eos: missing filename in metadata") - } - info.MetaData["filename"] = filepath.Clean(info.MetaData["filename"]) - - dir := info.MetaData["dir"] - if dir == "" { - return nil, errors.New("eos: missing dir in metadata") + return errors.Wrap(err, "eos: error checking path") } - info.MetaData["dir"] = filepath.Clean(info.MetaData["dir"]) - - log.Debug().Interface("info", info).Msg("eos: resolved filename") - - info.ID = uuid.New().String() - - binPath, err := fs.getUploadPath(ctx, info.ID) - if err != nil { - return nil, errors.Wrap(err, "eos: error resolving upload path") - } - user, err := getUser(ctx) - if err != nil { - return nil, errors.Wrap(err, "eos: no user in ctx") - } - uid, gid, err := fs.getUserUIDAndGID(ctx, user) - if err != nil { - return nil, err - } - - info.Storage = map[string]string{ - "Type": "EOSStore", - "Username": user.Username, - "UID": uid, - "GID": gid, - } - // Create binary file with no content - - file, err := os.OpenFile(binPath, 
os.O_CREATE|os.O_WRONLY, defaultFilePerm) - if err != nil { - return nil, err - } - defer file.Close() - - u := &fileUpload{ - info: info, - binPath: binPath, - infoPath: binPath + ".info", - fs: fs, - } - - if !info.SizeIsDeferred && info.Size == 0 { - log.Debug().Interface("info", info).Msg("eos: finishing upload for empty file") - // no need to create info file and finish directly - err := u.FinishUpload(ctx) + if ok { + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { - return nil, err - } - return u, nil - } - - // writeInfo creates the file by itself if necessary - err = u.writeInfo() - if err != nil { - return nil, err - } - - return u, nil -} - -// TODO use a subdirectory in the shadow tree -func (fs *eosfs) getUploadPath(ctx context.Context, uploadID string) (string, error) { - return filepath.Join(fs.conf.CacheDirectory, uploadID), nil -} - -// GetUpload returns the Upload for the given upload id -func (fs *eosfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { - binPath, err := fs.getUploadPath(ctx, id) - if err != nil { - return nil, err - } - infoPath := binPath + ".info" - info := tusd.FileInfo{} - data, err := ioutil.ReadFile(infoPath) - if err != nil { - return nil, err - } - if err := json.Unmarshal(data, &info); err != nil { - return nil, err - } - - stat, err := os.Stat(binPath) - if err != nil { - return nil, err - } - - info.Offset = stat.Size() - - return &fileUpload{ - info: info, - binPath: binPath, - infoPath: infoPath, - fs: fs, - }, nil -} - -type fileUpload struct { - // info stores the current information about the upload - info tusd.FileInfo - // infoPath is the path to the .info file - infoPath string - // binPath is the path to the binary file (which has no extension) - binPath string - // only fs knows how to handle metadata and versions - fs *eosfs -} - -// GetInfo returns the FileInfo -func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { 
- return upload.info, nil -} - -// GetReader returns an io.Reader for the upload -func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) { - f, err := os.Open(upload.binPath) - if err != nil { - return nil, err - } - defer f.Close() - return f, nil -} - -// WriteChunk writes the stream from the reader to the given offset of the upload -// TODO use the grpc api to directly stream to a temporary uploads location in the eos shadow tree -func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { - file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) - if err != nil { - return 0, err - } - defer file.Close() - - n, err := io.Copy(file, src) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for OwnCloudStore it's not important whether the stream has ended - // on purpose or accidentally. - if err != nil { - if err != io.ErrUnexpectedEOF { - return n, err - } - } - - upload.info.Offset += n - err = upload.writeInfo() - - return n, err -} - -// writeInfo updates the entire information. Everything will be overwritten. -func (upload *fileUpload) writeInfo() error { - data, err := json.Marshal(upload.info) - if err != nil { - return err - } - return ioutil.WriteFile(upload.infoPath, data, defaultFilePerm) -} - -// FinishUpload finishes an upload and moves the file to the internal destination -func (upload *fileUpload) FinishUpload(ctx context.Context) error { - - checksum := upload.info.MetaData["checksum"] - if checksum != "" { - // check checksum - s := strings.SplitN(checksum, " ", 2) - if len(s) == 2 { - alg, hash := s[0], s[1] - - log := appctx.GetLogger(ctx) - log.Debug(). - Interface("info", upload.info). - Str("alg", alg). - Str("hash", hash). 
- Msg("eos: TODO check checksum") // TODO this is done by eos if we write chunks to it directly - + return err } - } - np := filepath.Join(upload.info.MetaData["dir"], upload.info.MetaData["filename"]) - - // TODO check etag with If-Match header - // if destination exists - //if _, err := os.Stat(np); err == nil { - // copy attributes of existing file to tmp file before overwriting the target? - // eos creates revisions internally - //} - - err := upload.fs.c.WriteFile(ctx, upload.info.Storage["UID"], upload.info.Storage["GID"], np, upload.binPath) - - // only delete the upload if it was successfully written to eos - if err == nil { - if err := os.Remove(upload.infoPath); err != nil { - if !os.IsNotExist(err) { - log := appctx.GetLogger(ctx) - log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload info") - } + if p == "" { + return errtypes.PartialContent(ref.String()) } - if err := os.Remove(upload.binPath); err != nil { - if !os.IsNotExist(err) { - log := appctx.GetLogger(ctx) - log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload binary") - } + fd, err := os.Open(assembledFile) + if err != nil { + return errors.Wrap(err, "eos: error opening assembled file") } + defer fd.Close() + defer os.RemoveAll(assembledFile) + r = fd } - // TODO: set mtime if specified in metadata - - // metadata propagation is left to the storage implementation - return err -} - -// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination -// - the storage needs to implement AsTerminatableUpload -// - the upload needs to implement Terminate - -// AsTerminatableUpload returns a TerminatableUpload -func (fs *eosfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUpload { - return upload.(*fileUpload) + fn := fs.wrap(ctx, p) + return fs.c.Write(ctx, uid, gid, fn, r) } -// Terminate terminates the upload -func (upload *fileUpload) Terminate(ctx context.Context) error { - if err := 
os.Remove(upload.infoPath); err != nil { - if !os.IsNotExist(err) { - return err - } - } - if err := os.Remove(upload.binPath); err != nil { - if !os.IsNotExist(err) { - return err - } - } - return nil +func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (string, error) { + return ref.GetPath(), nil } diff --git a/pkg/storage/utils/localfs/localfs.go b/pkg/storage/utils/localfs/localfs.go index 5f643499d5..909b1acd9d 100644 --- a/pkg/storage/utils/localfs/localfs.go +++ b/pkg/storage/utils/localfs/localfs.go @@ -37,6 +37,7 @@ import ( "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/mime" "github.com/cs3org/reva/pkg/storage" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/grants" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" @@ -84,8 +85,9 @@ func (c *Config) init() { } type localfs struct { - conf *Config - db *sql.DB + conf *Config + db *sql.DB + chunkHandler *chunking.ChunkHandler } // NewLocalFS returns a storage.FS interface implementation that controls then @@ -111,7 +113,11 @@ func NewLocalFS(c *Config) (storage.FS, error) { return nil, errors.Wrap(err, "localfs: error initializing db") } - return &localfs{conf: c, db: db}, nil + return &localfs{ + conf: c, + db: db, + chunkHandler: chunking.NewChunkHandler(c.Uploads), + }, nil } func (fs *localfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index 301fd51e57..20fc1fa805 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -29,6 +29,8 @@ import ( userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage/utils/chunking" 
"github.com/cs3org/reva/pkg/user" "github.com/google/uuid" "github.com/pkg/errors" @@ -37,41 +39,46 @@ import ( var defaultFilePerm = os.FileMode(0664) -// TODO deprecated ... use tus func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { - fn, err := fs.resolve(ctx, ref) + upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { - return errors.Wrap(err, "error resolving ref") + return errors.Wrap(err, "ocisfs: error retrieving upload") } - fn = fs.wrap(ctx, fn) - // we cannot rely on /tmp as it can live in another partition and we can - // hit invalid cross-device link errors, so we create the tmp file in the same directory - // the file is supposed to be written. - tmp, err := ioutil.TempFile(filepath.Dir(fn), "._reva_atomic_upload") - if err != nil { - return errors.Wrap(err, "localfs: error creating tmp fn at "+filepath.Dir(fn)) - } + uploadInfo := upload.(*fileUpload) - _, err = io.Copy(tmp, r) + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) if err != nil { - return errors.Wrap(err, "localfs: error writing to tmp file "+tmp.Name()) + return errors.Wrap(err, "ocfs: error checking path") } - - // if destination exists - if _, err := os.Stat(fn); err == nil { - // create revision - if err := fs.archiveRevision(ctx, fn); err != nil { + if ok { + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { return err } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + fd, err := os.Open(assembledFile) + if err != nil { + return errors.Wrap(err, "eos: error opening assembled file") + } + defer fd.Close() + defer os.RemoveAll(assembledFile) + r = fd } - // TODO(labkode): make sure rename is atomic, missing fsync ... 
- if err := os.Rename(tmp.Name(), fn); err != nil { - return errors.Wrap(err, "localfs: error renaming from "+tmp.Name()+" to "+fn) + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { + return errors.Wrap(err, "ocisfs: error writing to binary file") } - return nil + return uploadInfo.FinishUpload(ctx) } // InitiateUpload returns an upload id that can be used for uploads with tus @@ -80,6 +87,7 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea // TODO to implement LengthDeferrerDataStore make size optional // TODO read optional content for small files in this request func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { + np, err := fs.resolve(ctx, ref) if err != nil { return "", errors.Wrap(err, "localfs: error resolving reference")