From 3c3c4fd139e67066a0c08173a963850c6c2e3ee6 Mon Sep 17 00:00:00 2001 From: Philip Laine Date: Mon, 10 Jun 2024 15:32:38 +0200 Subject: [PATCH] refactor: move k8s tunnel to cluster package (#2566) ## Description Moves the tunnel code to cluster pacakge. This is far from a perfect solution but I do not want to make a enormous change. The tunnel code needs some refactoring in the future but before that can be done we need to understand how dependencies on Cluster looks. ## Related Issue Relates to #2507 ## Checklist before merging - [x] Test, docs, adr added or updated as needed - [x] [Contributor Guide Steps](https://github.com/defenseunicorns/zarf/blob/main/.github/CONTRIBUTING.md#developer-workflow) followed --- go.mod | 18 +- go.sum | 37 ++-- src/cmd/connect.go | 5 +- src/cmd/tools/zarf.go | 2 +- src/internal/packager/git/gitea.go | 7 +- src/internal/packager/helm/zarf.go | 30 ++- src/internal/packager/images/push.go | 3 +- src/pkg/cluster/injector.go | 2 +- src/pkg/cluster/tunnel.go | 279 ++++++++++++++++++++++++++- src/pkg/k8s/common.go | 7 + src/pkg/k8s/tunnel.go | 275 -------------------------- src/pkg/k8s/types.go | 2 + src/pkg/packager/deploy.go | 3 +- 13 files changed, 345 insertions(+), 325 deletions(-) delete mode 100644 src/pkg/k8s/tunnel.go diff --git a/go.mod b/go.mod index 5af4b55f94..cc3784ac57 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/anchore/stereoscope v0.0.1 github.com/anchore/syft v0.100.0 github.com/defenseunicorns/pkg/helpers/v2 v2.0.1 + github.com/defenseunicorns/pkg/kubernetes v0.0.1 github.com/defenseunicorns/pkg/oci v1.0.1 github.com/derailed/k9s v0.31.7 github.com/distribution/reference v0.5.0 @@ -45,9 +46,9 @@ require ( github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 github.com/xeipuuv/gojsonschema v1.2.0 - golang.org/x/crypto v0.23.0 + golang.org/x/crypto v0.24.0 golang.org/x/sync v0.7.0 - golang.org/x/term v0.20.0 + golang.org/x/term v0.21.0 helm.sh/helm/v3 v3.14.2 k8s.io/api v0.29.1 k8s.io/apimachinery v0.29.1 @@ -56,11 +57,14 @@ require ( k8s.io/klog/v2 v2.120.1 k8s.io/kubectl v0.29.1 oras.land/oras-go/v2 v2.5.0 + sigs.k8s.io/cli-utils v0.36.0 sigs.k8s.io/kustomize/api v0.16.0 sigs.k8s.io/kustomize/kyaml v0.16.0 sigs.k8s.io/yaml v1.4.0 ) +require github.com/evanphx/json-patch/v5 v5.6.0 // indirect + require ( atomicgo.dev/cursor v0.2.0 // indirect atomicgo.dev/keyboard v0.2.9 // indirect @@ -474,13 +478,13 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 // indirect - golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.25.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.20.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.16.1 // indirect + golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/api v0.180.0 // indirect google.golang.org/genproto v0.0.0-20240513163218-0867130af1f8 // indirect diff --git a/go.sum b/go.sum index 8b31775074..3fbc8acdb4 100644 --- a/go.sum +++ b/go.sum @@ -599,6 +599,8 @@ github.com/defenseunicorns/gojsonschema v0.0.0-20231116163348-e00f069122d6 h1:gw github.com/defenseunicorns/gojsonschema v0.0.0-20231116163348-e00f069122d6/go.mod h1:StKLYMmPj1R5yIs6CK49EkcW1TvUYuw5Vri+LRk7Dy8= github.com/defenseunicorns/pkg/helpers/v2 v2.0.1 h1:j08rz9vhyD9Bs+yKiyQMY2tSSejXRMxTqEObZ5M1Wbk= github.com/defenseunicorns/pkg/helpers/v2 v2.0.1/go.mod h1:u1PAqOICZyiGIVA2v28g55bQH1GiAt0Bc4U9/rnWQvQ= +github.com/defenseunicorns/pkg/kubernetes v0.0.1 h1:HNQBV6XXFvlDvFdOCCWam0/LCgq67M+ggQKiRIoM2vU= +github.com/defenseunicorns/pkg/kubernetes v0.0.1/go.mod h1:AWB1iBbDO4VTmRO/E/8e0tVN0kkWbg+v8dhs9Hd9KXA= github.com/defenseunicorns/pkg/oci v1.0.1 h1:WPrWRrae1L19X1vuhy6yYMR2zrTzgBbJHp3ImgUm4ZM= github.com/defenseunicorns/pkg/oci v1.0.1/go.mod h1:qZ3up/d0P81taW37fKR4lb19jJhQZJVtNOEJMu00dHQ= github.com/deitch/magic v0.0.0-20230404182410-1ff89d7342da h1:ZOjWpVsFZ06eIhnh4mkaceTiVoktdU67+M7KDHJ268M= @@ -684,6 +686,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws= github.com/evanphx/json-patch v5.7.0+incompatible h1:vgGkfT/9f8zE6tvSCe74nfpAVDQ2tG6yudJd8LBksgI= github.com/evanphx/json-patch v5.7.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= +github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d h1:105gxyaGwCFad8crR9dcMQWvV9Hvulu6hwUh4tWPJnM= github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZMPRZwes7CROmyNKgQzC3XPs6L/G2EJLHddWejkmf4= github.com/facebookincubator/flog v0.0.0-20190930132826-d2511d0ce33c/go.mod h1:QGzNH9ujQ2ZUr/CjDGZGWeDAVStrWNjHeEcjJL96Nuk= @@ -1109,6 +1113,7 @@ github.com/jedisct1/go-minisign v0.0.0-20230811132847-661be99b8267 h1:TMtDYDHKYY github.com/jedisct1/go-minisign v0.0.0-20230811132847-661be99b8267/go.mod h1:h1nSAbGFqGVzn6Jyl1R/iCcBUHN4g+gW1u9CoBTrb9E= github.com/jellydator/ttlcache/v3 v3.1.1 h1:RCgYJqo3jgvhl+fEWvjNW8thxGWsgxi+TPhRir1Y9y8= github.com/jellydator/ttlcache/v3 v3.1.1/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -1807,8 +1812,8 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58 golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1849,8 +1854,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= -golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1916,8 +1921,8 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2065,8 +2070,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -2080,8 +2085,8 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= -golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= +golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2099,8 +2104,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -2166,8 +2171,8 @@ golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= -golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -2178,6 +2183,8 @@ golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNq golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw= +gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= @@ -2497,6 +2504,8 @@ oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZH rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +sigs.k8s.io/cli-utils v0.36.0 h1:k7GM6LmIMydtvM6Ad91XuqKk0QEVL9bVbaiX1uvWIrA= +sigs.k8s.io/cli-utils v0.36.0/go.mod h1:uCFC3BPXB3xHFQyKkWUlTrncVDCKzbdDfqZqRTCrk24= sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= sigs.k8s.io/controller-runtime v0.16.3/go.mod h1:j7bialYoSn142nv9sCOJmQgDXQXxnroFU4VnX/brVJ0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= diff --git a/src/cmd/connect.go b/src/cmd/connect.go index ba6eed3d2d..054587337c 100644 --- a/src/cmd/connect.go +++ b/src/cmd/connect.go @@ -11,7 +11,6 @@ import ( "github.com/defenseunicorns/zarf/src/cmd/common" "github.com/defenseunicorns/zarf/src/config/lang" "github.com/defenseunicorns/zarf/src/pkg/cluster" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/utils/exec" "github.com/spf13/cobra" @@ -43,7 +42,7 @@ var ( ctx := cmd.Context() - var tunnel *k8s.Tunnel + var tunnel *cluster.Tunnel if connectResourceName != "" { zt := cluster.NewTunnelInfo(connectNamespace, connectResourceType, connectResourceName, "", connectLocalPort, connectRemotePort) tunnel, err = c.ConnectTunnelInfo(ctx, zt) @@ -100,7 +99,7 @@ func init() { connectCmd.Flags().StringVar(&connectResourceName, "name", "", lang.CmdConnectFlagName) connectCmd.Flags().StringVar(&connectNamespace, "namespace", cluster.ZarfNamespaceName, lang.CmdConnectFlagNamespace) - connectCmd.Flags().StringVar(&connectResourceType, "type", k8s.SvcResource, lang.CmdConnectFlagType) + connectCmd.Flags().StringVar(&connectResourceType, "type", cluster.SvcResource, lang.CmdConnectFlagType) connectCmd.Flags().IntVar(&connectLocalPort, "local-port", 0, lang.CmdConnectFlagLocalPort) connectCmd.Flags().IntVar(&connectRemotePort, "remote-port", 0, lang.CmdConnectFlagRemotePort) connectCmd.Flags().BoolVar(&cliOnly, "cli-only", false, lang.CmdConnectFlagCliOnly) diff --git a/src/cmd/tools/zarf.go b/src/cmd/tools/zarf.go index 843fcb9ae3..84395d0e82 100644 --- a/src/cmd/tools/zarf.go +++ b/src/cmd/tools/zarf.go @@ -145,7 +145,7 @@ var updateCredsCmd = &cobra.Command{ h := helm.NewClusterOnly(&types.PackagerConfig{}, template.GetZarfVariableConfig(), newState, c) if slices.Contains(args, message.RegistryKey) && newState.RegistryInfo.InternalRegistry { - err = h.UpdateZarfRegistryValues() + err = h.UpdateZarfRegistryValues(ctx) if err != nil { // Warn if we couldn't actually update the registry (it might not be installed and we should try to continue) message.Warnf(lang.CmdToolsUpdateCredsUnableUpdateRegistry, err.Error()) diff --git a/src/internal/packager/git/gitea.go b/src/internal/packager/git/gitea.go index c6644c6b1a..7bf9afe53f 100644 --- a/src/internal/packager/git/gitea.go +++ b/src/internal/packager/git/gitea.go @@ -18,7 +18,6 @@ import ( "github.com/defenseunicorns/zarf/src/config" "github.com/defenseunicorns/zarf/src/pkg/cluster" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/types" ) @@ -41,7 +40,7 @@ func (g *Git) CreateReadOnlyUser(ctx context.Context) error { } // Establish a git tunnel to send the repo - tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, k8s.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) + tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, cluster.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) if err != nil { return err } @@ -128,7 +127,7 @@ func (g *Git) UpdateGitUser(ctx context.Context, oldAdminPass string, username s return err } // Establish a git tunnel to send the repo - tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, k8s.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) + tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, cluster.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) if err != nil { return err } @@ -167,7 +166,7 @@ func (g *Git) CreatePackageRegistryToken(ctx context.Context) (CreateTokenRespon } // Establish a git tunnel to send the repo - tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, k8s.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) + tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, cluster.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) if err != nil { return CreateTokenResponse{}, err } diff --git a/src/internal/packager/helm/zarf.go b/src/internal/packager/helm/zarf.go index 94996b4173..b3a13f8d6e 100644 --- a/src/internal/packager/helm/zarf.go +++ b/src/internal/packager/helm/zarf.go @@ -7,6 +7,13 @@ package helm import ( "context" "fmt" + "time" + + "helm.sh/helm/v3/pkg/action" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/object" + + pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes" "github.com/defenseunicorns/zarf/src/internal/packager/template" "github.com/defenseunicorns/zarf/src/pkg/cluster" @@ -16,37 +23,48 @@ import ( "github.com/defenseunicorns/zarf/src/pkg/utils" "github.com/defenseunicorns/zarf/src/pkg/variables" "github.com/defenseunicorns/zarf/src/types" - "helm.sh/helm/v3/pkg/action" ) // UpdateZarfRegistryValues updates the Zarf registry deployment with the new state values -func (h *Helm) UpdateZarfRegistryValues() error { +func (h *Helm) UpdateZarfRegistryValues(ctx context.Context) error { pushUser, err := utils.GetHtpasswdString(h.state.RegistryInfo.PushUsername, h.state.RegistryInfo.PushPassword) if err != nil { return fmt.Errorf("error generating htpasswd string: %w", err) } - pullUser, err := utils.GetHtpasswdString(h.state.RegistryInfo.PullUsername, h.state.RegistryInfo.PullPassword) if err != nil { return fmt.Errorf("error generating htpasswd string: %w", err) } - registryValues := map[string]interface{}{ "secrets": map[string]interface{}{ "htpasswd": fmt.Sprintf("%s\n%s", pushUser, pullUser), }, } - h.chart = types.ZarfChart{ Namespace: "zarf", ReleaseName: "zarf-docker-registry", } - err = h.UpdateReleaseValues(registryValues) if err != nil { return fmt.Errorf("error updating the release values: %w", err) } + objs := []object.ObjMetadata{ + { + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "zarf", + Name: "zarf-docker-registry", + }, + } + waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second) + defer waitCancel() + err = pkgkubernetes.WaitForReady(waitCtx, h.cluster.Watcher, objs) + if err != nil { + return err + } return nil } diff --git a/src/internal/packager/images/push.go b/src/internal/packager/images/push.go index 84a00a2455..3048218662 100644 --- a/src/internal/packager/images/push.go +++ b/src/internal/packager/images/push.go @@ -11,7 +11,6 @@ import ( "github.com/defenseunicorns/pkg/helpers/v2" "github.com/defenseunicorns/zarf/src/pkg/cluster" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/transform" "github.com/defenseunicorns/zarf/src/pkg/utils" @@ -48,7 +47,7 @@ func Push(ctx context.Context, cfg PushConfig) error { var ( err error - tunnel *k8s.Tunnel + tunnel *cluster.Tunnel registryURL = cfg.RegInfo.Address ) diff --git a/src/pkg/cluster/injector.go b/src/pkg/cluster/injector.go index 8342315a3e..2af0694113 100644 --- a/src/pkg/cluster/injector.go +++ b/src/pkg/cluster/injector.go @@ -276,7 +276,7 @@ func (c *Cluster) createPayloadConfigMaps(ctx context.Context, seedImagesDir, ta // Test for pod readiness and seed image presence. func (c *Cluster) injectorIsReady(ctx context.Context, seedImages []transform.Image, spinner *message.Spinner) bool { - tunnel, err := c.NewTunnel(ZarfNamespaceName, k8s.SvcResource, ZarfInjectorName, "", 0, ZarfInjectorPort) + tunnel, err := c.NewTunnel(ZarfNamespaceName, SvcResource, ZarfInjectorName, "", 0, ZarfInjectorPort) if err != nil { return false } diff --git a/src/pkg/cluster/tunnel.go b/src/pkg/cluster/tunnel.go index 113779671b..7a2f635016 100644 --- a/src/pkg/cluster/tunnel.go +++ b/src/pkg/cluster/tunnel.go @@ -7,16 +7,23 @@ package cluster import ( "context" "fmt" + "io" + "net/http" "net/url" "strconv" "strings" + "sync" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" "github.com/defenseunicorns/pkg/helpers/v2" "github.com/defenseunicorns/zarf/src/config" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/types" ) @@ -36,7 +43,7 @@ const ( ZarfGitServerPort = 3000 ) -// TunnelInfo is a struct that contains the necessary info to create a new k8s.Tunnel +// TunnelInfo is a struct that contains the necessary info to create a new Tunnel type TunnelInfo struct { localPort int remotePort int @@ -88,11 +95,11 @@ func (c *Cluster) PrintConnectTable(ctx context.Context) error { } // Connect will establish a tunnel to the specified target. -func (c *Cluster) Connect(ctx context.Context, target string) (*k8s.Tunnel, error) { +func (c *Cluster) Connect(ctx context.Context, target string) (*Tunnel, error) { var err error zt := TunnelInfo{ namespace: ZarfNamespaceName, - resourceType: k8s.SvcResource, + resourceType: SvcResource, } switch strings.ToUpper(target) { @@ -134,7 +141,7 @@ func (c *Cluster) Connect(ctx context.Context, target string) (*k8s.Tunnel, erro } // ConnectTunnelInfo connects to the cluster with the provided TunnelInfo -func (c *Cluster) ConnectTunnelInfo(ctx context.Context, zt TunnelInfo) (*k8s.Tunnel, error) { +func (c *Cluster) ConnectTunnelInfo(ctx context.Context, zt TunnelInfo) (*Tunnel, error) { tunnel, err := c.NewTunnel(zt.namespace, zt.resourceType, zt.resourceName, zt.urlSuffix, zt.localPort, zt.remotePort) if err != nil { return nil, err @@ -149,14 +156,14 @@ func (c *Cluster) ConnectTunnelInfo(ctx context.Context, zt TunnelInfo) (*k8s.Tu } // ConnectToZarfRegistryEndpoint determines if a registry endpoint is in cluster, and if so opens a tunnel to connect to it -func (c *Cluster) ConnectToZarfRegistryEndpoint(ctx context.Context, registryInfo types.RegistryInfo) (string, *k8s.Tunnel, error) { +func (c *Cluster) ConnectToZarfRegistryEndpoint(ctx context.Context, registryInfo types.RegistryInfo) (string, *Tunnel, error) { registryEndpoint := registryInfo.Address var err error - var tunnel *k8s.Tunnel + var tunnel *Tunnel if registryInfo.InternalRegistry { // Establish a registry tunnel to send the images to the zarf registry - if tunnel, err = c.NewTunnel(ZarfNamespaceName, k8s.SvcResource, ZarfRegistryName, "", 0, ZarfRegistryPort); err != nil { + if tunnel, err = c.NewTunnel(ZarfNamespaceName, SvcResource, ZarfRegistryName, "", 0, ZarfRegistryPort); err != nil { return "", tunnel, err } } else { @@ -168,7 +175,7 @@ func (c *Cluster) ConnectToZarfRegistryEndpoint(ctx context.Context, registryInf // If this is a service (no error getting svcInfo), create a port-forward tunnel to that resource if err == nil { - if tunnel, err = c.NewTunnel(namespace, k8s.SvcResource, name, "", 0, port); err != nil { + if tunnel, err = c.NewTunnel(namespace, SvcResource, name, "", 0, port); err != nil { return "", tunnel, err } } @@ -211,7 +218,7 @@ func (c *Cluster) checkForZarfConnectLabel(ctx context.Context, name string) (Tu svc := serviceList.Items[0] // Reset based on the matched params. - zt.resourceType = k8s.SvcResource + zt.resourceType = SvcResource zt.resourceName = svc.Name zt.namespace = svc.Namespace // Only support a service with a single port. @@ -273,3 +280,255 @@ func serviceInfoFromNodePortURL(services []corev1.Service, nodePortURL string) ( return "", "", 0, fmt.Errorf("no matching node port services found") } + +// Global lock to synchronize port selections. +var globalMutex sync.Mutex + +// Zarf Tunnel Configuration Constants. +const ( + PodResource = "pod" + SvcResource = "svc" +) + +// Tunnel is the main struct that configures and manages port forwarding tunnels to Kubernetes resources. +type Tunnel struct { + clientset kubernetes.Interface + restConfig *rest.Config + out io.Writer + localPort int + remotePort int + namespace string + resourceType string + resourceName string + urlSuffix string + attempt int + stopChan chan struct{} + readyChan chan struct{} + errChan chan error +} + +// NewTunnel will create a new Tunnel struct. +// Note that if you use 0 for the local port, an open port on the host system +// will be selected automatically, and the Tunnel struct will be updated with the selected port. +func (c *Cluster) NewTunnel(namespace, resourceType, resourceName, urlSuffix string, local, remote int) (*Tunnel, error) { + return &Tunnel{ + clientset: c.Clientset, + restConfig: c.RestConfig, + out: io.Discard, + localPort: local, + remotePort: remote, + namespace: namespace, + resourceType: resourceType, + resourceName: resourceName, + urlSuffix: urlSuffix, + stopChan: make(chan struct{}, 1), + readyChan: make(chan struct{}, 1), + }, nil +} + +// Wrap takes a function that returns an error and wraps it to check for tunnel errors as well. +func (tunnel *Tunnel) Wrap(function func() error) error { + var err error + funcErrChan := make(chan error) + + go func() { + funcErrChan <- function() + }() + + select { + case err = <-funcErrChan: + return err + case err = <-tunnel.ErrChan(): + return err + } +} + +// Connect will establish a tunnel to the specified target. +func (tunnel *Tunnel) Connect(ctx context.Context) (string, error) { + url, err := tunnel.establish(ctx) + + // Try to establish the tunnel up to 3 times. + if err != nil { + tunnel.attempt++ + + // If we have exceeded the number of attempts, exit with an error. + if tunnel.attempt > 3 { + return "", fmt.Errorf("unable to establish tunnel after 3 attempts: %w", err) + } + + // Otherwise, retry the connection but delay increasing intervals between attempts. + delay := tunnel.attempt * 10 + message.Debugf("%s", err.Error()) + message.Debugf("Delay creating tunnel, waiting %d seconds...", delay) + + timer := time.NewTimer(0) + defer timer.Stop() + + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-timer.C: + url, err = tunnel.Connect(ctx) + if err != nil { + return "", err + } + + timer.Reset(time.Duration(delay) * time.Second) + } + } + + return url, nil +} + +// Endpoint returns the tunnel ip address and port (i.e. for docker registries) +func (tunnel *Tunnel) Endpoint() string { + return fmt.Sprintf("%s:%d", helpers.IPV4Localhost, tunnel.localPort) +} + +// ErrChan returns the tunnel's error channel +func (tunnel *Tunnel) ErrChan() chan error { + return tunnel.errChan +} + +// HTTPEndpoint returns the tunnel endpoint as a HTTP URL string. +func (tunnel *Tunnel) HTTPEndpoint() string { + return fmt.Sprintf("http://%s", tunnel.Endpoint()) +} + +// FullURL returns the tunnel endpoint as a HTTP URL string with the urlSuffix appended. +func (tunnel *Tunnel) FullURL() string { + return fmt.Sprintf("%s%s", tunnel.HTTPEndpoint(), tunnel.urlSuffix) +} + +// Close disconnects a tunnel connection by closing the StopChan, thereby stopping the goroutine. +func (tunnel *Tunnel) Close() { + close(tunnel.stopChan) +} + +// establish opens a tunnel to a kubernetes resource, as specified by the provided tunnel struct. +func (tunnel *Tunnel) establish(ctx context.Context) (string, error) { + var err error + + // Track this locally as we may need to retry if the tunnel fails. + localPort := tunnel.localPort + + // If the local-port is 0, get an available port before continuing. We do this here instead of relying on the + // underlying port-forwarder library, because the port-forwarder library does not expose the selected local port in a + // machine-readable manner. + // Synchronize on the global lock to avoid race conditions with concurrently selecting the same available port, + // since there is a brief moment between `GetAvailablePort` and `forwarder.ForwardPorts` where the selected port + // is available for selection again. + if localPort == 0 { + message.Debugf("Requested local port is 0. Selecting an open port on host system") + localPort, err = helpers.GetAvailablePort() + if err != nil { + return "", fmt.Errorf("unable to find an available port: %w", err) + } + message.Debugf("Selected port %d", localPort) + globalMutex.Lock() + defer globalMutex.Unlock() + } + + msg := fmt.Sprintf("Opening tunnel %d -> %d for %s/%s in namespace %s", + localPort, + tunnel.remotePort, + tunnel.resourceType, + tunnel.resourceName, + tunnel.namespace, + ) + message.Debugf(msg) + + // Find the pod to port forward to + podName, err := tunnel.getAttachablePodForResource(ctx) + if err != nil { + return "", fmt.Errorf("unable to find pod attached to given resource: %w", err) + } + message.Debugf("Selected pod %s to open port forward to", podName) + + // Build url to the port forward endpoint. + // Example: http://localhost:8080/api/v1/namespaces/helm/pods/tiller-deploy-9itlq/portforward. + postEndpoint := tunnel.clientset.CoreV1().RESTClient().Post() + namespace := tunnel.namespace + portForwardCreateURL := postEndpoint. + Resource("pods"). + Namespace(namespace). + Name(podName). + SubResource("portforward"). + URL() + + message.Debugf("Using URL %s to create portforward", portForwardCreateURL) + + // Construct the spdy client required by the client-go portforward library. + transport, upgrader, err := spdy.RoundTripperFor(tunnel.restConfig) + if err != nil { + return "", fmt.Errorf("unable to create the spdy client %w", err) + } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", portForwardCreateURL) + + // Construct a new PortForwarder struct that manages the instructed port forward tunnel. + ports := []string{fmt.Sprintf("%d:%d", localPort, tunnel.remotePort)} + portforwarder, err := portforward.New(dialer, ports, tunnel.stopChan, tunnel.readyChan, tunnel.out, tunnel.out) + if err != nil { + return "", fmt.Errorf("unable to create the port forward: %w", err) + } + + // Open the tunnel in a goroutine so that it is available in the background. Report errors to the main goroutine via + // a new channel. + errChan := make(chan error) + go func() { + errChan <- portforwarder.ForwardPorts() + }() + + // Wait for an error or the tunnel to be ready. + select { + case err = <-errChan: + return "", fmt.Errorf("unable to start the tunnel: %w", err) + case <-portforwarder.Ready: + // Store for endpoint output + tunnel.localPort = localPort + url := tunnel.FullURL() + + // Store the error channel to listen for errors + tunnel.errChan = errChan + + message.Debugf("Creating port forwarding tunnel at %s", url) + return url, nil + } +} + +// getAttachablePodForResource will find a pod that can be port forwarded to the provided resource type and return +// the name. +func (tunnel *Tunnel) getAttachablePodForResource(ctx context.Context) (string, error) { + switch tunnel.resourceType { + case PodResource: + return tunnel.resourceName, nil + case SvcResource: + return tunnel.getAttachablePodForService(ctx) + default: + return "", fmt.Errorf("unknown resource type: %s", tunnel.resourceType) + } +} + +// getAttachablePodForService will find an active pod associated with the Service and return the pod name. +func (tunnel *Tunnel) getAttachablePodForService(ctx context.Context) (string, error) { + service, err := tunnel.clientset.CoreV1().Services(tunnel.namespace).Get(ctx, tunnel.resourceName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("unable to find the service: %w", err) + } + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: service.Spec.Selector}) + if err != nil { + return "", err + } + listOpt := metav1.ListOptions{ + LabelSelector: selector.String(), + FieldSelector: fmt.Sprintf("status.phase=%s", corev1.PodRunning), + } + podList, err := tunnel.clientset.CoreV1().Pods(tunnel.namespace).List(ctx, listOpt) + if err != nil { + return "", err + } + if len(podList.Items) < 1 { + return "", fmt.Errorf("no pods found for service %s", tunnel.resourceName) + } + return podList.Items[0].Name, nil +} diff --git a/src/pkg/k8s/common.go b/src/pkg/k8s/common.go index 9041775825..42d2da270a 100644 --- a/src/pkg/k8s/common.go +++ b/src/pkg/k8s/common.go @@ -19,6 +19,8 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + + pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes" ) const ( @@ -38,10 +40,15 @@ func New(logger Log) (*K8s, error) { if err != nil { return nil, fmt.Errorf("failed to connect to k8s cluster: %w", err) } + watcher, err := pkgkubernetes.WatcherForConfig(config) + if err != nil { + return nil, err + } return &K8s{ RestConfig: config, Clientset: clientset, + Watcher: watcher, Log: logger, }, nil } diff --git a/src/pkg/k8s/tunnel.go b/src/pkg/k8s/tunnel.go deleted file mode 100644 index a2e5f25db9..0000000000 --- a/src/pkg/k8s/tunnel.go +++ /dev/null @@ -1,275 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: 2021-Present The Zarf Authors - -// Package k8s provides a client for interacting with a Kubernetes cluster. -package k8s - -// Forked from https://github.com/gruntwork-io/terratest/blob/v0.38.8/modules/k8s/tunnel.go - -import ( - "context" - "fmt" - "io" - "net/http" - "sync" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/transport/spdy" - - "github.com/defenseunicorns/pkg/helpers/v2" -) - -// Global lock to synchronize port selections. -var globalMutex sync.Mutex - -// Zarf Tunnel Configuration Constants. -const ( - PodResource = "pod" - SvcResource = "svc" -) - -// Tunnel is the main struct that configures and manages port forwarding tunnels to Kubernetes resources. -type Tunnel struct { - kube *K8s - out io.Writer - localPort int - remotePort int - namespace string - resourceType string - resourceName string - urlSuffix string - attempt int - stopChan chan struct{} - readyChan chan struct{} - errChan chan error -} - -// NewTunnel will create a new Tunnel struct. -// Note that if you use 0 for the local port, an open port on the host system -// will be selected automatically, and the Tunnel struct will be updated with the selected port. -func (k *K8s) NewTunnel(namespace, resourceType, resourceName, urlSuffix string, local, remote int) (*Tunnel, error) { - return &Tunnel{ - out: io.Discard, - localPort: local, - remotePort: remote, - namespace: namespace, - resourceType: resourceType, - resourceName: resourceName, - urlSuffix: urlSuffix, - stopChan: make(chan struct{}, 1), - readyChan: make(chan struct{}, 1), - kube: k, - }, nil -} - -// Wrap takes a function that returns an error and wraps it to check for tunnel errors as well. -func (tunnel *Tunnel) Wrap(function func() error) error { - var err error - funcErrChan := make(chan error) - - go func() { - funcErrChan <- function() - }() - - select { - case err = <-funcErrChan: - return err - case err = <-tunnel.ErrChan(): - return err - } -} - -// Connect will establish a tunnel to the specified target. -func (tunnel *Tunnel) Connect(ctx context.Context) (string, error) { - url, err := tunnel.establish(ctx) - - // Try to establish the tunnel up to 3 times. - if err != nil { - tunnel.attempt++ - - // If we have exceeded the number of attempts, exit with an error. - if tunnel.attempt > 3 { - return "", fmt.Errorf("unable to establish tunnel after 3 attempts: %w", err) - } - - // Otherwise, retry the connection but delay increasing intervals between attempts. - delay := tunnel.attempt * 10 - tunnel.kube.Log("%s", err.Error()) - tunnel.kube.Log("Delay creating tunnel, waiting %d seconds...", delay) - - timer := time.NewTimer(0) - defer timer.Stop() - - select { - case <-ctx.Done(): - return "", ctx.Err() - case <-timer.C: - url, err = tunnel.Connect(ctx) - if err != nil { - return "", err - } - - timer.Reset(time.Duration(delay) * time.Second) - } - } - - return url, nil -} - -// Endpoint returns the tunnel ip address and port (i.e. for docker registries) -func (tunnel *Tunnel) Endpoint() string { - return fmt.Sprintf("%s:%d", helpers.IPV4Localhost, tunnel.localPort) -} - -// ErrChan returns the tunnel's error channel -func (tunnel *Tunnel) ErrChan() chan error { - return tunnel.errChan -} - -// HTTPEndpoint returns the tunnel endpoint as a HTTP URL string. -func (tunnel *Tunnel) HTTPEndpoint() string { - return fmt.Sprintf("http://%s", tunnel.Endpoint()) -} - -// FullURL returns the tunnel endpoint as a HTTP URL string with the urlSuffix appended. -func (tunnel *Tunnel) FullURL() string { - return fmt.Sprintf("%s%s", tunnel.HTTPEndpoint(), tunnel.urlSuffix) -} - -// Close disconnects a tunnel connection by closing the StopChan, thereby stopping the goroutine. -func (tunnel *Tunnel) Close() { - close(tunnel.stopChan) -} - -// establish opens a tunnel to a kubernetes resource, as specified by the provided tunnel struct. -func (tunnel *Tunnel) establish(ctx context.Context) (string, error) { - var err error - - // Track this locally as we may need to retry if the tunnel fails. - localPort := tunnel.localPort - - // If the local-port is 0, get an available port before continuing. We do this here instead of relying on the - // underlying port-forwarder library, because the port-forwarder library does not expose the selected local port in a - // machine-readable manner. - // Synchronize on the global lock to avoid race conditions with concurrently selecting the same available port, - // since there is a brief moment between `GetAvailablePort` and `forwarder.ForwardPorts` where the selected port - // is available for selection again. - if localPort == 0 { - tunnel.kube.Log("Requested local port is 0. Selecting an open port on host system") - localPort, err = helpers.GetAvailablePort() - if err != nil { - return "", fmt.Errorf("unable to find an available port: %w", err) - } - tunnel.kube.Log("Selected port %d", localPort) - globalMutex.Lock() - defer globalMutex.Unlock() - } - - message := fmt.Sprintf("Opening tunnel %d -> %d for %s/%s in namespace %s", - localPort, - tunnel.remotePort, - tunnel.resourceType, - tunnel.resourceName, - tunnel.namespace, - ) - - tunnel.kube.Log(message) - - // Find the pod to port forward to - podName, err := tunnel.getAttachablePodForResource(ctx) - if err != nil { - return "", fmt.Errorf("unable to find pod attached to given resource: %w", err) - } - tunnel.kube.Log("Selected pod %s to open port forward to", podName) - - // Build url to the port forward endpoint. - // Example: http://localhost:8080/api/v1/namespaces/helm/pods/tiller-deploy-9itlq/portforward. - postEndpoint := tunnel.kube.Clientset.CoreV1().RESTClient().Post() - namespace := tunnel.namespace - portForwardCreateURL := postEndpoint. - Resource("pods"). - Namespace(namespace). - Name(podName). - SubResource("portforward"). - URL() - - tunnel.kube.Log("Using URL %s to create portforward", portForwardCreateURL) - - // Construct the spdy client required by the client-go portforward library. - transport, upgrader, err := spdy.RoundTripperFor(tunnel.kube.RestConfig) - if err != nil { - return "", fmt.Errorf("unable to create the spdy client %w", err) - } - dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", portForwardCreateURL) - - // Construct a new PortForwarder struct that manages the instructed port forward tunnel. - ports := []string{fmt.Sprintf("%d:%d", localPort, tunnel.remotePort)} - portforwarder, err := portforward.New(dialer, ports, tunnel.stopChan, tunnel.readyChan, tunnel.out, tunnel.out) - if err != nil { - return "", fmt.Errorf("unable to create the port forward: %w", err) - } - - // Open the tunnel in a goroutine so that it is available in the background. Report errors to the main goroutine via - // a new channel. - errChan := make(chan error) - go func() { - errChan <- portforwarder.ForwardPorts() - }() - - // Wait for an error or the tunnel to be ready. - select { - case err = <-errChan: - return "", fmt.Errorf("unable to start the tunnel: %w", err) - case <-portforwarder.Ready: - // Store for endpoint output - tunnel.localPort = localPort - url := tunnel.FullURL() - - // Store the error channel to listen for errors - tunnel.errChan = errChan - - tunnel.kube.Log("Creating port forwarding tunnel at %s", url) - return url, nil - } -} - -// getAttachablePodForResource will find a pod that can be port forwarded to the provided resource type and return -// the name. -func (tunnel *Tunnel) getAttachablePodForResource(ctx context.Context) (string, error) { - switch tunnel.resourceType { - case PodResource: - return tunnel.resourceName, nil - case SvcResource: - return tunnel.getAttachablePodForService(ctx) - default: - return "", fmt.Errorf("unknown resource type: %s", tunnel.resourceType) - } -} - -// getAttachablePodForService will find an active pod associated with the Service and return the pod name. -func (tunnel *Tunnel) getAttachablePodForService(ctx context.Context) (string, error) { - service, err := tunnel.kube.Clientset.CoreV1().Services(tunnel.namespace).Get(ctx, tunnel.resourceName, metav1.GetOptions{}) - if err != nil { - return "", fmt.Errorf("unable to find the service: %w", err) - } - selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: service.Spec.Selector}) - if err != nil { - return "", err - } - - servicePods := tunnel.kube.WaitForPodsAndContainers( - ctx, - PodLookup{ - Namespace: tunnel.namespace, - Selector: selector.String(), - }, - nil, - ) - - if len(servicePods) < 1 { - return "", fmt.Errorf("no pods found for service %s", tunnel.resourceName) - } - return servicePods[0].Name, nil -} diff --git a/src/pkg/k8s/types.go b/src/pkg/k8s/types.go index d52ea6389b..5c3a2cc36c 100644 --- a/src/pkg/k8s/types.go +++ b/src/pkg/k8s/types.go @@ -8,6 +8,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" ) // Log is a function that logs a message. @@ -20,6 +21,7 @@ type Labels map[string]string type K8s struct { Clientset kubernetes.Interface RestConfig *rest.Config + Watcher watcher.StatusWatcher Log Log } diff --git a/src/pkg/packager/deploy.go b/src/pkg/packager/deploy.go index 39fb2eaa88..4fe2b770e6 100644 --- a/src/pkg/packager/deploy.go +++ b/src/pkg/packager/deploy.go @@ -30,7 +30,6 @@ import ( "github.com/defenseunicorns/zarf/src/internal/packager/images" "github.com/defenseunicorns/zarf/src/internal/packager/template" "github.com/defenseunicorns/zarf/src/pkg/cluster" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/layout" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/packager/actions" @@ -545,7 +544,7 @@ func (p *Packager) pushReposToRepository(ctx context.Context, reposPath string, } } - tunnel, err := p.cluster.NewTunnel(namespace, k8s.SvcResource, name, "", 0, port) + tunnel, err := p.cluster.NewTunnel(namespace, cluster.SvcResource, name, "", 0, port) if err != nil { return err }