Skip to content

Commit

Permalink
implement checksums in the owncloud storage driver (#1629)
Browse files Browse the repository at this point in the history
  • Loading branch information
C0rby authored Apr 14, 2021
1 parent a7acbd3 commit a4b5148
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 54 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/owncloud-storage-checksums.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Implement checksums in the owncloud storage

Implemented checksums in the owncloud storage driver.

https://github.com/cs3org/reva/pull/1629
67 changes: 64 additions & 3 deletions pkg/storage/fs/owncloud/owncloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package owncloud

import (
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
Expand All @@ -35,6 +36,7 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/internal/grpc/services/storageprovider"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/logger"
Expand Down Expand Up @@ -72,8 +74,9 @@ const (
mdPrefix string = ocPrefix + "md." // arbitrary metadata
favPrefix string = ocPrefix + "fav." // favorite flag, per user
etagPrefix string = ocPrefix + "etag." // allow overriding a calculated etag with one from the extended attributes
// checksumPrefix string = ocPrefix + "cs." // TODO add checksum support

checksumPrefix string = ocPrefix + "cs."
checksumsKey string = "http://owncloud.org/ns/checksums"
favoriteKey string = "http://owncloud.org/ns/favorite"
)

var defaultPermissions *provider.ResourcePermissions = &provider.ResourcePermissions{
Expand Down Expand Up @@ -612,7 +615,6 @@ func (fs *ocfs) convertToResourceInfo(ctx context.Context, fi os.FileInfo, ip st

metadata := map[string]string{}

favoriteKey := "http://owncloud.org/ns/favorite"
if _, ok := mdKeysMap[favoriteKey]; returnAllKeys || ok {
favorite := ""
if u, ok := user.ContextGetUser(ctx); ok {
Expand Down Expand Up @@ -682,6 +684,16 @@ func (fs *ocfs) convertToResourceInfo(ctx context.Context, fi os.FileInfo, ip st

ri.PermissionSet = fs.permissionSet(ctx, ri.Owner)

// checksums
if !fi.IsDir() {
if _, checksumRequested := mdKeysMap[checksumsKey]; returnAllKeys || checksumRequested {
// TODO which checksum was requested? sha1 adler32 or md5? for now hardcode sha1?
readChecksumIntoResourceChecksum(ctx, ip, storageprovider.XSSHA1, ri)
readChecksumIntoOpaque(ctx, ip, storageprovider.XSMD5, ri)
readChecksumIntoOpaque(ctx, ip, storageprovider.XSAdler32, ri)
}
}

return ri
}
func getResourceType(isDir bool) provider.ResourceType {
Expand Down Expand Up @@ -2228,5 +2240,54 @@ func (fs *ocfs) propagate(ctx context.Context, leafPath string) error {
return nil
}

func readChecksumIntoResourceChecksum(ctx context.Context, nodePath, algo string, ri *provider.ResourceInfo) {
v, err := xattr.Get(nodePath, checksumPrefix+algo)
log := appctx.GetLogger(ctx).
Debug().
Err(err).
Str("nodepath", nodePath).
Str("algorithm", algo)
switch {
case err == nil:
ri.Checksum = &provider.ResourceChecksum{
Type: storageprovider.PKG2GRPCXS(algo),
Sum: hex.EncodeToString(v),
}
case isNoData(err):
log.Msg("checksum not set")
case isNotFound(err):
log.Msg("file not found")
default:
log.Msg("could not read checksum")
}
}

func readChecksumIntoOpaque(ctx context.Context, nodePath, algo string, ri *provider.ResourceInfo) {
v, err := xattr.Get(nodePath, checksumPrefix+algo)
log := appctx.GetLogger(ctx).
Debug().
Err(err).
Str("nodepath", nodePath).
Str("algorithm", algo)
switch {
case err == nil:
if ri.Opaque == nil {
ri.Opaque = &types.Opaque{
Map: map[string]*types.OpaqueEntry{},
}
}
ri.Opaque.Map[algo] = &types.OpaqueEntry{
Decoder: "plain",
Value: []byte(hex.EncodeToString(v)),
}
case isNoData(err):
log.Msg("checksum not set")
case isNotFound(err):
log.Msg("file not found")
default:
log.Msg("could not read checksum")
}
}

// TODO propagate etag and mtime or append event to history? propagate on disk ...
// - but propagation is a separate task. only if upload was successful ...
100 changes: 90 additions & 10 deletions pkg/storage/fs/owncloud/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ package owncloud

import (
"context"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"hash/adler32"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand All @@ -36,6 +42,8 @@ import (
"github.com/cs3org/reva/pkg/user"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/pkg/xattr"
"github.com/rs/zerolog"
tusd "github.com/tus/tusd/pkg/handler"
)

Expand Down Expand Up @@ -363,18 +371,54 @@ func (upload *fileUpload) writeInfo() error {

// FinishUpload finishes an upload and moves the file to the internal destination
func (upload *fileUpload) FinishUpload(ctx context.Context) error {
log := appctx.GetLogger(upload.ctx)

/*
checksum := upload.info.MetaData["checksum"]
if checksum != "" {
// TODO check checksum
s := strings.SplitN(checksum, " ", 2)
if len(s) == 2 {
alg, hash := s[0], s[1]
sha1Sum := make([]byte, 0, 32)
md5Sum := make([]byte, 0, 32)
adler32Sum := make([]byte, 0, 32)
{
sha1h := sha1.New()
md5h := md5.New()
adler32h := adler32.New()
f, err := os.Open(upload.binPath)
if err != nil {
log.Err(err).Msg("Decomposedfs: could not open file for checksumming")
// we can continue if no oc checksum header is set
}
defer f.Close()

}
r1 := io.TeeReader(f, sha1h)
r2 := io.TeeReader(r1, md5h)

if _, err := io.Copy(adler32h, r2); err != nil {
log.Err(err).Msg("Decomposedfs: could not copy bytes for checksumming")
}
*/

sha1Sum = sha1h.Sum(sha1Sum)
md5Sum = md5h.Sum(md5Sum)
adler32Sum = adler32h.Sum(adler32Sum)
}

if upload.info.MetaData["checksum"] != "" {
parts := strings.SplitN(upload.info.MetaData["checksum"], " ", 2)
if len(parts) != 2 {
return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'")
}
var err error
switch parts[0] {
case "sha1":
err = upload.checkHash(parts[1], sha1Sum)
case "md5":
err = upload.checkHash(parts[1], md5Sum)
case "adler32":
err = upload.checkHash(parts[1], adler32Sum)
default:
err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0])
}
if err != nil {
return err
}
}

ip := upload.info.Storage["InternalDestination"]

Expand All @@ -391,7 +435,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
}
}

log := appctx.GetLogger(upload.ctx)
err := os.Rename(upload.binPath, ip)
if err != nil {
log.Err(err).Interface("info", upload.info).
Expand All @@ -417,6 +460,11 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
}
}

// now try write all checksums
tryWritingChecksum(log, ip, "sha1", sha1Sum)
tryWritingChecksum(log, ip, "md5", md5Sum)
tryWritingChecksum(log, ip, "adler32", adler32Sum)

return upload.fs.propagate(upload.ctx, ip)
}

Expand Down Expand Up @@ -492,3 +540,35 @@ func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []tusd.Uplo

return
}

func (upload *fileUpload) checkHash(expected string, h []byte) error {
if expected != hex.EncodeToString(h) {
upload.discardChunk()
return errtypes.ChecksumMismatch(fmt.Sprintf("invalid checksum: expected %s got %x", upload.info.MetaData["checksum"], h))
}
return nil
}

func (upload *fileUpload) discardChunk() {
if err := os.Remove(upload.binPath); err != nil {
if !os.IsNotExist(err) {
appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("binPath", upload.binPath).Interface("info", upload.info).Msg("Decomposedfs: could not discard chunk")
return
}
}
if err := os.Remove(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("infoPath", upload.infoPath).Interface("info", upload.info).Msg("Decomposedfs: could not discard chunk info")
return
}
}
}

func tryWritingChecksum(log *zerolog.Logger, path, algo string, h []byte) {
if err := xattr.Set(path, checksumPrefix+algo, h); err != nil {
log.Err(err).
Str("csType", algo).
Bytes("hash", h).
Msg("ocfs: could not write checksum")
}
}
Loading

0 comments on commit a4b5148

Please sign in to comment.