diff --git a/go.mod b/go.mod index dc0944fcb4..9fe6c7a968 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/stretchr/testify v1.9.0 github.com/xeipuuv/gojsonschema v1.2.0 golang.org/x/crypto v0.23.0 + golang.org/x/sync v0.7.0 golang.org/x/term v0.20.0 helm.sh/helm/v3 v3.14.2 k8s.io/api v0.29.1 @@ -476,7 +477,6 @@ require ( golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/oauth2 v0.20.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/src/cmd/tools/crane.go b/src/cmd/tools/crane.go index b6fd47e5a0..792e4b555c 100644 --- a/src/cmd/tools/crane.go +++ b/src/cmd/tools/crane.go @@ -13,6 +13,7 @@ import ( "github.com/defenseunicorns/zarf/src/cmd/common" "github.com/defenseunicorns/zarf/src/config" "github.com/defenseunicorns/zarf/src/config/lang" + "github.com/defenseunicorns/zarf/src/internal/packager/images" "github.com/defenseunicorns/zarf/src/pkg/cluster" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/transform" @@ -85,12 +86,12 @@ func init() { craneCopy := craneCmd.NewCmdCopy(&craneOptions) registryCmd.AddCommand(craneCopy) - registryCmd.AddCommand(zarfCraneCatalog(&craneOptions)) - registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdList, &craneOptions, lang.CmdToolsRegistryListExample, 0)) - registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdPush, &craneOptions, lang.CmdToolsRegistryPushExample, 1)) - registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdPull, &craneOptions, lang.CmdToolsRegistryPullExample, 0)) - registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdDelete, &craneOptions, lang.CmdToolsRegistryDeleteExample, 0)) - registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdDigest, &craneOptions, lang.CmdToolsRegistryDigestExample, 0)) + registryCmd.AddCommand(zarfCraneCatalog(craneOptions)) + registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdList, craneOptions, lang.CmdToolsRegistryListExample, 0)) + registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdPush, craneOptions, lang.CmdToolsRegistryPushExample, 1)) + registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdPull, craneOptions, lang.CmdToolsRegistryPullExample, 0)) + registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdDelete, craneOptions, lang.CmdToolsRegistryDeleteExample, 0)) + registryCmd.AddCommand(zarfCraneInternalWrapper(craneCmd.NewCmdDigest, craneOptions, lang.CmdToolsRegistryDigestExample, 0)) registryCmd.AddCommand(pruneCmd) registryCmd.AddCommand(craneCmd.NewCmdVersion()) @@ -103,8 +104,8 @@ func init() { } // Wrap the original crane catalog with a zarf specific version -func zarfCraneCatalog(cranePlatformOptions *[]crane.Option) *cobra.Command { - craneCatalog := craneCmd.NewCmdCatalog(cranePlatformOptions) +func zarfCraneCatalog(cranePlatformOptions []crane.Option) *cobra.Command { + craneCatalog := craneCmd.NewCmdCatalog(&cranePlatformOptions) craneCatalog.Example = lang.CmdToolsRegistryCatalogExample craneCatalog.Args = nil @@ -135,8 +136,8 @@ func zarfCraneCatalog(cranePlatformOptions *[]crane.Option) *cobra.Command { } // Add the correct authentication to the crane command options - authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PullUsername, zarfState.RegistryInfo.PullPassword) - *cranePlatformOptions = append(*cranePlatformOptions, authOption) + authOption := images.WithPullAuth(zarfState.RegistryInfo) + cranePlatformOptions = append(cranePlatformOptions, authOption) if tunnel != nil { message.Notef(lang.CmdToolsRegistryTunnel, registryEndpoint, zarfState.RegistryInfo.Address) @@ -151,8 +152,8 @@ func zarfCraneCatalog(cranePlatformOptions *[]crane.Option) *cobra.Command { } // Wrap the original crane list with a zarf specific version -func zarfCraneInternalWrapper(commandToWrap func(*[]crane.Option) *cobra.Command, cranePlatformOptions *[]crane.Option, exampleText string, imageNameArgumentIndex int) *cobra.Command { - wrappedCommand := commandToWrap(cranePlatformOptions) +func zarfCraneInternalWrapper(commandToWrap func(*[]crane.Option) *cobra.Command, cranePlatformOptions []crane.Option, exampleText string, imageNameArgumentIndex int) *cobra.Command { + wrappedCommand := commandToWrap(&cranePlatformOptions) wrappedCommand.Example = exampleText wrappedCommand.Args = nil @@ -190,8 +191,8 @@ func zarfCraneInternalWrapper(commandToWrap func(*[]crane.Option) *cobra.Command } // Add the correct authentication to the crane command options - authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PushUsername, zarfState.RegistryInfo.PushPassword) - *cranePlatformOptions = append(*cranePlatformOptions, authOption) + authOption := images.WithPushAuth(zarfState.RegistryInfo) + cranePlatformOptions = append(cranePlatformOptions, authOption) if tunnel != nil { message.Notef(lang.CmdToolsRegistryTunnel, tunnel.Endpoint(), zarfState.RegistryInfo.Address) @@ -245,7 +246,7 @@ func pruneImages(_ *cobra.Command, _ []string) error { } func doPruneImagesForPackages(zarfState *types.ZarfState, zarfPackages []types.DeployedPackage, registryEndpoint string) error { - authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PushUsername, zarfState.RegistryInfo.PushPassword) + authOption := images.WithPushAuth(zarfState.RegistryInfo) spinner := message.NewProgressSpinner(lang.CmdToolsRegistryPruneLookup) defer spinner.Stop() diff --git a/src/config/config.go b/src/config/config.go index 63396255b0..8aedce6abb 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -5,10 +5,8 @@ package config import ( - "crypto/tls" "embed" "fmt" - "net/http" "os" "path/filepath" "runtime" @@ -16,9 +14,6 @@ import ( "time" "github.com/defenseunicorns/zarf/src/types" - "github.com/google/go-containerregistry/pkg/authn" - "github.com/google/go-containerregistry/pkg/crane" - v1 "github.com/google/go-containerregistry/pkg/v1" ) // Zarf Global Configuration Constants. @@ -121,43 +116,6 @@ func GetDataInjectionMarker() string { return fmt.Sprintf(dataInjectionMarker, operationStartTime) } -// GetCraneOptions returns a crane option object with the correct options & platform. -func GetCraneOptions(insecure bool, archs ...string) []crane.Option { - var options []crane.Option - - // Handle insecure registry option - if insecure { - roundTripper := http.DefaultTransport.(*http.Transport).Clone() - roundTripper.TLSClientConfig = &tls.Config{ - InsecureSkipVerify: true, - } - options = append(options, crane.Insecure, crane.WithTransport(roundTripper)) - } - - if archs != nil { - options = append(options, crane.WithPlatform(&v1.Platform{OS: "linux", Architecture: GetArch(archs...)})) - } - - options = append(options, - crane.WithUserAgent("zarf"), - crane.WithNoClobber(true), - // TODO: (@WSTARR) this is set to limit pushes to registry pods and reduce the likelihood that crane will get stuck. - // We should investigate this further in the future to dig into more of what is happening (see https://github.com/defenseunicorns/zarf/issues/1568) - crane.WithJobs(1), - ) - - return options -} - -// GetCraneAuthOption returns a crane auth option with the provided credentials. -func GetCraneAuthOption(username string, secret string) crane.Option { - return crane.WithAuth( - authn.FromConfig(authn.AuthConfig{ - Username: username, - Password: secret, - })) -} - // GetAbsCachePath gets the absolute cache path for images and git repos. func GetAbsCachePath() string { return GetAbsHomePath(CommonOptions.CachePath) diff --git a/src/config/lang/english.go b/src/config/lang/english.go index 224e269166..b72ffa1461 100644 --- a/src/config/lang/english.go +++ b/src/config/lang/english.go @@ -729,11 +729,10 @@ const ( // Collection of reusable error messages. var ( - ErrInitNotFound = errors.New("this command requires a zarf-init package, but one was not found on the local system. Re-run the last command again without '--confirm' to download the package") - ErrUnableToCheckArch = errors.New("unable to get the configured cluster's architecture") - ErrInterrupt = errors.New("execution cancelled due to an interrupt") - ErrUnableToGetPackages = errors.New("unable to load the Zarf Package data from the cluster") - ErrUnsupportedImageType = errors.New("zarf does not currently support image indexes or docker manifest lists") + ErrInitNotFound = errors.New("this command requires a zarf-init package, but one was not found on the local system. Re-run the last command again without '--confirm' to download the package") + ErrUnableToCheckArch = errors.New("unable to get the configured cluster's architecture") + ErrInterrupt = errors.New("execution cancelled due to an interrupt") + ErrUnableToGetPackages = errors.New("unable to load the Zarf Package data from the cluster") ) // Collection of reusable warn messages. diff --git a/src/internal/packager/images/common.go b/src/internal/packager/images/common.go index 9a27edddbc..9fca5c0a0c 100644 --- a/src/internal/packager/images/common.go +++ b/src/internal/packager/images/common.go @@ -5,13 +5,35 @@ package images import ( + "net/http" + "time" + + "github.com/defenseunicorns/pkg/helpers" + "github.com/defenseunicorns/zarf/src/config" + "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/transform" "github.com/defenseunicorns/zarf/src/types" + "github.com/google/go-containerregistry/pkg/authn" + "github.com/google/go-containerregistry/pkg/crane" + v1 "github.com/google/go-containerregistry/pkg/v1" ) -// ImageConfig is the main struct for managing container images. -type ImageConfig struct { - ImagesPath string +// PullConfig is the configuration for pulling images. +type PullConfig struct { + DestinationDirectory string + + ImageList []transform.Image + + Arch string + + RegistryOverrides map[string]string + + CacheDirectory string +} + +// PushConfig is the configuration for pushing images. +type PushConfig struct { + SourceDirectory string ImageList []transform.Image @@ -19,9 +41,75 @@ type ImageConfig struct { NoChecksum bool - Insecure bool + Arch string + + Retries int +} - Architectures []string +// NoopOpt is a no-op option for crane. +func NoopOpt(*crane.Options) {} - RegistryOverrides map[string]string +// WithGlobalInsecureFlag returns an option for crane that configures insecure +// based upon Zarf's global --insecure flag. +func WithGlobalInsecureFlag() []crane.Option { + if config.CommonOptions.Insecure { + return []crane.Option{crane.Insecure} + } + // passing a nil option will cause panic + return []crane.Option{NoopOpt} +} + +// WithArchitecture sets the platform option for crane. +// +// This option is actually a slight mis-use of the platform option, as it is +// setting the architecture only and hard coding the OS to linux. +func WithArchitecture(arch string) crane.Option { + return crane.WithPlatform(&v1.Platform{OS: "linux", Architecture: arch}) +} + +// CommonOpts returns a set of common options for crane under Zarf. +func CommonOpts(arch string) []crane.Option { + opts := WithGlobalInsecureFlag() + opts = append(opts, WithArchitecture(arch)) + + opts = append(opts, + crane.WithUserAgent("zarf"), + crane.WithNoClobber(true), + crane.WithJobs(1), + ) + return opts +} + +// WithBasicAuth returns an option for crane that sets basic auth. +func WithBasicAuth(username, password string) crane.Option { + return crane.WithAuth(authn.FromConfig(authn.AuthConfig{ + Username: username, + Password: password, + })) +} + +// WithPullAuth returns an option for crane that sets pull auth from a given registry info. +func WithPullAuth(ri types.RegistryInfo) crane.Option { + return WithBasicAuth(ri.PullUsername, ri.PullPassword) +} + +// WithPushAuth returns an option for crane that sets push auth from a given registry info. +func WithPushAuth(ri types.RegistryInfo) crane.Option { + return WithBasicAuth(ri.PushUsername, ri.PushPassword) +} + +func createPushOpts(cfg PushConfig, pb *message.ProgressBar) []crane.Option { + opts := CommonOpts(cfg.Arch) + opts = append(opts, WithPushAuth(cfg.RegInfo)) + + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.TLSClientConfig.InsecureSkipVerify = config.CommonOptions.Insecure + // TODO (@WSTARR) This is set to match the TLSHandshakeTimeout to potentially mitigate effects of https://github.com/defenseunicorns/zarf/issues/1444 + transport.ResponseHeaderTimeout = 10 * time.Second + + transportWithProgressBar := helpers.NewTransport(transport, pb) + + opts = append(opts, crane.WithTransport(transportWithProgressBar)) + + return opts } diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index 2db0200d53..c440f8d33d 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -7,13 +7,19 @@ package images import ( "context" "encoding/json" + "errors" "fmt" + "io/fs" + "maps" + "os" "path/filepath" "strings" + "sync" + "sync/atomic" + "time" "github.com/defenseunicorns/pkg/helpers" "github.com/defenseunicorns/zarf/src/config" - "github.com/defenseunicorns/zarf/src/config/lang" "github.com/defenseunicorns/zarf/src/pkg/layout" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/transform" @@ -27,32 +33,17 @@ import ( "github.com/google/go-containerregistry/pkg/v1/empty" clayout "github.com/google/go-containerregistry/pkg/v1/layout" "github.com/google/go-containerregistry/pkg/v1/partial" - ctypes "github.com/google/go-containerregistry/pkg/v1/types" + "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/google/go-containerregistry/pkg/v1/types" "github.com/moby/moby/client" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/sync/errgroup" ) -// ImgInfo wraps references/information about an image -type ImgInfo struct { - RefInfo transform.Image - Img v1.Image - HasImageLayers bool -} - -// PullAll pulls all of the images in the provided tag map. -func (i *ImageConfig) PullAll() ([]ImgInfo, error) { - var ( - longer string - imageCount = len(i.ImageList) - refInfoToImage = map[transform.Image]v1.Image{} - referenceToDigest = make(map[string]string) - imgInfoList []ImgInfo - ) - - type digestInfo struct { - refInfo transform.Image - digest string - } - +// Pull pulls all of the images from the given config. +func Pull(ctx context.Context, cfg PullConfig) (map[transform.Image]v1.Image, error) { + var longer string + imageCount := len(cfg.ImageList) // Give some additional user feedback on larger image sets if imageCount > 15 { longer = "This step may take a couple of minutes to complete." @@ -60,296 +51,298 @@ func (i *ImageConfig) PullAll() ([]ImgInfo, error) { longer = "This step may take several seconds to complete." } - spinner := message.NewProgressSpinner("Loading metadata for %d images. %s", imageCount, longer) + if err := helpers.CreateDirectory(cfg.DestinationDirectory, helpers.ReadExecuteAllWriteUser); err != nil { + return nil, fmt.Errorf("failed to create image path %s: %w", cfg.DestinationDirectory, err) + } + + cranePath, err := clayout.Write(cfg.DestinationDirectory, empty.Index) + if err != nil { + return nil, err + } + + spinner := message.NewProgressSpinner("Fetching info for %d images. %s", imageCount, longer) defer spinner.Stop() logs.Warn.SetOutput(&message.DebugWriter{}) logs.Progress.SetOutput(&message.DebugWriter{}) - metadataImageConcurrency := helpers.NewConcurrencyTools[ImgInfo, error](len(i.ImageList)) + eg, ectx := errgroup.WithContext(ctx) + eg.SetLimit(10) - defer metadataImageConcurrency.Cancel() + var shaLock sync.Mutex + shas := map[string]bool{} + opts := CommonOpts(cfg.Arch) - spinner.Updatef("Fetching image metadata (0 of %d)", len(i.ImageList)) + fetched := map[transform.Image]v1.Image{} - // Spawn a goroutine for each image to load its metadata - for _, refInfo := range i.ImageList { - // Create a closure so that we can pass the src into the goroutine - refInfo := refInfo - go func() { + var counter, totalBytes atomic.Int64 - if metadataImageConcurrency.IsDone() { - return - } + for _, refInfo := range cfg.ImageList { + refInfo := refInfo + eg.Go(func() error { + idx := counter.Add(1) + spinner.Updatef("Fetching image info (%d of %d)", idx, imageCount) - actualSrc := refInfo.Reference - for k, v := range i.RegistryOverrides { + ref := refInfo.Reference + for k, v := range cfg.RegistryOverrides { if strings.HasPrefix(refInfo.Reference, k) { - actualSrc = strings.Replace(refInfo.Reference, k, v, 1) + ref = strings.Replace(refInfo.Reference, k, v, 1) } } - if metadataImageConcurrency.IsDone() { - return - } - - img, hasImageLayers, err := i.PullImage(actualSrc, spinner) - if err != nil { - metadataImageConcurrency.ErrorChan <- fmt.Errorf("failed to pull %s: %w", actualSrc, err) - return - } + var img v1.Image + var desc *remote.Descriptor - if metadataImageConcurrency.IsDone() { - return + // load from local fs if it's a tarball + if strings.HasSuffix(ref, ".tar") || strings.HasSuffix(ref, ".tar.gz") || strings.HasSuffix(ref, ".tgz") { + img, err = crane.Load(ref, opts...) + if err != nil { + return fmt.Errorf("unable to load %s: %w", refInfo.Reference, err) + } + } else { + reference, err := name.ParseReference(ref) + if err != nil { + return fmt.Errorf("failed to parse reference: %w", err) + } + desc, err = crane.Get(ref, opts...) + if err != nil { + if strings.Contains(err.Error(), "unexpected status code 429 Too Many Requests") { + return fmt.Errorf("rate limited by registry: %w", err) + } + + message.Warnf("Falling back to local 'docker', failed to find the manifest on a remote: %s", err.Error()) + + // Attempt to connect to the local docker daemon. + cli, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + return fmt.Errorf("docker not available: %w", err) + } + cli.NegotiateAPIVersion(ectx) + + // Inspect the image to get the size. + rawImg, _, err := cli.ImageInspectWithRaw(ectx, ref) + if err != nil { + return err + } + + // Warn the user if the image is large. + if rawImg.Size > 750*1000*1000 { + message.Warnf("%s is %s and may take a very long time to load via docker. "+ + "See https://docs.zarf.dev/faq for suggestions on how to improve large local image loading operations.", + ref, utils.ByteFormat(float64(rawImg.Size), 2)) + } + + // Use unbuffered opener to avoid OOM Kill issues https://github.com/defenseunicorns/zarf/issues/1214. + // This will also take forever to load large images. + img, err = daemon.Image(reference, daemon.WithUnbufferedOpener()) + if err != nil { + return fmt.Errorf("failed to load from docker daemon: %w", err) + } + } else { + img, err = crane.Pull(ref, opts...) + if err != nil { + return fmt.Errorf("unable to pull image %s: %w", refInfo.Reference, err) + } + } } - metadataImageConcurrency.ProgressChan <- ImgInfo{RefInfo: refInfo, Img: img, HasImageLayers: hasImageLayers} - }() - } + if refInfo.Digest != "" && desc != nil && types.MediaType(desc.MediaType).IsIndex() { + message.Warn("Zarf does not currently support direct consumption of OCI image indexes or Docker manifest lists") - onMetadataProgress := func(finishedImage ImgInfo, iteration int) { - spinner.Updatef("Fetching image metadata (%d of %d): %s", iteration+1, len(i.ImageList), finishedImage.RefInfo.Reference) - refInfoToImage[finishedImage.RefInfo] = finishedImage.Img - imgInfoList = append(imgInfoList, finishedImage) - } - - onMetadataError := func(err error) error { - return err - } + var idx v1.IndexManifest + if err := json.Unmarshal(desc.Manifest, &idx); err != nil { + return fmt.Errorf("unable to unmarshal index manifest: %w", err) + } + lines := []string{"The following images are available in the index:"} + name := refInfo.Name + if refInfo.Tag != "" { + name += ":" + refInfo.Tag + } + for _, desc := range idx.Manifests { + lines = append(lines, fmt.Sprintf("\n(%s) %s@%s", desc.Platform, name, desc.Digest)) + } + message.Warn(strings.Join(lines, "\n")) + return fmt.Errorf("%s resolved to an index, please select a specific platform to use", refInfo.Reference) + } - if err := metadataImageConcurrency.WaitWithProgress(onMetadataProgress, onMetadataError); err != nil { - return nil, err - } + img = cache.Image(img, cache.NewFilesystemCache(cfg.CacheDirectory)) - // Create the ImagePath directory - if err := helpers.CreateDirectory(i.ImagesPath, helpers.ReadExecuteAllWriteUser); err != nil { - return nil, fmt.Errorf("failed to create image path %s: %w", i.ImagesPath, err) - } + manifest, err := img.Manifest() + if err != nil { + return fmt.Errorf("unable to get manifest for %s: %w", refInfo.Reference, err) + } + totalBytes.Add(manifest.Config.Size) - totalBytes := int64(0) - processedLayers := make(map[string]v1.Layer) - for refInfo, img := range refInfoToImage { - // Get the byte size for this image - layers, err := img.Layers() - if err != nil { - return nil, fmt.Errorf("unable to get layers for image %s: %w", refInfo.Reference, err) - } - for _, layer := range layers { - layerDigest, err := layer.Digest() + layers, err := img.Layers() if err != nil { - return nil, fmt.Errorf("unable to get digest for image layer: %w", err) + return fmt.Errorf("unable to get layers for %s: %w", refInfo.Reference, err) } - // Only calculate this layer size if we haven't already looked at it - if _, ok := processedLayers[layerDigest.Hex]; !ok { - size, err := layer.Size() + shaLock.Lock() + defer shaLock.Unlock() + for _, layer := range layers { + digest, err := layer.Digest() if err != nil { - return nil, fmt.Errorf("unable to get size of layer: %w", err) + return fmt.Errorf("unable to get digest for image layer: %w", err) + } + + if _, ok := shas[digest.Hex]; !ok { + shas[digest.Hex] = true + size, err := layer.Size() + if err != nil { + return fmt.Errorf("unable to get size for image layer: %w", err) + } + totalBytes.Add(size) } - totalBytes += size - processedLayers[layerDigest.Hex] = layer } - } - } - spinner.Updatef("Preparing image sources and cache for image pulling") + if img == nil { + return fmt.Errorf("failed to fetch image %s", refInfo.Reference) + } - // Create special sauce crane Path object - // If it already exists use it - cranePath, err := clayout.FromPath(i.ImagesPath) - // Use crane pattern for creating OCI layout if it doesn't exist - if err != nil { - // If it doesn't exist create it - cranePath, err = clayout.Write(i.ImagesPath, empty.Index) - if err != nil { - return nil, err - } + fetched[refInfo] = img + + return nil + }) } - for refInfo, img := range refInfoToImage { - imgDigest, err := img.Digest() - if err != nil { - return nil, fmt.Errorf("unable to get digest for image %s: %w", refInfo.Reference, err) - } - referenceToDigest[refInfo.Reference] = imgDigest.String() + if err := eg.Wait(); err != nil { + return nil, err } - spinner.Success() + spinner.Successf("Fetched info for %d images", imageCount) - // Create a thread to update a progress bar as we save the image files to disk doneSaving := make(chan error) updateText := fmt.Sprintf("Pulling %d images", imageCount) - go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, doneSaving, updateText, updateText) - - imageSavingConcurrency := helpers.NewConcurrencyTools[digestInfo, error](len(refInfoToImage)) - - defer imageSavingConcurrency.Cancel() - - // Spawn a goroutine for each image to write it's config and manifest to disk using crane - for refInfo, img := range refInfoToImage { - // Create a closure so that we can pass the refInfo and img into the goroutine - refInfo, img := refInfo, img - go func() { - if err := cranePath.WriteImage(img); err != nil { - // Check if the cache has been invalidated, and warn the user if so - if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") { - message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error()) - } - imageSavingConcurrency.ErrorChan <- fmt.Errorf("error when trying to save the img (%s): %w", refInfo.Reference, err) - return - } - - if imageSavingConcurrency.IsDone() { - return - } + go utils.RenderProgressBarForLocalDirWrite(cfg.DestinationDirectory, totalBytes.Load(), doneSaving, updateText, updateText) - // Get the image digest so we can set an annotation in the image.json later - imgDigest, err := img.Digest() - if err != nil { - imageSavingConcurrency.ErrorChan <- err - return - } - - if imageSavingConcurrency.IsDone() { - return - } - - imageSavingConcurrency.ProgressChan <- digestInfo{digest: imgDigest.String(), refInfo: refInfo} - }() - } - - onImageSavingProgress := func(finishedImage digestInfo, _ int) { - referenceToDigest[finishedImage.refInfo.Reference] = finishedImage.digest - } + toPull := maps.Clone(fetched) - onImageSavingError := func(err error) error { - // Send a signal to the progress bar that we're done and wait for the thread to finish - doneSaving <- err - <-doneSaving - message.WarnErr(err, "Failed to write image config or manifest, trying again up to 3 times...") + sc := func() error { + saved, err := SaveConcurrent(ctx, cranePath, toPull) + for k := range saved { + delete(toPull, k) + } return err } - if err := imageSavingConcurrency.WaitWithProgress(onImageSavingProgress, onImageSavingError); err != nil { - return nil, err - } - - // for every image sequentially append OCI descriptor - for refInfo, img := range refInfoToImage { - desc, err := partial.Descriptor(img) - if err != nil { - return nil, err + ss := func() error { + saved, err := SaveSequential(ctx, cranePath, toPull) + for k := range saved { + delete(toPull, k) } + return err + } - if err := cranePath.AppendDescriptor(*desc); err != nil { - return nil, err - } + if err := helpers.Retry(sc, 2, 5*time.Second, message.Warnf); err != nil { + message.Warnf("Failed to save images in parallel, falling back to sequential save: %s", err.Error()) - imgDigest, err := img.Digest() - if err != nil { + if err := helpers.Retry(ss, 2, 5*time.Second, message.Warnf); err != nil { return nil, err } - - referenceToDigest[refInfo.Reference] = imgDigest.String() - } - - if err := utils.AddImageNameAnnotation(i.ImagesPath, referenceToDigest); err != nil { - return nil, fmt.Errorf("unable to format OCI layout: %w", err) } // Send a signal to the progress bar that we're done and wait for the thread to finish doneSaving <- nil <-doneSaving - return imgInfoList, nil + return fetched, nil +} + +// CleanupInProgressLayers removes incomplete layers from the cache. +func CleanupInProgressLayers(ctx context.Context, img v1.Image) error { + layers, err := img.Layers() + if err != nil { + return err + } + eg, _ := errgroup.WithContext(ctx) + for _, layer := range layers { + layer := layer + eg.Go(func() error { + digest, err := layer.Digest() + if err != nil { + return err + } + size, err := layer.Size() + if err != nil { + return err + } + cacheDir := filepath.Join(config.GetAbsCachePath(), layout.ImagesDir) + location := filepath.Join(cacheDir, digest.String()) + info, err := os.Stat(location) + if errors.Is(err, fs.ErrNotExist) { + return nil + } + if err != nil { + return err + } + if info.Size() != size { + if err := os.Remove(location); err != nil { + return fmt.Errorf("failed to remove incomplete layer %s: %w", digest.Hex, err) + } + } + return nil + }) + } + return eg.Wait() } -// PullImage returns a v1.Image either by loading a local tarball or pulling from the wider internet. -func (i *ImageConfig) PullImage(src string, spinner *message.Spinner) (img v1.Image, hasImageLayers bool, err error) { - cacheImage := false - // Load image tarballs from the local filesystem. - if strings.HasSuffix(src, ".tar") || strings.HasSuffix(src, ".tar.gz") || strings.HasSuffix(src, ".tgz") { - spinner.Updatef("Reading image tarball: %s", src) - img, err = crane.Load(src, config.GetCraneOptions(true, i.Architectures...)...) - if err != nil { - return nil, false, err +// SaveSequential saves images sequentially. +func SaveSequential(ctx context.Context, cl clayout.Path, m map[transform.Image]v1.Image) (map[transform.Image]v1.Image, error) { + saved := map[transform.Image]v1.Image{} + for info, img := range m { + annotations := map[string]string{ + ocispec.AnnotationBaseImageName: info.Reference, } - } else if desc, err := crane.Get(src, config.GetCraneOptions(i.Insecure)...); err != nil { - // If crane is unable to pull the image, try to load it from the local docker daemon. - message.Notef("Falling back to local 'docker' images, failed to find the manifest on a remote: %s", err.Error()) - - // Parse the image reference to get the image name. - reference, err := name.ParseReference(src) - if err != nil { - return nil, false, fmt.Errorf("failed to parse image reference: %w", err) + if err := cl.AppendImage(img, clayout.WithAnnotations(annotations)); err != nil { + if err := CleanupInProgressLayers(ctx, img); err != nil { + message.WarnErr(err, "failed to clean up in-progress layers, please run `zarf tools clear-cache`") + } + return saved, err } + saved[info] = img + } + return saved, nil +} - // Attempt to connect to the local docker daemon. - ctx := context.TODO() - cli, err := client.NewClientWithOpts(client.FromEnv) - if err != nil { - return nil, false, fmt.Errorf("docker not available: %w", err) - } - cli.NegotiateAPIVersion(ctx) +// SaveConcurrent saves images in a concurrent, bounded manner. +func SaveConcurrent(ctx context.Context, cl clayout.Path, m map[transform.Image]v1.Image) (map[transform.Image]v1.Image, error) { + saved := map[transform.Image]v1.Image{} - // Inspect the image to get the size. - rawImg, _, err := cli.ImageInspectWithRaw(ctx, src) - if err != nil { - return nil, false, fmt.Errorf("failed to inspect image via docker: %w", err) - } + var mu sync.Mutex - // Warn the user if the image is large. - if rawImg.Size > 750*1000*1000 { - message.Warnf("%s is %s and may take a very long time to load via docker. "+ - "See https://docs.zarf.dev/docs/faq for suggestions on how to improve large local image loading operations.", - src, utils.ByteFormat(float64(rawImg.Size), 2)) - } + eg, ectx := errgroup.WithContext(ctx) + eg.SetLimit(10) - // Use unbuffered opener to avoid OOM Kill issues https://github.com/defenseunicorns/zarf/issues/1214. - // This will also take for ever to load large images. - if img, err = daemon.Image(reference, daemon.WithUnbufferedOpener()); err != nil { - return nil, false, fmt.Errorf("failed to load image from docker daemon: %w", err) - } - } else { - refInfo, err := transform.ParseImageRef(src) - if err != nil { - return nil, false, err - } - // Check if we have an image index or manifest list and if so error out - if refInfo.Digest != "" && (desc.MediaType == ctypes.OCIImageIndex || desc.MediaType == ctypes.DockerManifestList) { - var idx v1.IndexManifest - if err := json.Unmarshal(desc.Manifest, &idx); err != nil { - return nil, false, fmt.Errorf("%w: %w", lang.ErrUnsupportedImageType, err) - } - imageOptions := "please select one of the images below based on your platform to use instead" - imageBaseName := refInfo.Name - if refInfo.Tag != "" { - imageBaseName = fmt.Sprintf("%s:%s", imageBaseName, refInfo.Tag) - } - for _, manifest := range idx.Manifests { - imageOptions = fmt.Sprintf("%s\n %s@%s for platform %s", imageOptions, imageBaseName, manifest.Digest, manifest.Platform) + for info, img := range m { + info, img := info, img + eg.Go(func() error { + desc, err := partial.Descriptor(img) + if err != nil { + return err } - return nil, false, fmt.Errorf("%w: %s", lang.ErrUnsupportedImageType, imageOptions) - } - // Manifest was found, so use crane to pull the image. - if img, err = crane.Pull(src, config.GetCraneOptions(i.Insecure, i.Architectures...)...); err != nil { - return nil, false, fmt.Errorf("failed to pull image: %w", err) - } - cacheImage = true - } + if err := cl.WriteImage(img); err != nil { + if err := CleanupInProgressLayers(ectx, img); err != nil { + message.WarnErr(err, "failed to clean up in-progress layers, please run `zarf tools clear-cache`") + } + return err + } - hasImageLayers, err = utils.HasImageLayers(img) - if err != nil { - return nil, false, fmt.Errorf("failed to check image layer mediatype: %w", err) - } + mu.Lock() + defer mu.Unlock() + annotations := map[string]string{ + ocispec.AnnotationBaseImageName: info.Reference, + } + desc.Annotations = annotations + if err := cl.AppendDescriptor(*desc); err != nil { + return err + } - if hasImageLayers && cacheImage { - spinner.Updatef("Preparing image %s", src) - imageCachePath := filepath.Join(config.GetAbsCachePath(), layout.ImagesDir) - img = cache.Image(img, cache.NewFilesystemCache(imageCachePath)) + saved[info] = img + return nil + }) } - return img, hasImageLayers, nil - + return saved, eg.Wait() } diff --git a/src/internal/packager/images/push.go b/src/internal/packager/images/push.go index 900b925651..64e592a5ab 100644 --- a/src/internal/packager/images/push.go +++ b/src/internal/packager/images/push.go @@ -6,11 +6,9 @@ package images import ( "fmt" - "net/http" "time" "github.com/defenseunicorns/pkg/helpers" - "github.com/defenseunicorns/zarf/src/config" "github.com/defenseunicorns/zarf/src/pkg/cluster" "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" @@ -21,23 +19,20 @@ import ( v1 "github.com/google/go-containerregistry/pkg/v1" ) -// PushToZarfRegistry pushes a provided image into the configured Zarf registry -// This function will optionally shorten the image name while appending a checksum of the original image name. -func (i *ImageConfig) PushToZarfRegistry() error { - message.Debug("images.PushToZarfRegistry()") - +// Push pushes images to a registry. +func Push(cfg PushConfig) error { logs.Warn.SetOutput(&message.DebugWriter{}) logs.Progress.SetOutput(&message.DebugWriter{}) - refInfoToImage := map[transform.Image]v1.Image{} + toPush := map[transform.Image]v1.Image{} var totalSize int64 // Build an image list from the references - for _, refInfo := range i.ImageList { - img, err := utils.LoadOCIImage(i.ImagesPath, refInfo) + for _, refInfo := range cfg.ImageList { + img, err := utils.LoadOCIImage(cfg.SourceDirectory, refInfo) if err != nil { return err } - refInfoToImage[refInfo] = img + toPush[refInfo] = img imgSize, err := calcImgSize(img) if err != nil { return err @@ -46,85 +41,95 @@ func (i *ImageConfig) PushToZarfRegistry() error { } // If this is not a no checksum image push we will be pushing two images (the second will go faster as it checks the same layers) - if !i.NoChecksum { + if !cfg.NoChecksum { totalSize = totalSize * 2 } - httpTransport := http.DefaultTransport.(*http.Transport).Clone() - httpTransport.TLSClientConfig.InsecureSkipVerify = i.Insecure - // TODO (@WSTARR) This is set to match the TLSHandshakeTimeout to potentially mitigate effects of https://github.com/defenseunicorns/zarf/issues/1444 - httpTransport.ResponseHeaderTimeout = 10 * time.Second - progressBar := message.NewProgressBar(totalSize, fmt.Sprintf("Pushing %d images to the zarf registry", len(i.ImageList))) - defer progressBar.Stop() - craneTransport := helpers.NewTransport(httpTransport, progressBar) - - pushOptions := config.GetCraneOptions(i.Insecure, i.Architectures...) - pushOptions = append(pushOptions, config.GetCraneAuthOption(i.RegInfo.PushUsername, i.RegInfo.PushPassword)) - pushOptions = append(pushOptions, crane.WithTransport(craneTransport)) - var ( err error tunnel *k8s.Tunnel - registryURL string + registryURL = cfg.RegInfo.Address ) - registryURL = i.RegInfo.Address + progress := message.NewProgressBar(totalSize, fmt.Sprintf("Pushing %d images", len(toPush))) + defer progress.Stop() - c, _ := cluster.NewCluster() - if c != nil { - registryURL, tunnel, err = c.ConnectToZarfRegistryEndpoint(i.RegInfo) - if err != nil { - return err + if err := helpers.Retry(func() error { + c, _ := cluster.NewCluster() + if c != nil { + registryURL, tunnel, err = c.ConnectToZarfRegistryEndpoint(cfg.RegInfo) + if err != nil { + return err + } + if tunnel != nil { + defer tunnel.Close() + } } - } - if tunnel != nil { - defer tunnel.Close() - } + progress = message.NewProgressBar(totalSize, fmt.Sprintf("Pushing %d images", len(toPush))) + pushOptions := createPushOpts(cfg, progress) - pushImage := func(img v1.Image, name string) error { - if tunnel != nil { - return tunnel.Wrap(func() error { return crane.Push(img, name, pushOptions...) }) - } + pushImage := func(img v1.Image, name string) error { + if tunnel != nil { + return tunnel.Wrap(func() error { return crane.Push(img, name, pushOptions...) }) + } - return crane.Push(img, name, pushOptions...) - } + return crane.Push(img, name, pushOptions...) + } - for refInfo, img := range refInfoToImage { - refTruncated := helpers.Truncate(refInfo.Reference, 55, true) - progressBar.UpdateTitle(fmt.Sprintf("Pushing %s", refTruncated)) + pushed := []transform.Image{} + defer func() { + for _, refInfo := range pushed { + delete(toPush, refInfo) + } + }() + for refInfo, img := range toPush { + refTruncated := helpers.Truncate(refInfo.Reference, 55, true) + progress.UpdateTitle(fmt.Sprintf("Pushing %s", refTruncated)) - // If this is not a no checksum image push it for use with the Zarf agent - if !i.NoChecksum { - offlineNameCRC, err := transform.ImageTransformHost(registryURL, refInfo.Reference) + size, err := calcImgSize(img) if err != nil { return err } - message.Debugf("crane.Push() %s:%s -> %s)", i.ImagesPath, refInfo.Reference, offlineNameCRC) + // If this is not a no checksum image push it for use with the Zarf agent + if !cfg.NoChecksum { + offlineNameCRC, err := transform.ImageTransformHost(registryURL, refInfo.Reference) + if err != nil { + return err + } + + message.Debugf("push %s -> %s)", refInfo.Reference, offlineNameCRC) + + if err = pushImage(img, offlineNameCRC); err != nil { + return err + } - err = pushImage(img, offlineNameCRC) + totalSize -= size + } + + // To allow for other non-zarf workloads to easily see the images upload a non-checksum version + // (this may result in collisions but this is acceptable for this use case) + offlineName, err := transform.ImageTransformHostWithoutChecksum(registryURL, refInfo.Reference) if err != nil { return err } - } - // To allow for other non-zarf workloads to easily see the images upload a non-checksum version - // (this may result in collisions but this is acceptable for this use case) - offlineName, err := transform.ImageTransformHostWithoutChecksum(registryURL, refInfo.Reference) - if err != nil { - return err - } + message.Debugf("push %s -> %s)", refInfo.Reference, offlineName) - message.Debugf("crane.Push() %s:%s -> %s)", i.ImagesPath, refInfo.Reference, offlineName) + if err = pushImage(img, offlineName); err != nil { + return err + } - err = pushImage(img, offlineName) - if err != nil { - return err + pushed = append(pushed, refInfo) + totalSize -= size } + return nil + }, cfg.Retries, 5*time.Second, message.Warnf); err != nil { + return err } - progressBar.Successf("Pushed %d images to the zarf registry", len(i.ImageList)) + progress.Successf("Pushed %d images", len(cfg.ImageList)) return nil } diff --git a/src/pkg/packager/creator/normal.go b/src/pkg/packager/creator/normal.go index 81478c730e..0cb5c7d921 100644 --- a/src/pkg/packager/creator/normal.go +++ b/src/pkg/packager/creator/normal.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "math/rand" "os" "path/filepath" "strconv" @@ -168,6 +169,9 @@ func (pc *PackageCreator) Assemble(dst *layout.PackagePaths, components []types. } imageList = helpers.Unique(imageList) + rs := rand.NewSource(time.Now().UnixNano()) + rnd := rand.New(rs) + rnd.Shuffle(len(imageList), func(i, j int) { imageList[i], imageList[j] = imageList[j], imageList[i] }) var sbomImageList []transform.Image // Images are handled separately from other component assets. @@ -176,37 +180,31 @@ func (pc *PackageCreator) Assemble(dst *layout.PackagePaths, components []types. dst.AddImages() - var pulled []images.ImgInfo - var err error - - ctx, cancel := context.WithCancel(context.TODO()) - doPull := func() error { - imgConfig := images.ImageConfig{ - ImagesPath: dst.Images.Base, - ImageList: imageList, - Insecure: config.CommonOptions.Insecure, - Architectures: []string{arch}, - RegistryOverrides: pc.createOpts.RegistryOverrides, - } - - pulled, err = imgConfig.PullAll() - if errors.Is(err, lang.ErrUnsupportedImageType) { - cancel() - } + ctx := context.TODO() - return err + pullCfg := images.PullConfig{ + DestinationDirectory: dst.Images.Base, + ImageList: imageList, + Arch: arch, + RegistryOverrides: pc.createOpts.RegistryOverrides, + CacheDirectory: filepath.Join(config.GetAbsCachePath(), layout.ImagesDir), } - if err := helpers.RetryWithContext(ctx, doPull, 3, 5*time.Second, message.Warnf); err != nil { - return fmt.Errorf("unable to pull images: %w", err) + pulled, err := images.Pull(ctx, pullCfg) + if err != nil { + return err } - for _, imgInfo := range pulled { - if err := dst.Images.AddV1Image(imgInfo.Img); err != nil { + for info, img := range pulled { + if err := dst.Images.AddV1Image(img); err != nil { return err } - if imgInfo.HasImageLayers { - sbomImageList = append(sbomImageList, imgInfo.RefInfo) + ok, err := utils.HasImageLayers(img) + if err != nil { + return fmt.Errorf("failed to validate %s is an image and not an artifact: %w", info, err) + } + if ok { + sbomImageList = append(sbomImageList, info) } } } diff --git a/src/pkg/packager/deploy.go b/src/pkg/packager/deploy.go index 181a52cb42..a595888414 100644 --- a/src/pkg/packager/deploy.go +++ b/src/pkg/packager/deploy.go @@ -481,10 +481,6 @@ func (p *Packager) populatePackageVariableConfig() error { // Push all of the components images to the configured container registry. func (p *Packager) pushImagesToRegistry(componentImages []string, noImgChecksum bool) error { - if len(componentImages) == 0 { - return nil - } - var combinedImageList []transform.Image for _, src := range componentImages { ref, err := transform.ParseImageRef(src) @@ -496,18 +492,16 @@ func (p *Packager) pushImagesToRegistry(componentImages []string, noImgChecksum imageList := helpers.Unique(combinedImageList) - imgConfig := images.ImageConfig{ - ImagesPath: p.layout.Images.Base, - ImageList: imageList, - NoChecksum: noImgChecksum, - RegInfo: p.state.RegistryInfo, - Insecure: config.CommonOptions.Insecure, - Architectures: []string{p.cfg.Pkg.Build.Architecture}, + pushCfg := images.PushConfig{ + SourceDirectory: p.layout.Images.Base, + ImageList: imageList, + RegInfo: p.state.RegistryInfo, + NoChecksum: noImgChecksum, + Arch: p.cfg.Pkg.Build.Architecture, + Retries: p.cfg.PkgOpts.Retries, } - return helpers.Retry(func() error { - return imgConfig.PushToZarfRegistry() - }, p.cfg.PkgOpts.Retries, 5*time.Second, message.Warnf) + return images.Push(pushCfg) } // Push all of the components git repos to the configured git server. diff --git a/src/pkg/packager/prepare.go b/src/pkg/packager/prepare.go index 6f5cbf0432..54c9158d5e 100644 --- a/src/pkg/packager/prepare.go +++ b/src/pkg/packager/prepare.go @@ -15,9 +15,9 @@ import ( "github.com/goccy/go-yaml" "github.com/defenseunicorns/pkg/helpers" - "github.com/defenseunicorns/zarf/src/config" "github.com/defenseunicorns/zarf/src/config/lang" "github.com/defenseunicorns/zarf/src/internal/packager/helm" + "github.com/defenseunicorns/zarf/src/internal/packager/images" "github.com/defenseunicorns/zarf/src/internal/packager/kustomize" "github.com/defenseunicorns/zarf/src/pkg/layout" "github.com/defenseunicorns/zarf/src/pkg/message" @@ -294,7 +294,7 @@ func (p *Packager) findImages() (imgMap map[string][]string, err error) { if sortedImages := sortImages(maybeImages, matchedImages); len(sortedImages) > 0 { var validImages []string for _, image := range sortedImages { - if descriptor, err := crane.Head(image, config.GetCraneOptions(config.CommonOptions.Insecure)...); err != nil { + if descriptor, err := crane.Head(image, images.WithGlobalInsecureFlag()...); err != nil { // Test if this is a real image, if not just quiet log to debug, this is normal message.Debugf("Suspected image does not appear to be valid: %#v", err) } else { diff --git a/src/pkg/utils/image.go b/src/pkg/utils/image.go index 5a05d987df..0074ea9036 100644 --- a/src/pkg/utils/image.go +++ b/src/pkg/utils/image.go @@ -48,13 +48,12 @@ func LoadOCIImage(imgPath string, refInfo transform.Image) (v1.Image, error) { func AddImageNameAnnotation(ociPath string, referenceToDigest map[string]string) error { indexPath := filepath.Join(ociPath, "index.json") - // Read the file contents and turn it into a usable struct that we can manipulate var index ocispec.Index - byteValue, err := os.ReadFile(indexPath) + b, err := os.ReadFile(indexPath) if err != nil { return fmt.Errorf("unable to read the contents of the file (%s) so we can add an annotation: %w", indexPath, err) } - if err = json.Unmarshal(byteValue, &index); err != nil { + if err = json.Unmarshal(b, &index); err != nil { return fmt.Errorf("unable to process the contents of the file (%s): %w", indexPath, err) } @@ -80,14 +79,14 @@ func AddImageNameAnnotation(ociPath string, referenceToDigest map[string]string) } // Write the file back to the package - indexJSONBytes, err := json.Marshal(index) + b, err = json.Marshal(index) if err != nil { return err } - return os.WriteFile(indexPath, indexJSONBytes, helpers.ReadWriteUser) + return os.WriteFile(indexPath, b, helpers.ReadWriteUser) } -// HasImageLayers checks if any layers in the v1.Image are known image layers. +// HasImageLayers checks if all layers in the v1.Image are known image layers. func HasImageLayers(img v1.Image) (bool, error) { layers, err := img.Layers() if err != nil { @@ -98,10 +97,9 @@ func HasImageLayers(img v1.Image) (bool, error) { if err != nil { return false, err } - // Check if mediatype is a known image layer - if mediatype.IsLayer() { - return true, nil + if !mediatype.IsLayer() { + return false, nil } } - return false, nil + return true, nil }