diff --git a/registry/remote/repository.go b/registry/remote/repository.go index f51dda81..32ac347d 100644 --- a/registry/remote/repository.go +++ b/registry/remote/repository.go @@ -214,6 +214,19 @@ func (r *Repository) Push(ctx context.Context, expected ocispec.Descriptor, cont return r.blobStore(expected).Push(ctx, expected, content) } +// Mount makes the blob with the given digest in fromRepo +// available in the repository signified by the receiver. +// +// This avoids the need to pull content down from fromRepo only to push it to r. +// +// If the registry does not implement mounting, getContent will be used to get the +// content to push. If getContent is nil, the content will be pulled from the source +// repository. If getContent returns an error, it will be wrapped inside the error +// returned from Mount. +func (r *Repository) Mount(ctx context.Context, desc ocispec.Descriptor, fromRepo string, getContent func() (io.ReadCloser, error)) error { + return r.Blobs().(registry.Mounter).Mount(ctx, desc, fromRepo, getContent) +} + // Exists returns true if the described content exists. func (r *Repository) Exists(ctx context.Context, target ocispec.Descriptor) (bool, error) { return r.blobStore(target).Exists(ctx, target) @@ -660,6 +673,73 @@ func (s *blobStore) Fetch(ctx context.Context, target ocispec.Descriptor) (rc io } } +// Mount mounts the given descriptor from fromRepo into s. +func (s *blobStore) Mount(ctx context.Context, desc ocispec.Descriptor, fromRepo string, getContent func() (io.ReadCloser, error)) error { + // pushing usually requires both pull and push actions. + // Reference: https://github.com/distribution/distribution/blob/v2.7.1/registry/handlers/app.go#L921-L930 + ctx = registryutil.WithScopeHint(ctx, s.repo.Reference, auth.ActionPull, auth.ActionPush) + + // We also need pull access to the source repo. + fromRef := s.repo.Reference + fromRef.Repository = fromRepo + ctx = registryutil.WithScopeHint(ctx, fromRef, auth.ActionPull) + + url := buildRepositoryBlobMountURL(s.repo.PlainHTTP, s.repo.Reference, desc.Digest, fromRepo) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil) + if err != nil { + return err + } + resp, err := s.repo.client().Do(req) + if err != nil { + return err + } + if resp.StatusCode == http.StatusCreated { + defer resp.Body.Close() + // Check the server seems to be behaving. + return verifyContentDigest(resp, desc.Digest) + } + if resp.StatusCode != http.StatusAccepted { + defer resp.Body.Close() + return errutil.ParseErrorResponse(resp) + } + resp.Body.Close() + // From the [spec]: + // + // "If a registry does not support cross-repository mounting + // or is unable to mount the requested blob, + // it SHOULD return a 202. + // This indicates that the upload session has begun + // and that the client MAY proceed with the upload." + // + // So we need to get the content from somewhere in order to + // push it. If the caller has provided a getContent function, we + // can use that, otherwise pull the content from the source repository. + // + // [spec]: https://github.com/opencontainers/distribution-spec/blob/main/spec.md#mounting-a-blob-from-another-repository + + var r io.ReadCloser + if getContent != nil { + r, err = getContent() + } else { + r, err = s.sibling(fromRepo).Fetch(ctx, desc) + } + if err != nil { + return fmt.Errorf("cannot read source blob: %w", err) + } + defer r.Close() + return s.completePushAfterInitialPost(ctx, req, resp, desc, r) +} + +// sibling returns a blob store for another repository in the same +// registry. +func (s *blobStore) sibling(otherRepoName string) *blobStore { + otherRepo := *s.repo + otherRepo.Reference.Repository = otherRepoName + return &blobStore{ + repo: &otherRepo, + } +} + // Push pushes the content, matching the expected descriptor. // Existing content is not checked by Push() to minimize the number of out-going // requests. @@ -680,11 +760,8 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte if err != nil { return err } - reqHostname := req.URL.Hostname() - reqPort := req.URL.Port() - client := s.repo.client() - resp, err := client.Do(req) + resp, err := s.repo.client().Do(req) if err != nil { return err } @@ -694,7 +771,15 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte return errutil.ParseErrorResponse(resp) } resp.Body.Close() + return s.completePushAfterInitialPost(ctx, req, resp, expected, content) +} +// completePushAfterInitialPost implements step 2 of the push protocol. This can be invoked either by +// Push or by Mount when the receiving repository does not implement the +// mount endpoint. +func (s *blobStore) completePushAfterInitialPost(ctx context.Context, req *http.Request, resp *http.Response, expected ocispec.Descriptor, content io.Reader) error { + reqHostname := req.URL.Hostname() + reqPort := req.URL.Port() // monolithic upload location, err := resp.Location() if err != nil { @@ -711,7 +796,7 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte if reqPort == "443" && locationHostname == reqHostname && locationPort == "" { location.Host = locationHostname + ":" + reqPort } - url = location.String() + url := location.String() req, err = http.NewRequestWithContext(ctx, http.MethodPut, url, content) if err != nil { return err @@ -731,7 +816,7 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte if auth := resp.Request.Header.Get("Authorization"); auth != "" { req.Header.Set("Authorization", auth) } - resp, err = client.Do(req) + resp, err = s.repo.client().Do(req) if err != nil { return err } diff --git a/registry/remote/repository_test.go b/registry/remote/repository_test.go index d35a2e1a..f062509d 100644 --- a/registry/remote/repository_test.go +++ b/registry/remote/repository_test.go @@ -43,6 +43,7 @@ import ( "oras.land/oras-go/v2/internal/spec" "oras.land/oras-go/v2/registry" "oras.land/oras-go/v2/registry/remote/auth" + "oras.land/oras-go/v2/registry/remote/errcode" ) type testIOStruct struct { @@ -292,6 +293,331 @@ func TestRepository_Push(t *testing.T) { } } +func TestRepository_Mount(t *testing.T) { + blob := []byte("hello world") + blobDesc := ocispec.Descriptor{ + MediaType: "test", + Digest: digest.FromBytes(blob), + Size: int64(len(blob)), + } + gotMount := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if got, want := r.Method, "POST"; got != want { + t.Errorf("unexpected HTTP method; got %q want %q", got, want) + w.WriteHeader(http.StatusInternalServerError) + return + } + if err := r.ParseForm(); err != nil { + t.Errorf("invalid form in HTTP request: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + switch r.URL.Path { + case "/v2/test2/blobs/uploads/": + if got, want := r.Form.Get("mount"), blobDesc.Digest; digest.Digest(got) != want { + t.Errorf("unexpected value for 'mount' parameter; got %q want %q", got, want) + } + if got, want := r.Form.Get("from"), "test"; got != want { + t.Errorf("unexpected value for 'from' parameter; got %q want %q", got, want) + } + gotMount++ + w.Header().Set(dockerContentDigestHeader, blobDesc.Digest.String()) + w.WriteHeader(201) + return + default: + t.Errorf("unexpected URL for mount request %q", r.URL) + w.WriteHeader(http.StatusInternalServerError) + } + })) + defer ts.Close() + uri, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("invalid test http server: %v", err) + } + repo, err := NewRepository(uri.Host + "/test2") + if err != nil { + t.Fatalf("NewRepository() error = %v", err) + } + repo.PlainHTTP = true + ctx := context.Background() + + err = repo.Mount(ctx, blobDesc, "test", nil) + if err != nil { + t.Fatalf("Repository.Push() error = %v", err) + } + if gotMount != 1 { + t.Errorf("did not get expected mount request") + } +} + +func TestRepository_Mount_Fallback(t *testing.T) { + // This test checks the case where the server does not know + // about the mount query parameters, so the call falls back to + // the regular push flow. This test is thus very similar to TestPush, + // except that it doesn't push a manifest because mounts aren't + // documented to be supported for manifests. + + blob := []byte("hello world") + blobDesc := ocispec.Descriptor{ + MediaType: "test", + Digest: digest.FromBytes(blob), + Size: int64(len(blob)), + } + var sequence string + var gotBlob []byte + uuid := "4fd53bc9-565d-4527-ab80-3e051ac4880c" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v2/test2/blobs/uploads/": + w.Header().Set("Location", "/v2/test2/blobs/uploads/"+uuid) + w.WriteHeader(http.StatusAccepted) + sequence += "post " + return + case r.Method == http.MethodGet && r.URL.Path == "/v2/test/blobs/"+blobDesc.Digest.String(): + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Docker-Content-Digest", blobDesc.Digest.String()) + if _, err := w.Write(blob); err != nil { + t.Errorf("failed to write %q: %v", r.URL, err) + } + sequence += "get " + return + case r.Method == http.MethodPut && r.URL.Path == "/v2/test2/blobs/uploads/"+uuid: + if got, want := r.Header.Get("Content-Type"), "application/octet-stream"; got != want { + t.Errorf("unexpected content type; got %q want %q", got, want) + w.WriteHeader(http.StatusBadRequest) + return + } + if got, want := r.URL.Query().Get("digest"), blobDesc.Digest.String(); got != want { + t.Errorf("unexpected content digest; got %q want %q", got, want) + w.WriteHeader(http.StatusBadRequest) + return + } + data, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("error reading body: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + gotBlob = data + w.Header().Set("Docker-Content-Digest", blobDesc.Digest.String()) + w.WriteHeader(http.StatusCreated) + sequence += "put " + return + default: + w.WriteHeader(http.StatusForbidden) + } + t.Errorf("unexpected access: %s %s", r.Method, r.URL) + })) + defer ts.Close() + uri, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("invalid test http server: %v", err) + } + + repo, err := NewRepository(uri.Host + "/test2") + if err != nil { + t.Fatalf("NewRepository() error = %v", err) + } + repo.PlainHTTP = true + ctx := context.Background() + + err = repo.Mount(ctx, blobDesc, "test", nil) + if err != nil { + t.Fatalf("Repository.Push() error = %v", err) + } + if !bytes.Equal(gotBlob, blob) { + t.Errorf("Repository.Mount() = %v, want %v", gotBlob, blob) + } + if got, want := sequence, "post get put "; got != want { + t.Errorf("unexpected request sequence; got %q want %q", got, want) + } +} + +func TestRepository_Mount_Error(t *testing.T) { + blob := []byte("hello world") + blobDesc := ocispec.Descriptor{ + MediaType: "test", + Digest: digest.FromBytes(blob), + Size: int64(len(blob)), + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if got, want := r.Method, "POST"; got != want { + t.Errorf("unexpected HTTP method; got %q want %q", got, want) + w.WriteHeader(http.StatusInternalServerError) + return + } + if err := r.ParseForm(); err != nil { + t.Errorf("invalid form in HTTP request: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + switch r.URL.Path { + case "/v2/test/blobs/uploads/": + w.WriteHeader(400) + w.Write([]byte(`{ "errors": [ { "code": "NAME_UNKNOWN", "message": "some error" } ] }`)) + default: + t.Errorf("unexpected URL for mount request %q", r.URL) + w.WriteHeader(http.StatusInternalServerError) + } + })) + defer ts.Close() + uri, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("invalid test http server: %v", err) + } + repo, err := NewRepository(uri.Host + "/test") + if err != nil { + t.Fatalf("NewRepository() error = %v", err) + } + repo.PlainHTTP = true + + err = repo.Mount(context.Background(), blobDesc, "foo", nil) + if err == nil { + t.Fatalf("expected error but got success instead") + } + var errResp *errcode.ErrorResponse + if !errors.As(err, &errResp) { + t.Fatalf("unexpected error type %#v", err) + } + if !reflect.DeepEqual(errResp.Errors, errcode.Errors{{ + Code: "NAME_UNKNOWN", + Message: "some error", + }}) { + t.Errorf("unexpected errors %#v", errResp.Errors) + } +} + +func TestRepository_Mount_Fallback_GetContent(t *testing.T) { + // This test checks the case where the server does not know + // about the mount query parameters, so the call falls back to + // the regular push flow, but using the getContent function + // parameter to get the content to push. + + blob := []byte("hello world") + blobDesc := ocispec.Descriptor{ + MediaType: "test", + Digest: digest.FromBytes(blob), + Size: int64(len(blob)), + } + var sequence string + var gotBlob []byte + uuid := "4fd53bc9-565d-4527-ab80-3e051ac4880c" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v2/test2/blobs/uploads/": + w.Header().Set("Location", "/v2/test2/blobs/uploads/"+uuid) + w.WriteHeader(http.StatusAccepted) + sequence += "post " + return + case r.Method == http.MethodPut && r.URL.Path == "/v2/test2/blobs/uploads/"+uuid: + if got, want := r.Header.Get("Content-Type"), "application/octet-stream"; got != want { + t.Errorf("unexpected content type; got %q want %q", got, want) + w.WriteHeader(http.StatusBadRequest) + return + } + if got, want := r.URL.Query().Get("digest"), blobDesc.Digest.String(); got != want { + t.Errorf("unexpected content digest; got %q want %q", got, want) + w.WriteHeader(http.StatusBadRequest) + return + } + data, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("error reading body: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + gotBlob = data + w.Header().Set("Docker-Content-Digest", blobDesc.Digest.String()) + w.WriteHeader(http.StatusCreated) + sequence += "put " + return + default: + w.WriteHeader(http.StatusForbidden) + } + t.Errorf("unexpected access: %s %s", r.Method, r.URL) + })) + defer ts.Close() + uri, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("invalid test http server: %v", err) + } + + repo, err := NewRepository(uri.Host + "/test2") + if err != nil { + t.Fatalf("NewRepository() error = %v", err) + } + repo.PlainHTTP = true + ctx := context.Background() + + err = repo.Mount(ctx, blobDesc, "test", func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(blob)), nil + }) + if err != nil { + t.Fatalf("Repository.Push() error = %v", err) + } + if !bytes.Equal(gotBlob, blob) { + t.Errorf("Repository.Mount() = %v, want %v", gotBlob, blob) + } + if got, want := sequence, "post put "; got != want { + t.Errorf("unexpected request sequence; got %q want %q", got, want) + } +} + +func TestRepository_Mount_Fallback_GetContentError(t *testing.T) { + // This test checks the case where the server does not know + // about the mount query parameters, so the call falls back to + // the regular push flow, but it's possible the caller wants to + // avoid the pull/push pattern so returns an error from getContent + // and checks it to find out that's happened. + + blob := []byte("hello world") + blobDesc := ocispec.Descriptor{ + MediaType: "test", + Digest: digest.FromBytes(blob), + Size: int64(len(blob)), + } + var sequence string + uuid := "4fd53bc9-565d-4527-ab80-3e051ac4880c" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v2/test2/blobs/uploads/": + w.Header().Set("Location", "/v2/test2/blobs/uploads/"+uuid) + w.WriteHeader(http.StatusAccepted) + sequence += "post " + return + default: + w.WriteHeader(http.StatusForbidden) + } + t.Errorf("unexpected access: %s %s", r.Method, r.URL) + })) + defer ts.Close() + uri, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("invalid test http server: %v", err) + } + + repo, err := NewRepository(uri.Host + "/test2") + if err != nil { + t.Fatalf("NewRepository() error = %v", err) + } + repo.PlainHTTP = true + ctx := context.Background() + + testErr := errors.New("test error") + err = repo.Mount(ctx, blobDesc, "test", func() (io.ReadCloser, error) { + return nil, testErr + }) + if err == nil { + t.Fatalf("expected error but found no error") + } + if !errors.Is(err, testErr) { + t.Fatalf("expected getContent error to be wrapped") + } + if got, want := sequence, "post "; got != want { + t.Errorf("unexpected request sequence; got %q want %q", got, want) + } +} + func TestRepository_Exists(t *testing.T) { blob := []byte("hello world") blobDesc := ocispec.Descriptor{ @@ -2662,6 +2988,13 @@ func TestManifestStoreInterface(t *testing.T) { } } +func TestRepositoryMounterInterface(t *testing.T) { + var r interface{} = &Repository{} + if _, ok := r.(registry.Mounter); !ok { + t.Error("&Repository{} does not conform to registry.Mounter") + } +} + func Test_ManifestStore_Fetch(t *testing.T) { manifest := []byte(`{"layers":[]}`) manifestDesc := ocispec.Descriptor{ diff --git a/registry/remote/url.go b/registry/remote/url.go index 1cd4209e..d3eee3ee 100644 --- a/registry/remote/url.go +++ b/registry/remote/url.go @@ -20,6 +20,7 @@ import ( "net/url" "strings" + "github.com/opencontainers/go-digest" "oras.land/oras-go/v2/registry" ) @@ -87,6 +88,17 @@ func buildRepositoryBlobUploadURL(plainHTTP bool, ref registry.Reference) string return buildRepositoryBaseURL(plainHTTP, ref) + "/blobs/uploads/" } +// buildRepositoryBlobMountURLbuilds the URL for cross-repository mounting. +// Format: :///v2//blobs/uploads/?mount=&from= +// Reference: https://docs.docker.com/registry/spec/api/#blob +func buildRepositoryBlobMountURL(plainHTTP bool, ref registry.Reference, d digest.Digest, fromRepo string) string { + return fmt.Sprintf("%s?mount=%s&from=%s", + buildRepositoryBlobUploadURL(plainHTTP, ref), + d, + fromRepo, + ) +} + // buildReferrersURL builds the URL for querying the Referrers API. // Format: :///v2//referrers/?artifactType= // Reference: https://github.com/opencontainers/distribution-spec/blob/v1.1.0-rc1/spec.md#listing-referrers diff --git a/registry/repository.go b/registry/repository.go index c2cb0d05..2dd7ff99 100644 --- a/registry/repository.go +++ b/registry/repository.go @@ -107,6 +107,19 @@ type TagLister interface { Tags(ctx context.Context, last string, fn func(tags []string) error) error } +// Mounter allows cross-repository blob mounts. +// For backward compatibility reasons, this is not implemented by +// BlobStore: use a type assertion to check availability. +type Mounter interface { + // Mount makes the blob with the given descriptor in fromRepo + // available in the repository signified by the receiver. + Mount(ctx context.Context, + desc ocispec.Descriptor, + fromRepo string, + getContent func() (io.ReadCloser, error), + ) error +} + // Tags lists the tags available in the repository. func Tags(ctx context.Context, repo TagLister) ([]string, error) { var res []string