From 268ec392d4bff5275631afdff10d9f26e0f9aca3 Mon Sep 17 00:00:00 2001 From: samrabelachew Date: Tue, 25 Jul 2023 14:23:26 -0700 Subject: [PATCH] Support retries on first commit (#713) We originally required retries to only be allowed on the same revision as the last attempted deploy to prevent accidental reverts in history. However, there is an edge case where the first deployment on a root needs a retry. When it's the first deployment, the commit direction will always default to `DirectionAhead` instead of `DirectionIdentical` so we need to handle that scenario separately. This change is technically a non-deterministic change (very unlikely to occur, as it's a rare scenario) so I've protected the rollout with versioning. While doing this, I also went ahead and cleaned up old versions in the Deploy workflow that are no longer relevant. --- .../deploy/revision/queue/deployer.go | 14 +++- .../deploy/revision/queue/deployer_test.go | 76 +++++++++++++++++++ .../internal/deploy/terraform/runner.go | 5 -- .../internal/deploy/terraform/runner_test.go | 3 - .../workflows/internal/deploy/workflow.go | 19 +---- .../internal/deploy/workflow_test.go | 10 +-- 6 files changed, 95 insertions(+), 32 deletions(-) diff --git a/server/neptune/workflows/internal/deploy/revision/queue/deployer.go b/server/neptune/workflows/internal/deploy/revision/queue/deployer.go index 75deb6d6a..002a3a063 100644 --- a/server/neptune/workflows/internal/deploy/revision/queue/deployer.go +++ b/server/neptune/workflows/internal/deploy/revision/queue/deployer.go @@ -20,6 +20,8 @@ import ( "go.temporal.io/sdk/workflow" ) +const ValidRerunCriteria = "validrerun" + type ValidationError struct { error } @@ -71,7 +73,7 @@ func (p *Deployer) Deploy(ctx workflow.Context, requestedDeployment terraform.De p.updateCheckRun(ctx, requestedDeployment, github.CheckRunFailure, DirectionBehindSummary, nil) return nil, NewValidationError("requested revision %s is behind latest deployed revision %s", requestedDeployment.Commit.Revision, latestDeployment.Revision) } - if requestedDeployment.Root.TriggerInfo.Rerun && commitDirection != activities.DirectionIdentical { + if requestedDeployment.Root.TriggerInfo.Rerun && !validRerun(ctx, commitDirection, latestDeployment) { scope.Counter("invalid_rerun_err").Inc(1) // always returns error for caller to skip revision p.updateCheckRun(ctx, requestedDeployment, github.CheckRunFailure, RerunNotIdenticalSummary, nil) @@ -102,6 +104,16 @@ func (p *Deployer) Deploy(ctx workflow.Context, requestedDeployment terraform.De return requestedDeployment.BuildPersistableInfo(), err } +func validRerun(ctx workflow.Context, commitDirection activities.DiffDirection, latestDeployment *deployment.Info) bool { + v := workflow.GetVersion(ctx, ValidRerunCriteria, workflow.DefaultVersion, workflow.Version(1)) + if v == workflow.DefaultVersion { + return commitDirection == activities.DirectionIdentical + } + + // we allow for a rerun only if requested revision matches the latest deployment or is the first deployment + return latestDeployment == nil || commitDirection == activities.DirectionIdentical +} + func (p *Deployer) runPostDeployTasks(ctx workflow.Context, deployment terraform.DeploymentInfo) error { if err := p.persistLatestDeployment(ctx, deployment.BuildPersistableInfo()); err != nil { return errors.Wrap(err, "persisting deployment") diff --git a/server/neptune/workflows/internal/deploy/revision/queue/deployer_test.go b/server/neptune/workflows/internal/deploy/revision/queue/deployer_test.go index 75579be52..9989e8dbc 100644 --- a/server/neptune/workflows/internal/deploy/revision/queue/deployer_test.go +++ b/server/neptune/workflows/internal/deploy/revision/queue/deployer_test.go @@ -190,6 +190,82 @@ func TestDeployer_FirstDeploy(t *testing.T) { assert.Equal(t, latestDeployedRevision, resp.Info) } +func TestDeployer_FirstDeploy_Retry(t *testing.T) { + ts := testsuite.WorkflowTestSuite{} + env := ts.NewTestWorkflowEnvironment() + + da := &testDeployActivity{} + env.RegisterActivity(da) + + repo := github.Repo{ + Owner: "owner", + Name: "test", + } + + root := model.Root{ + Name: "root_1", + TriggerInfo: model.TriggerInfo{ + Rerun: true, + }, + } + + deploymentInfo := terraform.DeploymentInfo{ + ID: uuid.UUID{}, + Commit: github.Commit{ + Revision: "3455", + Branch: "default-branch", + }, + CheckRunID: 1234, + Root: root, + Repo: repo, + } + + latestDeployedRevision := &deployment.Info{ + ID: deploymentInfo.ID.String(), + Version: 1.0, + Revision: "3455", + Branch: "default-branch", + Root: deployment.Root{ + Name: deploymentInfo.Root.Name, + ManualRerun: true, + }, + Repo: deployment.Repo{ + Owner: deploymentInfo.Repo.Owner, + Name: deploymentInfo.Repo.Name, + }, + } + + storeDeploymentRequest := activities.StoreLatestDeploymentRequest{ + DeploymentInfo: &deployment.Info{ + Version: deployment.InfoSchemaVersion, + ID: deploymentInfo.ID.String(), + Revision: deploymentInfo.Commit.Revision, + Branch: deploymentInfo.Commit.Branch, + Root: deployment.Root{ + Name: deploymentInfo.Root.Name, + ManualRerun: true, + }, + Repo: deployment.Repo{ + Owner: deploymentInfo.Repo.Owner, + Name: deploymentInfo.Repo.Name, + }, + }, + } + + env.OnActivity(da.StoreLatestDeployment, mock.Anything, storeDeploymentRequest).Return(nil) + env.ExecuteWorkflow(testDeployerWorkflow, deployerRequest{ + Info: deploymentInfo, + }) + + env.AssertExpectations(t) + + var resp *deployResponse + err := env.GetWorkflowResult(&resp) + assert.NoError(t, err) + + assert.Equal(t, latestDeployedRevision, resp.Info) +} + func TestDeployer_CompareCommit_DeployAhead(t *testing.T) { ts := testsuite.WorkflowTestSuite{} env := ts.NewTestWorkflowEnvironment() diff --git a/server/neptune/workflows/internal/deploy/terraform/runner.go b/server/neptune/workflows/internal/deploy/terraform/runner.go index ee4895556..fcc556b4e 100644 --- a/server/neptune/workflows/internal/deploy/terraform/runner.go +++ b/server/neptune/workflows/internal/deploy/terraform/runner.go @@ -12,7 +12,6 @@ import ( ) const DivergedMetric = "diverged" -const PlanRejected = "planrejected" type PlanRejectionError struct { msg string @@ -120,10 +119,6 @@ func (r *WorkflowRunner) awaitWorkflow(ctx workflow.Context, future workflow.Chi msg = "plan has been rejected" } if appErr.Type() == terraform.PlanRejectedErrorType { - v := workflow.GetVersion(ctx, PlanRejected, workflow.DefaultVersion, workflow.Version(1)) - if v == workflow.DefaultVersion { - return PlanRejectionError{msg: msg} - } return NewPlanRejectionError(msg) } } diff --git a/server/neptune/workflows/internal/deploy/terraform/runner_test.go b/server/neptune/workflows/internal/deploy/terraform/runner_test.go index 348f228c7..d19baedf9 100644 --- a/server/neptune/workflows/internal/deploy/terraform/runner_test.go +++ b/server/neptune/workflows/internal/deploy/terraform/runner_test.go @@ -174,9 +174,6 @@ func TestWorkflowRunner_RunWithDivergedCommit(t *testing.T) { func TestWorkflowRunner_PlanRejected(t *testing.T) { ts := testsuite.WorkflowTestSuite{} env := ts.NewTestWorkflowEnvironment() - - env.OnGetVersion(internalTerraform.PlanRejected, workflow.DefaultVersion, workflow.Version(1)).Return(workflow.Version(1)) - env.RegisterWorkflow(testTerraformWorklfowWithPlanRejectionError) env.ExecuteWorkflow(parentWorkflow, request{ diff --git a/server/neptune/workflows/internal/deploy/workflow.go b/server/neptune/workflows/internal/deploy/workflow.go index e1108c64f..986317743 100644 --- a/server/neptune/workflows/internal/deploy/workflow.go +++ b/server/neptune/workflows/internal/deploy/workflow.go @@ -20,12 +20,10 @@ import ( ) const ( - TaskQueue = "deploy" - AddNotifierVersion = "add-notifier" + TaskQueue = "deploy" RevisionReceiveTimeout = 60 * time.Minute - QueueStatusNotifierHourPST = 10 QueueStatusNotifierHourUTC = 17 ActiveDeployWorkflowStat = "active" @@ -200,19 +198,8 @@ func (r *Runner) Run(ctx workflow.Context) error { action = OnNotify } - - v := workflow.GetVersion(ctx, AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)) - - var notifierPeriod time.Duration - if v == workflow.Version(1) { - notifierPeriod = r.NotifierPeriod(ctx, QueueStatusNotifierHourPST) - } else if v == workflow.Version(2) { - notifierPeriod = r.NotifierPeriod(ctx, QueueStatusNotifierHourUTC) - } - - if v > workflow.DefaultVersion { - s.AddTimeout(ctx, notifierPeriod, notifyTimerFunc) - } + notifierPeriod := r.NotifierPeriod(ctx, QueueStatusNotifierHourUTC) + s.AddTimeout(ctx, notifierPeriod, notifyTimerFunc) // main loop which handles external signals // and in turn signals the queue worker diff --git a/server/neptune/workflows/internal/deploy/workflow_test.go b/server/neptune/workflows/internal/deploy/workflow_test.go index d3101c5eb..522df7109 100644 --- a/server/neptune/workflows/internal/deploy/workflow_test.go +++ b/server/neptune/workflows/internal/deploy/workflow_test.go @@ -114,7 +114,6 @@ func TestRunner(t *testing.T) { t.Run("cancels waiting worker", func(t *testing.T) { ts := testsuite.WorkflowTestSuite{} env := ts.NewTestWorkflowEnvironment() - env.OnGetVersion(deploy.AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)).Return(workflow.DefaultVersion) // should timeout since we're not sending any signal env.ExecuteWorkflow(testWorkflow, request{}) @@ -122,13 +121,12 @@ func TestRunner(t *testing.T) { var resp response err := env.GetWorkflowResult(&resp) assert.NoError(t, err) - assert.Equal(t, response{WorkerCtxCancelled: true}, resp) + assert.Equal(t, response{WorkerCtxCancelled: true, NotifierCalled: true}, resp) }) t.Run("doesn't cancel if queue has items", func(t *testing.T) { ts := testsuite.WorkflowTestSuite{} env := ts.NewTestWorkflowEnvironment() - env.OnGetVersion(deploy.AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)).Return(workflow.DefaultVersion) // should timeout since we're not sending any signal env.ExecuteWorkflow(testWorkflow, request{ @@ -138,13 +136,12 @@ func TestRunner(t *testing.T) { var resp response err := env.GetWorkflowResult(&resp) assert.NoError(t, err) - assert.Equal(t, response{WorkerCtxCancelled: true}, resp) + assert.Equal(t, response{WorkerCtxCancelled: true, NotifierCalled: true}, resp) }) t.Run("receives signal and then times out", func(t *testing.T) { ts := testsuite.WorkflowTestSuite{} env := ts.NewTestWorkflowEnvironment() - env.OnGetVersion(deploy.AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)).Return(workflow.DefaultVersion) env.RegisterDelayedCallback(func() { env.SignalWorkflow(testSignalID, "") @@ -156,13 +153,12 @@ func TestRunner(t *testing.T) { var resp response err := env.GetWorkflowResult(&resp) assert.NoError(t, err) - assert.Equal(t, response{WorkerCtxCancelled: true, ReceiverCalled: true}, resp) + assert.Equal(t, response{WorkerCtxCancelled: true, ReceiverCalled: true, NotifierCalled: true}, resp) }) t.Run("receives signal and then times out new version", func(t *testing.T) { ts := testsuite.WorkflowTestSuite{} env := ts.NewTestWorkflowEnvironment() - env.OnGetVersion(deploy.AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)).Return(workflow.Version(1)) env.RegisterDelayedCallback(func() { env.SignalWorkflow(testSignalID, "")