From 2a0da8280c0f9ce59c2c578e54bf40b75b21245a Mon Sep 17 00:00:00 2001
From: Shawna Monero <66325812+smonero@users.noreply.github.com>
Date: Fri, 8 Mar 2024 15:37:41 -0800
Subject: [PATCH] [DEVCON-6849] First part of new flow of adhoc mode (#727)

---
Review notes (this section is ignored by git am):
- Line structure restored; context-line alignment follows gofmt.
- adhoc/server.go: dropped the dead err check after NewScopeWithReporter.
- Added doc comments; blob ids on the index lines of the two edited new
  files no longer match the content.

 cmd/admin.go                          |  70 +++++++
 cmd/server.go                         |   9 +-
 server/config/raw/global_cfg.go       |  12 +-
 server/config/valid/global_cfg.go     |   4 +-
 server/legacy/user_config.go          |   3 +
 server/legacy/user_config_test.go     |   8 +
 server/neptune/adhoc/config/config.go |  25 +++
 server/neptune/adhoc/server.go        | 275 +++++++++++++++++++++++++++
 8 files changed, 397 insertions(+), 9 deletions(-)
 create mode 100644 cmd/admin.go
 create mode 100644 server/neptune/adhoc/config/config.go
 create mode 100644 server/neptune/adhoc/server.go

diff --git a/cmd/admin.go b/cmd/admin.go
new file mode 100644
index 000000000..21b93c0fb
--- /dev/null
+++ b/cmd/admin.go
@@ -0,0 +1,70 @@
+package cmd
+
+import (
+	"github.com/pkg/errors"
+	cfgParser "github.com/runatlantis/atlantis/server/config"
+	"github.com/runatlantis/atlantis/server/config/valid"
+	"github.com/runatlantis/atlantis/server/legacy"
+	"github.com/runatlantis/atlantis/server/logging"
+	adhoc "github.com/runatlantis/atlantis/server/neptune/adhoc"
+	adhocconfig "github.com/runatlantis/atlantis/server/neptune/adhoc/config"
+	neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config"
+)
+
+// Adhoc creates servers that run atlantis in adhoc mode.
+type Adhoc struct{}
+
+// NewServer translates the user-supplied and global configuration into an
+// adhoc config and builds the adhoc server from it.
+func (a *Adhoc) NewServer(userConfig legacy.UserConfig, config legacy.Config) (ServerStarter, error) {
+	ctxLogger, err := logging.NewLoggerFromLevel(userConfig.ToLogLevel())
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to build context logger")
+	}
+
+	globalCfg := valid.NewGlobalCfg(userConfig.DataDir)
+	validator := &cfgParser.ParserValidator{}
+
+	if userConfig.RepoConfig != "" {
+		globalCfg, err = validator.ParseGlobalCfg(userConfig.RepoConfig, globalCfg)
+		if err != nil {
+			return nil, errors.Wrapf(err, "parsing %s file", userConfig.RepoConfig)
+		}
+	}
+
+	parsedURL, err := legacy.ParseAtlantisURL(userConfig.AtlantisURL)
+	if err != nil {
+		return nil, errors.Wrapf(err,
+			"parsing atlantis url %q", userConfig.AtlantisURL)
+	}
+
+	appConfig, err := createGHAppConfig(userConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	cfg := &adhocconfig.Config{
+		AuthCfg: neptune.AuthConfig{
+			SslCertFile: userConfig.SSLCertFile,
+			SslKeyFile:  userConfig.SSLKeyFile,
+		},
+		ServerCfg: neptune.ServerConfig{
+			URL:     parsedURL,
+			Version: config.AtlantisVersion,
+			Port:    userConfig.Port,
+		},
+		TerraformCfg: neptune.TerraformConfig{
+			DefaultVersion: userConfig.DefaultTFVersion,
+			DownloadURL:    userConfig.TFDownloadURL,
+			LogFilters:     globalCfg.TerraformLogFilter,
+		},
+		DataDir:        userConfig.DataDir,
+		TemporalCfg:    globalCfg.Temporal,
+		GithubCfg:      globalCfg.Github,
+		App:            appConfig,
+		CtxLogger:      ctxLogger,
+		StatsNamespace: userConfig.StatsNamespace,
+		Metrics:        globalCfg.Metrics,
+	}
+	return adhoc.NewServer(cfg)
+}
diff --git a/cmd/server.go b/cmd/server.go
index 58b5f1a9c..e7a24de2d 100644
--- a/cmd/server.go
+++ b/cmd/server.go
@@ -231,7 +231,8 @@ var stringFlags = map[string]stringFlag{
 			"default: Runs atlantis with default event handler that processes events within same.\n" +
 			"gateway: Runs atlantis with gateway event handler that publishes events through sns.\n" +
 			"worker: Runs atlantis with a sqs handler that polls for events in the queue to process.\n" +
-			"hybrid: Runs atlantis with both a gateway event handler and sqs handler to perform both gateway and worker behaviors.",
+			"hybrid: Runs atlantis with both a gateway event handler and sqs handler to perform both gateway and worker behaviors.\n" +
+			"adhoc: Runs atlantis in an admin mode that allows for running adhoc terraform commands.",
 		defaultValue: "",
 	},
 	LyftWorkerQueueURLFlag: {
@@ -344,6 +345,7 @@ func NewServerCmd(v *viper.Viper, version string) *ServerCmd {
 			GatewayCreator:        &GatewayCreator{},
 			WorkerCreator:         &WorkerCreator{},
 			TemporalWorkerCreator: &TemporalWorker{},
+			AdhocCreator:          &Adhoc{},
 		},
 		Viper:           v,
 		AtlantisVersion: version,
@@ -374,6 +376,7 @@ type ServerCreatorProxy struct {
 	GatewayCreator        ServerCreator
 	WorkerCreator         ServerCreator
 	TemporalWorkerCreator ServerCreator
+	AdhocCreator          ServerCreator
 }
 
 func (d *ServerCreatorProxy) NewServer(userConfig server.UserConfig, config server.Config) (ServerStarter, error) {
@@ -389,6 +392,10 @@ func (d *ServerCreatorProxy) NewServer(userConfig server.UserConfig, config serv
 		return d.TemporalWorkerCreator.NewServer(userConfig, config)
 	}
 
+	if userConfig.ToLyftMode() == server.Adhoc {
+		return d.AdhocCreator.NewServer(userConfig, config)
+	}
+
 	return d.WorkerCreator.NewServer(userConfig, config)
 }
 
diff --git a/server/config/raw/global_cfg.go b/server/config/raw/global_cfg.go
index 9f760ff44..03870625a 100644
--- a/server/config/raw/global_cfg.go
+++ b/server/config/raw/global_cfg.go
@@ -26,22 +26,22 @@ type GlobalCfg struct {
 	Persistence          Persistence         `yaml:"persistence" json:"persistence"`
 	RevisionSetter       RevisionSetter      `yaml:"revision_setter" json:"revision_setter"`
 	Admin                Admin               `yaml:"admin" json:"admin"`
-	TerraformAdminMode   TerraformAdminMode  `yaml:"terraform_admin_mode" json:"terraform_admin_mode"`
+	AdhocMode            AdhocMode           `yaml:"adhoc_mode" json:"adhoc_mode"`
 }
 
-type TerraformAdminMode struct {
+type AdhocMode struct {
 	Repo string `yaml:"repo" json:"repo"`
 	Root string `yaml:"root" json:"root"`
 }
 
-func (t TerraformAdminMode) ToValid() valid.TerraformAdminMode {
-	return valid.TerraformAdminMode{
+func (t AdhocMode) ToValid() valid.AdhocMode {
+	return valid.AdhocMode{
 		Repo: t.Repo,
 		Root: t.Root,
 	}
 }
 
-func (t TerraformAdminMode) Validate() error {
+func (t AdhocMode) Validate() error {
 	// We don't need to validate the inputs so we can just return nil
 	return nil
 }
@@ -214,7 +214,7 @@ func (g GlobalCfg) ToValid(defaultCfg valid.GlobalCfg) valid.GlobalCfg {
 		Github:               g.Github.ToValid(),
 		Admin:                g.Admin.ToValid(),
 		RevisionSetter:       g.RevisionSetter.ToValid(),
-		TerraformAdminMode:   g.TerraformAdminMode.ToValid(),
+		AdhocMode:            g.AdhocMode.ToValid(),
 	}
 }
 
diff --git a/server/config/valid/global_cfg.go b/server/config/valid/global_cfg.go
index 7c2b45f8b..fb847af43 100644
--- a/server/config/valid/global_cfg.go
+++ b/server/config/valid/global_cfg.go
@@ -64,10 +64,10 @@ type GlobalCfg struct {
 	Github               Github
 	RevisionSetter       RevisionSetter
 	Admin                Admin
-	TerraformAdminMode   TerraformAdminMode
+	AdhocMode            AdhocMode
 }
 
-type TerraformAdminMode struct {
+type AdhocMode struct {
 	Repo string
 	Root string
 }
diff --git a/server/legacy/user_config.go b/server/legacy/user_config.go
index fb84431db..486e45999 100644
--- a/server/legacy/user_config.go
+++ b/server/legacy/user_config.go
@@ -11,6 +11,7 @@ const (
 	Gateway
 	Worker
 	TemporalWorker
+	Adhoc
 )
 
 // UserConfig holds config values passed in by the user.
@@ -104,6 +105,8 @@ func (u UserConfig) ToLyftMode() Mode {
 		return Worker
 	case "temporalworker":
 		return TemporalWorker
+	case "adhoc":
+		return Adhoc
 	}
 	return Default
 }
diff --git a/server/legacy/user_config_test.go b/server/legacy/user_config_test.go
index 52e0a85d2..669e27cd7 100644
--- a/server/legacy/user_config_test.go
+++ b/server/legacy/user_config_test.go
@@ -70,6 +70,14 @@ func TestUserConfig_ToLyftMode(t *testing.T) {
 			"",
 			server.Default,
 		},
+		{
+			"adhoc",
+			server.Adhoc,
+		},
+		{
+			"temporalworker",
+			server.TemporalWorker,
+		},
 	}
 
 	for _, c := range cases {
diff --git a/server/neptune/adhoc/config/config.go b/server/neptune/adhoc/config/config.go
new file mode 100644
index 000000000..dd77db7f1
--- /dev/null
+++ b/server/neptune/adhoc/config/config.go
@@ -0,0 +1,25 @@
+package adhocconfig
+
+import (
+	"github.com/palantir/go-githubapp/githubapp"
+	"github.com/runatlantis/atlantis/server/config/valid"
+	"github.com/runatlantis/atlantis/server/logging"
+	neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config"
+)
+
+// Config holds the configuration used to build the adhoc-mode server.
+type Config struct {
+	AuthCfg       neptune.AuthConfig
+	ServerCfg     neptune.ServerConfig
+	FeatureConfig neptune.FeatureConfig
+	TemporalCfg   valid.Temporal
+	GithubCfg     valid.Github
+	TerraformCfg  neptune.TerraformConfig
+	Metrics       valid.Metrics
+
+	StatsNamespace string
+
+	DataDir   string
+	CtxLogger logging.Logger
+	App       githubapp.Config
+}
diff --git a/server/neptune/adhoc/server.go b/server/neptune/adhoc/server.go
new file mode 100644
index 000000000..447b929f0
--- /dev/null
+++ b/server/neptune/adhoc/server.go
@@ -0,0 +1,275 @@
+package admin
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/palantir/go-githubapp/githubapp"
+	"github.com/runatlantis/atlantis/server/neptune/lyft/feature"
+	"github.com/runatlantis/atlantis/server/neptune/sync/crons"
+	ghClient "github.com/runatlantis/atlantis/server/neptune/workflows/activities/github"
+	"github.com/runatlantis/atlantis/server/vcs/provider/github"
+
+	assetfs "github.com/elazarl/go-bindata-assetfs"
+	"github.com/gorilla/mux"
+	"github.com/pkg/errors"
+	"github.com/runatlantis/atlantis/server/logging"
+	"github.com/runatlantis/atlantis/server/metrics"
+	adhocconfig "github.com/runatlantis/atlantis/server/neptune/adhoc/config"
+	neptune_http "github.com/runatlantis/atlantis/server/neptune/http"
+	internalSync "github.com/runatlantis/atlantis/server/neptune/sync"
+	"github.com/runatlantis/atlantis/server/neptune/temporal"
+	neptune "github.com/runatlantis/atlantis/server/neptune/temporalworker/config"
+	"github.com/runatlantis/atlantis/server/neptune/workflows"
+	"github.com/runatlantis/atlantis/server/neptune/workflows/activities"
+	"github.com/runatlantis/atlantis/server/static"
+	"github.com/uber-go/tally/v4"
+	"github.com/urfave/negroni"
+	"go.temporal.io/sdk/interceptor"
+	"go.temporal.io/sdk/worker"
+)
+
+// Server runs atlantis in adhoc mode: an HTTP server exposing /healthz and
+// static assets, plus a temporal worker polling the terraform task queue.
+type Server struct {
+	Logger              logging.Logger
+	CronScheduler       *internalSync.CronScheduler
+	Crons               []*internalSync.Cron
+	HTTPServerProxy     *neptune_http.ServerProxy
+	Port                int
+	StatsScope          tally.Scope
+	StatsCloser         io.Closer
+	TemporalClient      *temporal.ClientWrapper
+	TerraformActivities *activities.Terraform
+	GithubActivities    *activities.Github
+
+	TerraformTaskQueue string
+}
+
+// NewServer builds the adhoc Server from config, wiring up metrics, the
+// temporal client, the HTTP router and the terraform/github activities.
+func NewServer(config *adhocconfig.Config) (*Server, error) {
+	statsReporter, err := metrics.NewReporter(config.Metrics, config.CtxLogger)
+
+	if err != nil {
+		return nil, err
+	}
+
+	scope, statsCloser := metrics.NewScopeWithReporter(config.Metrics, config.CtxLogger, config.StatsNamespace, statsReporter)
+
+	scope = scope.Tagged(map[string]string{
+		"mode": "adhoc",
+	})
+
+	opts := &temporal.Options{
+		StatsReporter: statsReporter,
+	}
+	opts = opts.WithClientInterceptors(temporal.NewMetricsInterceptor(scope))
+	temporalClient, err := temporal.NewClient(config.CtxLogger, config.TemporalCfg, opts)
+	if err != nil {
+		return nil, errors.Wrap(err, "initializing temporal client")
+	}
+
+	router := mux.NewRouter()
+	router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet)
+	router.PathPrefix("/static/").Handler(http.FileServer(&assetfs.AssetFS{Asset: static.Asset, AssetDir: static.AssetDir, AssetInfo: static.AssetInfo}))
+	n := negroni.New(&negroni.Recovery{
+		Logger:     log.New(os.Stdout, "", log.LstdFlags),
+		PrintStack: false,
+		StackAll:   false,
+		StackSize:  1024 * 8,
+	})
+	n.UseHandler(router)
+	httpServerProxy := &neptune_http.ServerProxy{
+		SSLCertFile: config.AuthCfg.SslCertFile,
+		SSLKeyFile:  config.AuthCfg.SslKeyFile,
+		Server:      &http.Server{Addr: fmt.Sprintf(":%d", config.ServerCfg.Port), Handler: n, ReadHeaderTimeout: time.Second * 10},
+		Logger:      config.CtxLogger,
+	}
+
+	terraformActivities, err := activities.NewTerraform(
+		config.TerraformCfg,
+		neptune.ValidationConfig{},
+		config.App,
+		config.DataDir,
+		config.ServerCfg.URL,
+		config.TemporalCfg.TerraformTaskQueue,
+		config.GithubCfg.TemporalAppInstallationID,
+		nil,
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "initializing terraform activities")
+	}
+	clientCreator, err := githubapp.NewDefaultCachingClientCreator(
+		config.App,
+		githubapp.WithClientMiddleware(
+			ghClient.ClientMetrics(scope.SubScope("app")),
+		))
+	if err != nil {
+		return nil, errors.Wrap(err, "client creator")
+	}
+
+	repoConfig := feature.RepoConfig{
+		Owner:  config.FeatureConfig.FFOwner,
+		Repo:   config.FeatureConfig.FFRepo,
+		Branch: config.FeatureConfig.FFBranch,
+		Path:   config.FeatureConfig.FFPath,
+	}
+	installationFetcher := &github.InstallationRetriever{
+		ClientCreator: clientCreator,
+	}
+	fileFetcher := &github.SingleFileContentsFetcher{
+		ClientCreator: clientCreator,
+	}
+	retriever := &feature.CustomGithubInstallationRetriever{
+		InstallationFetcher: installationFetcher,
+		FileContentsFetcher: fileFetcher,
+		Cfg:                 repoConfig,
+	}
+	featureAllocator, err := feature.NewGHSourcedAllocator(retriever, config.CtxLogger)
+	if err != nil {
+		return nil, errors.Wrap(err, "initializing feature allocator")
+	}
+
+	githubActivities, err := activities.NewGithub(
+		clientCreator,
+		config.GithubCfg.TemporalAppInstallationID,
+		config.DataDir,
+		featureAllocator,
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "initializing github activities")
+	}
+
+	cronScheduler := internalSync.NewCronScheduler(config.CtxLogger)
+
+	server := Server{
+		Logger:        config.CtxLogger,
+		CronScheduler: cronScheduler,
+		Crons: []*internalSync.Cron{
+			{
+				Executor:  crons.NewRuntimeStats(scope).Run,
+				Frequency: 1 * time.Minute,
+			},
+		},
+		HTTPServerProxy:     httpServerProxy,
+		Port:                config.ServerCfg.Port,
+		StatsScope:          scope,
+		StatsCloser:         statsCloser,
+		TemporalClient:      temporalClient,
+		TerraformActivities: terraformActivities,
+		TerraformTaskQueue:  config.TemporalCfg.TerraformTaskQueue,
+		GithubActivities:    githubActivities,
+	}
+	return &server, nil
+}
+
+// Start runs the terraform worker and the HTTP server, then blocks until
+// SIGINT/SIGTERM is received and the worker goroutine has finished; shutdown
+// runs on return.
+// NOTE(review): Crons/CronScheduler are populated by NewServer but are not
+// scheduled here — confirm this is intentional for this first part.
+func (s Server) Start() error {
+	defer s.shutdown()
+
+	ctx := context.Background()
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		terraformWorker := s.buildTerraformWorker()
+		if err := terraformWorker.Run(worker.InterruptCh()); err != nil {
+			log.Fatalln("unable to start terraform worker", err)
+		}
+
+		s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background")
+	}()
+
+	// Ensure server gracefully drains connections when stopped.
+	stop := make(chan os.Signal, 1)
+	// Stop on SIGINTs and SIGTERMs.
+	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
+
+	s.Logger.Info(fmt.Sprintf("Atlantis started - listening on port %v", s.Port))
+
+	go func() {
+		err := s.HTTPServerProxy.ListenAndServe()
+
+		if err != nil && err != http.ErrServerClosed {
+			s.Logger.Error(err.Error())
+		}
+	}()
+
+	<-stop
+	wg.Wait()
+
+	return nil
+}
+
+// shutdown stops the HTTP server (5s timeout), closes the temporal client,
+// flushes stats and closes the logger.
+func (s Server) shutdown() {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if err := s.HTTPServerProxy.Shutdown(ctx); err != nil {
+		s.Logger.Error(err.Error())
+	}
+
+	s.TemporalClient.Close()
+
+	// flush stats before shutdown
+	if err := s.StatsCloser.Close(); err != nil {
+		s.Logger.Error(err.Error())
+	}
+
+	s.Logger.Close()
+}
+
+// Note that we will need to do things similar to how gateway does it to get the metadata we need
+// specifically the root
+
+// buildTerraformWorker creates a temporal worker on the terraform task queue
+// and registers the terraform/github activities and the Terraform workflow.
+func (s Server) buildTerraformWorker() worker.Worker {
+	// pass the underlying client otherwise this will panic()
+	terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{
+		Interceptors: []interceptor.WorkerInterceptor{
+			temporal.NewWorkerInterceptor(),
+		},
+		MaxConcurrentActivityExecutionSize: 30,
+	})
+	terraformWorker.RegisterActivity(s.TerraformActivities)
+	terraformWorker.RegisterActivity(s.GithubActivities)
+	terraformWorker.RegisterWorkflow(workflows.Terraform)
+	return terraformWorker
+}
+
+// TODO: eventually we can make it so the pod is ready when the repo is done cloning...
+
+// Healthz returns the health check response. It always returns a 200 currently.
+func Healthz(w http.ResponseWriter, _ *http.Request) {
+	data, err := json.MarshalIndent(&struct {
+		Status string `json:"status"`
+	}{
+		Status: "ok",
+	}, "", " ")
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprintf(w, "Error creating status json response: %s", err)
+		return
+	}
+	w.Header().Set("Content-Type", "application/json")
+	w.Write(data) // nolint: errcheck
+}