feat: Public GC function of oci.Store #656

Merged: 25 commits, Jan 11, 2024
93 changes: 93 additions & 0 deletions content/oci/oci.go
@@ -24,6 +24,7 @@
"fmt"
"io"
"os"
"path"
"path/filepath"
"sync"

@@ -454,6 +455,77 @@
return os.WriteFile(s.indexPath, indexJSON, 0666)
}

// reloadIndex reloads the index and updates metadata by creating a new store.
func (s *Store) reloadIndex(ctx context.Context) error {
newStore, err := NewWithContext(ctx, s.root)
if err != nil {
return err
}

s.index = newStore.index
s.storage = newStore.storage
s.tagResolver = newStore.tagResolver
s.graph = newStore.graph
return nil
}

// GC removes garbage from Store. The unsaved index will be lost. To prevent
// unexpected loss, call SaveIndex() before GC or set AutoSaveIndex to true.
// The garbage to be cleaned includes:
// - unreferenced (dangling) blobs in Store which have no predecessors
// - garbage blobs in the storage whose metadata is not stored in Store
func (s *Store) GC(ctx context.Context) error {
s.sync.Lock()
defer s.sync.Unlock()

// get reachable nodes by reloading the index
err := s.reloadIndex(ctx)
if err != nil {
return fmt.Errorf("unable to reload index: %w", err)
}

reachableNodes := s.graph.DigestSet()

// clean up garbage blobs in the storage
rootpath := filepath.Join(s.root, ocispec.ImageBlobsDir)
algDirs, err := os.ReadDir(rootpath)
if err != nil {
return err
}

for _, algDir := range algDirs {
if !algDir.IsDir() {
continue
}
alg := algDir.Name()
// skip unsupported directories
if !isKnownAlgorithm(alg) {
continue
}
algPath := path.Join(rootpath, alg)
digestEntries, err := os.ReadDir(algPath)
if err != nil {
return err
}

for _, digestEntry := range digestEntries {
if err := isContextDone(ctx); err != nil {
return err
}

dgst := digestEntry.Name()
blobDigest := digest.NewDigestFromEncoded(digest.Algorithm(alg), dgst)
if err := blobDigest.Validate(); err != nil {
// skip irrelevant content
continue
}
if !reachableNodes.Contains(blobDigest) {
// remove the blob from storage if it does not exist in Store
err = os.Remove(path.Join(algPath, dgst))
if err != nil {
return err
}
}
}
}
return nil
}

// unsafeStore is used to bypass lock restrictions in Delete.
type unsafeStore struct {
*Store
@@ -467,6 +539,17 @@
return s.graph.Predecessors(ctx, node)
}

// isContextDone returns an error if the context is done.
// Reference: https://pkg.go.dev/context#Context
func isContextDone(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

// validateReference validates ref.
func validateReference(ref string) error {
if ref == "" {
@@ -476,3 +559,13 @@
// TODO: may enforce more strict validation if needed.
return nil
}

// isKnownAlgorithm checks if a string is a supported hash algorithm.
func isKnownAlgorithm(alg string) bool {
switch digest.Algorithm(alg) {
case digest.SHA256, digest.SHA512, digest.SHA384:
return true
default:
return false
}
}
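
Since GC is now exported on oci.Store, callers outside the package can trigger cleanup directly. Below is a minimal usage sketch, not part of the PR, assuming the oras-go v2 module path; the ./layout directory is a placeholder:

```go
package main

import (
	"context"
	"log"

	"oras.land/oras-go/v2/content/oci"
)

func main() {
	ctx := context.Background()

	// Open (or create) an OCI image layout store rooted at ./layout.
	store, err := oci.New("./layout")
	if err != nil {
		log.Fatal(err)
	}

	// GC reloads index.json from disk, so persist any unsaved index changes
	// first (or keep AutoSaveIndex enabled), as the doc comment advises.
	if err := store.SaveIndex(); err != nil {
		log.Fatal(err)
	}

	// Remove dangling blobs and blobs whose metadata the store does not track.
	if err := store.GC(ctx); err != nil {
		log.Fatal(err)
	}
}
```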
206 changes: 206 additions & 0 deletions content/oci/oci_test.go
@@ -24,6 +24,7 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
@@ -2844,6 +2845,199 @@ func TestStore_UntagErrorPath(t *testing.T) {
}
}

func TestStore_GC(t *testing.T) {
tempDir := t.TempDir()
s, err := New(tempDir)
if err != nil {
t.Fatal("New() error =", err)
}
ctx := context.Background()

// generate test content
var blobs [][]byte
var descs []ocispec.Descriptor
appendBlob := func(mediaType string, blob []byte) {
blobs = append(blobs, blob)
descs = append(descs, ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
})
}
generateManifest := func(config ocispec.Descriptor, subject *ocispec.Descriptor, layers ...ocispec.Descriptor) {
manifest := ocispec.Manifest{
Config: config,
Subject: subject,
Layers: layers,
}
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageManifest, manifestJSON)
}
generateImageIndex := func(manifests ...ocispec.Descriptor) {
index := ocispec.Index{
Manifests: manifests,
}
indexJSON, err := json.Marshal(index)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageIndex, indexJSON)
}
generateArtifactManifest := func(blobs ...ocispec.Descriptor) {
var manifest spec.Artifact
manifest.Blobs = append(manifest.Blobs, blobs...)
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal(err)
}
appendBlob(spec.MediaTypeArtifactManifest, manifestJSON)
}

appendBlob(ocispec.MediaTypeImageConfig, []byte("config")) // Blob 0
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob")) // Blob 1
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer")) // Blob 2, dangling layer
generateManifest(descs[0], nil, descs[1]) // Blob 3, valid manifest
generateManifest(descs[0], &descs[3], descs[1]) // Blob 4, referrer of a valid manifest, not in index.json, should be cleaned with current implementation
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 2")) // Blob 5, dangling layer
generateArtifactManifest(descs[4]) // blob 6, dangling artifact
generateManifest(descs[0], &descs[5], descs[1]) // Blob 7, referrer of a dangling manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 3")) // Blob 8, dangling layer
generateArtifactManifest(descs[6]) // blob 9, dangling artifact
generateImageIndex(descs[7], descs[5]) // blob 10, dangling image index
appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 1")) // Blob 11, garbage layer 1
generateManifest(descs[0], nil, descs[4]) // Blob 12, garbage manifest 1
appendBlob(ocispec.MediaTypeImageConfig, []byte("garbage config")) // Blob 13, garbage config
appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 2")) // Blob 14, garbage layer 2
generateManifest(descs[6], nil, descs[7]) // Blob 15, garbage manifest 2
generateManifest(descs[0], &descs[13], descs[1]) // Blob 16, referrer of a garbage manifest

// push blobs 0 - blobs 10 into s
for i := 0; i <= 10; i++ {
err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// remove blobs 4 - blobs 10 from index.json
for i := 4; i <= 10; i++ {
s.tagResolver.Untag(string(descs[i].Digest))
}
s.SaveIndex()

// push blobs 11 - blobs 16 into s.storage, making them garbage as their metadata
// doesn't exist in s
for i := 11; i < len(blobs); i++ {
err := s.storage.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// confirm that all the blobs are in the storage
for i := 11; i < len(blobs); i++ {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("descs[%d] should exist", i)
}
}

// perform GC
if err = s.GC(ctx); err != nil {
t.Fatal(err)
}

// verify existence
wantExistence := []bool{true, true, false, true, false, false, false, false, false, false, false, false, false, false, false, false, false}
for i, wantValue := range wantExistence {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
}
if exists != wantValue {
t.Fatalf("want existence %d to be %v, got %v", i, wantValue, exists)
}
}
}

func TestStore_GCErrorPath(t *testing.T) {
tempDir := t.TempDir()
s, err := New(tempDir)
if err != nil {
t.Fatal("New() error =", err)
}
ctx := context.Background()

// generate test content
var blobs [][]byte
var descs []ocispec.Descriptor
appendBlob := func(mediaType string, blob []byte) {
blobs = append(blobs, blob)
descs = append(descs, ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
})
}
appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob")) // Blob 0

// push the valid blob
err = s.Push(ctx, descs[0], bytes.NewReader(blobs[0]))
if err != nil {
t.Error("failed to push test content to src")
}

// write random contents
algPath := path.Join(tempDir, "blobs")
dgstPath := path.Join(algPath, "sha256")
if err := os.WriteFile(path.Join(algPath, "other"), []byte("random"), 0444); err != nil {
t.Fatal("error calling WriteFile(), error =", err)
}
if err := os.WriteFile(path.Join(dgstPath, "other2"), []byte("random2"), 0444); err != nil {
t.Fatal("error calling WriteFile(), error =", err)
}

// perform GC
if err = s.GC(ctx); err != nil {
t.Fatal(err)
}

appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob 2")) // Blob 1

// push the valid blob
err = s.Push(ctx, descs[1], bytes.NewReader(blobs[1]))
if err != nil {
t.Error("failed to push test content to src")
}

// unknown algorithm
if err := os.Mkdir(path.Join(algPath, "sha666"), 0777); err != nil {
t.Fatal(err)
}
if err = s.GC(ctx); err != nil {
t.Fatal("this error should be silently ignored")
}

// os.Remove() error
badDigest := digest.FromBytes([]byte("bad digest")).Encoded()
badPath := path.Join(algPath, "sha256", badDigest)
if err := os.Mkdir(badPath, 0777); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(path.Join(badPath, "whatever"), []byte("extra content"), 0444); err != nil {
t.Fatal("error calling WriteFile(), error =", err)
}
if err = s.GC(ctx); err == nil {
t.Fatal("expect an error when os.Remove()")
}
}

func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descriptor) bool {
if len(actual) != len(expected) {
return false
@@ -2863,3 +3057,15 @@ func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descript
}
return true
}

func Test_isContextDone(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
if err := isContextDone(ctx); err != nil {
t.Errorf("expect error = %v, got %v", nil, err)
}
cancel()
if err := isContextDone(ctx); err != context.Canceled {
t.Errorf("expect error = %v, got %v", context.Canceled, err)
}
}
10 changes: 10 additions & 0 deletions internal/graph/memory.go
@@ -20,6 +20,7 @@ import (
"errors"
"sync"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
@@ -147,6 +148,15 @@ func (m *Memory) Remove(node ocispec.Descriptor) []ocispec.Descriptor {
return danglings
}

// DigestSet returns the set of node digests in memory.
func (m *Memory) DigestSet() set.Set[digest.Digest] {
s := set.New[digest.Digest]()
for desc := range m.nodes {
s.Add(desc.Digest)
}
return s
}

// index indexes predecessors for each direct successor of the given node.
func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
successors, err := content.Successors(ctx, fetcher, node)
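
DigestSet is the piece that lets GC build its reachability set before scanning the blobs directory. A rough sketch of that consumption pattern, not taken from the PR and only usable inside the module since the graph and set packages are internal (filterGarbage is a hypothetical helper name):

```go
package graph

import (
	"github.com/opencontainers/go-digest"

	"oras.land/oras-go/v2/internal/container/set"
)

// filterGarbage mirrors the check GC performs: given the set of digests
// reachable from indexed nodes, keep only the scanned digests that are
// unreachable and therefore candidates for removal.
func filterGarbage(reachable set.Set[digest.Digest], scanned []digest.Digest) []digest.Digest {
	var garbage []digest.Digest
	for _, d := range scanned {
		if !reachable.Contains(d) {
			garbage = append(garbage, d)
		}
	}
	return garbage
}
```

A caller such as Store.GC would pass m.DigestSet() as the reachable argument and the digests found on disk as scanned.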