Skip to content

Commit

Permalink
registry/remote: implement Mount
Browse files Browse the repository at this point in the history
This adds support for "mounting" (a.k.a. copy by reference) a blob
from one repository to another in the same registry.

As documented in https://docs.docker.com/registry/spec/api/#blob
and https://github.com/opencontainers/distribution-spec/blob/main/spec.md#mounting-a-blob-from-another-repository.

I've also manually verified that this works against at least one real registry implementation (zot).

Fixes #337

This implements blob-mount functionality as described
  • Loading branch information
rogpeppe committed May 5, 2023
1 parent 8aed964 commit 4490a20
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 6 deletions.
85 changes: 79 additions & 6 deletions registry/remote/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type Client interface {
Do(*http.Request) (*http.Response, error)
}

// Static check that Repository implements Mounter.
var _ registry.Mounter = (*Repository)(nil)

// Repository is an HTTP client to a remote repository.
type Repository struct {
// Client is the underlying HTTP client used to access the remote registry.
Expand Down Expand Up @@ -198,6 +201,8 @@ func (r *Repository) client() Client {

// blobStore detects the blob store for the given descriptor.
func (r *Repository) blobStore(desc ocispec.Descriptor) registry.BlobStore {
// TODO this seems dubious. Perhaps we actually want to push
// a blob with a manifest media type as a regular blob?
if isManifest(r.ManifestMediaTypes, desc) {
return r.Manifests()
}
Expand All @@ -214,6 +219,14 @@ func (r *Repository) Push(ctx context.Context, expected ocispec.Descriptor, cont
return r.blobStore(expected).Push(ctx, expected, content)
}

// Mount makes the blob with the given digest in fromRepo
// available in the repository signified by the receiver.
//
// This avoids the need to pull content down from fromRepo only to push it to r.
func (r *Repository) Mount(ctx context.Context, desc ocispec.Descriptor, fromRepo string) error {
return r.Blobs().(*blobStore).Mount(ctx, desc, fromRepo)
}

// Exists returns true if the described content exists.
func (r *Repository) Exists(ctx context.Context, target ocispec.Descriptor) (bool, error) {
return r.blobStore(target).Exists(ctx, target)
Expand Down Expand Up @@ -660,6 +673,61 @@ func (s *blobStore) Fetch(ctx context.Context, target ocispec.Descriptor) (rc io
}
}

// Mount mounts the given descriptor from fromRepo into s.
func (s *blobStore) Mount(ctx context.Context, desc ocispec.Descriptor, fromRepo string) error {
// pushing usually requires both pull and push actions.
// Reference: https://github.com/distribution/distribution/blob/v2.7.1/registry/handlers/app.go#L921-L930
ctx = registryutil.WithScopeHint(ctx, s.repo.Reference, auth.ActionPull, auth.ActionPush)

// We also need pull access to the source repo.
fromRef := s.repo.Reference
fromRef.Repository = fromRepo
ctx = registryutil.WithScopeHint(ctx, fromRef, auth.ActionPull)

url := buildRepositoryBlobMountURL(s.repo.PlainHTTP, s.repo.Reference, desc.Digest, fromRepo)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
if err != nil {
return err
}
resp, err := s.repo.client().Do(req)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode == http.StatusCreated {
return nil
}
if resp.StatusCode != http.StatusAccepted {
return errutil.ParseErrorResponse(resp)
}
// From the spec:
//
// "If a registry does not support cross-repository mounting
// or is unable to mount the requested blob,
// it SHOULD return a 202.
// This indicates that the upload session has begun
// and that the client MAY proceed with the upload."
//
// So we need to pull the content in order to push it.

r, err := s.sibling(fromRepo).Fetch(ctx, desc)
if err != nil {
return fmt.Errorf("cannot read source blob: %v", err)
}
defer r.Close()
return s.pushStep2(ctx, req, resp, desc, r)
}

// sibling returns a blob store for another repository in the same
// registry.
func (s *blobStore) sibling(otherRepoName string) *blobStore {
otherRepo := *s.repo
otherRepo.Reference.Repository = otherRepoName
return &blobStore{
repo: &otherRepo,
}
}

// Push pushes the content, matching the expected descriptor.
// Existing content is not checked by Push() to minimize the number of out-going
// requests.
Expand All @@ -680,11 +748,8 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte
if err != nil {
return err
}
reqHostname := req.URL.Hostname()
reqPort := req.URL.Port()

client := s.repo.client()
resp, err := client.Do(req)
resp, err := s.repo.client().Do(req)
if err != nil {
return err
}
Expand All @@ -694,7 +759,15 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte
return errutil.ParseErrorResponse(resp)
}
resp.Body.Close()
return s.pushStep2(ctx, req, resp, expected, content)
}

// pushStep2 implements step 2 of the push protocol. This can be invoked either by
// Push or by Mount when the receiving repository does not implement the
// mount endpoint.
func (s *blobStore) pushStep2(ctx context.Context, req *http.Request, resp *http.Response, expected ocispec.Descriptor, content io.Reader) error {
reqHostname := req.URL.Hostname()
reqPort := req.URL.Port()
// monolithic upload
location, err := resp.Location()
if err != nil {
Expand All @@ -711,7 +784,7 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte
if reqPort == "443" && locationHostname == reqHostname && locationPort == "" {
location.Host = locationHostname + ":" + reqPort
}
url = location.String()
url := location.String()
req, err = http.NewRequestWithContext(ctx, http.MethodPut, url, content)
if err != nil {
return err
Expand All @@ -731,7 +804,7 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte
if auth := resp.Request.Header.Get("Authorization"); auth != "" {
req.Header.Set("Authorization", auth)
}
resp, err = client.Do(req)
resp, err = s.repo.client().Do(req)
if err != nil {
return err
}
Expand Down
139 changes: 139 additions & 0 deletions registry/remote/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,145 @@ func TestRepository_Push(t *testing.T) {
}
}

func TestRepository_Mount(t *testing.T) {
blob := []byte("hello world")
blobDesc := ocispec.Descriptor{
MediaType: "test",
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
}
gotMount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if got, want := r.Method, "POST"; got != want {
t.Errorf("unexpected HTTP method; got %q want %q", got, want)
w.WriteHeader(http.StatusInternalServerError)
return
}
if err := r.ParseForm(); err != nil {
t.Errorf("invalid form in HTTP request: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
switch r.URL.Path {
case "/v2/test2/blobs/uploads/":
if got, want := r.Form.Get("mount"), blobDesc.Digest; digest.Digest(got) != want {
t.Errorf("unexpected value for 'mount' parameter; got %q want %q", got, want)
}
if got, want := r.Form.Get("from"), "test"; got != want {
t.Errorf("unexpected value for 'from' parameter; got %q want %q", got, want)
}
gotMount++
w.WriteHeader(201)
return
default:
t.Errorf("unexpected URL for mount request %q", r.URL)
w.WriteHeader(http.StatusInternalServerError)
}
}))
defer ts.Close()
uri, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("invalid test http server: %v", err)
}
repo, err := NewRepository(uri.Host + "/test2")
if err != nil {
t.Fatalf("NewRepository() error = %v", err)
}
repo.PlainHTTP = true
ctx := context.Background()

err = repo.Mount(ctx, blobDesc, "test")
if err != nil {
t.Fatalf("Repository.Push() error = %v", err)
}
if gotMount != 1 {
t.Errorf("did not get expected mount request")
}
}

func TestRepository_Mount_Fallback(t *testing.T) {
// This test checks the case where the server does not know
// about the mount query parameters, so the call falls back to
// the regular push flow. This test is thus very similar to TestPush,
// except that it doesn't push a manifest because mounts aren't
// documented to be supported for manifests.

blob := []byte("hello world")
blobDesc := ocispec.Descriptor{
MediaType: "test",
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
}
var sequence string
var gotBlob []byte
uuid := "4fd53bc9-565d-4527-ab80-3e051ac4880c"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodPost && r.URL.Path == "/v2/test2/blobs/uploads/":
w.Header().Set("Location", "/v2/test2/blobs/uploads/"+uuid)
w.WriteHeader(http.StatusAccepted)
sequence += "post "
return
case r.Method == http.MethodGet && r.URL.Path == "/v2/test/blobs/"+blobDesc.Digest.String():
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Docker-Content-Digest", blobDesc.Digest.String())
if _, err := w.Write(blob); err != nil {
t.Errorf("failed to write %q: %v", r.URL, err)
}
sequence += "get "
return
case r.Method == http.MethodPut && r.URL.Path == "/v2/test2/blobs/uploads/"+uuid:
if got, want := r.Header.Get("Content-Type"), "application/octet-stream"; got != want {
t.Errorf("unexpected content type; got %q want %q", got, want)
w.WriteHeader(http.StatusBadRequest)
return
}
if got, want := r.URL.Query().Get("digest"), blobDesc.Digest.String(); got != want {
t.Errorf("unexpected content digest; got %q want %q", got, want)
w.WriteHeader(http.StatusBadRequest)
return
}
data, err := io.ReadAll(r.Body)
if err != nil {
t.Errorf("error reading body: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
gotBlob = data
w.Header().Set("Docker-Content-Digest", blobDesc.Digest.String())
w.WriteHeader(http.StatusCreated)
sequence += "put "
return
default:
w.WriteHeader(http.StatusForbidden)
}
t.Errorf("unexpected access: %s %s", r.Method, r.URL)
}))
defer ts.Close()
uri, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("invalid test http server: %v", err)
}

