Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workaround for container registry push/pull errors (#21862) #22069

Merged
merged 1 commit into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions integrations/api_packages_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package integrations

import (
"bytes"
"crypto/sha256"
"encoding/base64"
"fmt"
"net/http"
"strings"
"sync"
"testing"

"code.gitea.io/gitea/models/db"
Expand Down Expand Up @@ -549,6 +551,32 @@ func TestPackageContainer(t *testing.T) {
})
}

// https://github.com/go-gitea/gitea/issues/19586
t.Run("ParallelUpload", func(t *testing.T) {
defer PrintCurrentTest(t)()

url := fmt.Sprintf("%sv2/%s/parallel", setting.AppURL, user.Name)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)

content := []byte{byte(i)}
digest := fmt.Sprintf("sha256:%x", sha256.Sum256(content))

go func() {
defer wg.Done()

req := NewRequestWithBody(t, "POST", fmt.Sprintf("%s/blobs/uploads?digest=%s", url, digest), bytes.NewReader(content))
addTokenAuthHeader(req, userToken)
resp := MakeRequest(t, req, http.StatusCreated)

assert.Equal(t, digest, resp.Header().Get("Docker-Content-Digest"))
}()
}
wg.Wait()
})

t.Run("OwnerNameChange", func(t *testing.T) {
defer PrintCurrentTest(t)()

Expand Down
7 changes: 7 additions & 0 deletions modules/packages/content_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func (s *ContentStore) Get(key BlobHash256Key) (storage.Object, error) {
return s.store.Open(KeyToRelativePath(key))
}

// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
func (s *ContentStore) Has(key BlobHash256Key) error {
_, err := s.store.Stat(KeyToRelativePath(key))
return err
}

// Save stores a package blob
func (s *ContentStore) Save(key BlobHash256Key, r io.Reader, size int64) error {
_, err := s.store.Save(KeyToRelativePath(key), r, size)
Expand Down
31 changes: 30 additions & 1 deletion routers/api/packages/container/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ package container
import (
"context"
"encoding/hex"
"errors"
"fmt"
"os"
"strings"
"sync"

"code.gitea.io/gitea/models/db"
packages_model "code.gitea.io/gitea/models/packages"
Expand All @@ -19,6 +22,8 @@ import (
packages_service "code.gitea.io/gitea/services/packages"
)

var uploadVersionMutex sync.Mutex

// saveAsPackageBlob creates a package blob from an upload
// The uploaded blob gets stored in a special upload version to link them to the package/image
func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_service.PackageInfo) (*packages_model.PackageBlob, error) {
Expand All @@ -28,6 +33,11 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic

contentStore := packages_module.NewContentStore()

var uploadVersion *packages_model.PackageVersion

// FIXME: Replace usage of mutex with database transaction
// https://github.com/go-gitea/gitea/pull/21862
uploadVersionMutex.Lock()
err := db.WithTx(func(ctx context.Context) error {
created := true
p := &packages_model.Package{
Expand Down Expand Up @@ -68,11 +78,30 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
}
}

uploadVersion = pv

return nil
})
uploadVersionMutex.Unlock()
if err != nil {
return nil, err
}

err = db.WithTx(func(ctx context.Context) error {
pb, exists, err = packages_model.GetOrInsertBlob(ctx, pb)
if err != nil {
log.Error("Error inserting package blob: %v", err)
return err
}
// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
if exists {
err = contentStore.Has(packages_module.BlobHash256Key(pb.HashSHA256))
if err != nil && errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256)
exists = false
}
}
if !exists {
if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), hsr, hsr.Size()); err != nil {
log.Error("Error saving package blob in content store: %v", err)
Expand All @@ -83,7 +112,7 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
filename := strings.ToLower(fmt.Sprintf("sha256_%s", pb.HashSHA256))

pf := &packages_model.PackageFile{
VersionID: pv.ID,
VersionID: uploadVersion.ID,
BlobID: pb.ID,
Name: filename,
LowerName: filename,
Expand Down
27 changes: 24 additions & 3 deletions routers/api/packages/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -193,7 +194,7 @@ func InitiateUploadBlob(ctx *context.Context) {
mount := ctx.FormTrim("mount")
from := ctx.FormTrim("from")
if mount != "" {
blob, _ := container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{
blob, _ := workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{
Image: from,
Digest: mount,
})
Expand Down Expand Up @@ -361,7 +362,7 @@ func getBlobFromContext(ctx *context.Context) (*packages_model.PackageFileDescri
return nil, container_model.ErrContainerBlobNotExist
}

return container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{
return workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{
OwnerID: ctx.Package.Owner.ID,
Image: ctx.Params("image"),
Digest: digest,
Expand Down Expand Up @@ -503,7 +504,7 @@ func getManifestFromContext(ctx *context.Context) (*packages_model.PackageFileDe
return nil, container_model.ErrContainerBlobNotExist
}

return container_model.GetContainerBlob(ctx, opts)
return workaroundGetContainerBlob(ctx, opts)
}

// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#checking-if-content-exists-in-the-registry
Expand Down Expand Up @@ -643,3 +644,23 @@ func GetTagList(ctx *context.Context) {
Tags: tags,
})
}

// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
func workaroundGetContainerBlob(ctx *context.Context, opts *container_model.BlobSearchOptions) (*packages_model.PackageFileDescriptor, error) {
blob, err := container_model.GetContainerBlob(ctx, opts)
if err != nil {
return nil, err
}

err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(blob.Blob.HashSHA256))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", blob.Blob.HashSHA256)
return nil, container_model.ErrContainerBlobNotExist
}
return nil, err
}

return blob, nil
}
11 changes: 11 additions & 0 deletions routers/api/packages/container/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package container

import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"

"code.gitea.io/gitea/models/db"
Expand Down Expand Up @@ -403,6 +405,15 @@ func createManifestBlob(ctx context.Context, mci *manifestCreationInfo, pv *pack
log.Error("Error inserting package blob: %v", err)
return nil, false, "", err
}
// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
if exists {
err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(pb.HashSHA256))
if err != nil && errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256)
exists = false
}
}
if !exists {
contentStore := packages_module.NewContentStore()
if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), buf, buf.Size()); err != nil {
Expand Down