repo, err := NewRepository(uri.Host + "/test2")
if err != nil {
t.Fatalf("NewRepository() error = %v", err)
}
repo.PlainHTTP = true
ctx := context.Background()

err = repo.Mount(ctx, blobDesc, "test")
if err != nil {
t.Fatalf("Repository.Push() error = %v", err)
}
if !bytes.Equal(gotBlob, blob) {
t.Errorf("Repository.Mount() = %v, want %v", gotBlob, blob)
}
if got, want := sequence, "post get put "; got != want {
t.Errorf("unexpected request sequence; got %q want %q", got, want)
}
}

func TestRepository_Exists(t *testing.T) {
blob := []byte("hello world")
blobDesc := ocispec.Descriptor{
Expand Down
13 changes: 13 additions & 0 deletions registry/remote/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"net/url"
"strings"

"github.com/opencontainers/go-digest"

"oras.land/oras-go/v2/registry"
)

Expand Down Expand Up @@ -87,6 +89,17 @@ func buildRepositoryBlobUploadURL(plainHTTP bool, ref registry.Reference) string
return buildRepositoryBaseURL(plainHTTP, ref) + "/blobs/uploads/"
}

// buildRepositoryBlobMountURLbuilds the URL for cross-repository mounting.
// Format: <scheme>://<registry>/v2/<name>/blobs/uploads/?mount=<digest>&from=<other_name>
// Reference: https://docs.docker.com/registry/spec/api/#blob
func buildRepositoryBlobMountURL(plainHTTP bool, ref registry.Reference, d digest.Digest, fromRepo string) string {
return fmt.Sprintf("%s/blobs/uploads/?mount=%s&from=%s",
buildRepositoryBaseURL(plainHTTP, ref),
d,
fromRepo,
)
}

// buildReferrersURL builds the URL for querying the Referrers API.
// Format: <scheme>://<registry>/v2/<repository>/referrers/<digest>?artifactType=<artifactType>
// Reference: https://github.com/opencontainers/distribution-spec/blob/v1.1.0-rc1/spec.md#listing-referrers
Expand Down
9 changes: 9 additions & 0 deletions registry/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ type TagLister interface {
Tags(ctx context.Context, last string, fn func(tags []string) error) error
}

// Mounter allows cross-repository blob mounts.
// For backward compatibility reasons, this is not implemented by
// BlobStore: use a type assertion to check availability.
type Mounter interface {
// Mount makes the blob with the given descriptor in fromRepo
// available in the repository signified by the receiver.
Mount(ctx context.Context, desc ocispec.Descriptor, fromRepo string) error
}

// Tags lists the tags available in the repository.
func Tags(ctx context.Context, repo TagLister) ([]string, error) {
var res []string
Expand Down

0 comments on commit 4490a20

Please sign in to comment.