Skip to content

Commit

Permalink
Support retries on first commit (#713)
Browse files Browse the repository at this point in the history
We originally required retries to only be allowed on the same revision
as the last attempted deploy to prevent accidental reverts in history.
However, there is an edge case where the first deployment on a root
needs a retry. When it's the first deployment, the commit direction will
always default to `DirectionAhead` instead of `DirectionIdentical` so we
need to handle that scenario separately.

Changing this condition is technically a non-deterministic workflow change
(it is very unlikely to matter in practice, since the scenario is rare), so
I've protected the rollout with workflow versioning. While doing this, I also
cleaned up old versions in the Deploy workflow that are no longer relevant.
  • Loading branch information
samrabelachew authored Jul 25, 2023
1 parent fcd0d39 commit 268ec39
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"go.temporal.io/sdk/workflow"
)

// ValidRerunCriteria is the change ID passed to workflow.GetVersion in
// validRerun to version the rerun-validation rule.
const ValidRerunCriteria = "validrerun"

// ValidationError wraps an underlying error by embedding it, giving
// validation failures their own named type.
// NOTE(review): presumably callers use the type to tell validation failures
// apart from other errors — confirm against the call sites.
type ValidationError struct {
error
}
Expand Down Expand Up @@ -71,7 +73,7 @@ func (p *Deployer) Deploy(ctx workflow.Context, requestedDeployment terraform.De
p.updateCheckRun(ctx, requestedDeployment, github.CheckRunFailure, DirectionBehindSummary, nil)
return nil, NewValidationError("requested revision %s is behind latest deployed revision %s", requestedDeployment.Commit.Revision, latestDeployment.Revision)
}
if requestedDeployment.Root.TriggerInfo.Rerun && commitDirection != activities.DirectionIdentical {
if requestedDeployment.Root.TriggerInfo.Rerun && !validRerun(ctx, commitDirection, latestDeployment) {
scope.Counter("invalid_rerun_err").Inc(1)
// always returns error for caller to skip revision
p.updateCheckRun(ctx, requestedDeployment, github.CheckRunFailure, RerunNotIdenticalSummary, nil)
Expand Down Expand Up @@ -102,6 +104,16 @@ func (p *Deployer) Deploy(ctx workflow.Context, requestedDeployment terraform.De
return requestedDeployment.BuildPersistableInfo(), err
}

// validRerun reports whether a rerun request is allowed to proceed.
//
// Under workflow.DefaultVersion a rerun is valid only when the requested
// commit is identical to the latest deployment. Under any later version a
// rerun is additionally valid when there is no latest deployment at all
// (latestDeployment is nil), i.e. the first deployment is being retried.
func validRerun(ctx workflow.Context, commitDirection activities.DiffDirection, latestDeployment *deployment.Info) bool {
	sameRevision := commitDirection == activities.DirectionIdentical

	// The version is always resolved, even when the result could be decided
	// without it, so the GetVersion call is made on every invocation.
	version := workflow.GetVersion(ctx, ValidRerunCriteria, workflow.DefaultVersion, workflow.Version(1))

	switch {
	case sameRevision:
		// identical revisions may be rerun under every version
		return true
	case version == workflow.DefaultVersion:
		// legacy behaviour: anything other than an identical revision is rejected
		return false
	default:
		// newer behaviour: also allow a rerun when nothing has been deployed yet
		return latestDeployment == nil
	}
}

func (p *Deployer) runPostDeployTasks(ctx workflow.Context, deployment terraform.DeploymentInfo) error {
if err := p.persistLatestDeployment(ctx, deployment.BuildPersistableInfo()); err != nil {
return errors.Wrap(err, "persisting deployment")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,82 @@ func TestDeployer_FirstDeploy(t *testing.T) {
assert.Equal(t, latestDeployedRevision, resp.Info)
}

// TestDeployer_FirstDeploy_Retry runs the deployer workflow with a request
// whose root is flagged as a rerun (TriggerInfo.Rerun == true) and expects the
// workflow to succeed, store the deployment, and return the matching info.
// Only Info is set on the request; presumably that leaves the latest
// deployment unset so the first-deploy path is exercised — confirm against
// testDeployerWorkflow.
func TestDeployer_FirstDeploy_Retry(t *testing.T) {
	suite := testsuite.WorkflowTestSuite{}
	env := suite.NewTestWorkflowEnvironment()

	deployActivity := &testDeployActivity{}
	env.RegisterActivity(deployActivity)

	// requested deployment: a manual rerun of revision 3455 on root_1
	requested := terraform.DeploymentInfo{
		ID: uuid.UUID{},
		Commit: github.Commit{
			Revision: "3455",
			Branch:   "default-branch",
		},
		CheckRunID: 1234,
		Root: model.Root{
			Name: "root_1",
			TriggerInfo: model.TriggerInfo{
				Rerun: true,
			},
		},
		Repo: github.Repo{
			Owner: "owner",
			Name:  "test",
		},
	}

	// what the workflow is expected to hand back once the deploy completes
	wantInfo := &deployment.Info{
		ID:       requested.ID.String(),
		Version:  1.0,
		Revision: "3455",
		Branch:   "default-branch",
		Root: deployment.Root{
			Name:        requested.Root.Name,
			ManualRerun: true,
		},
		Repo: deployment.Repo{
			Owner: requested.Repo.Owner,
			Name:  requested.Repo.Name,
		},
	}

	// what the workflow is expected to persist as the latest deployment
	wantStoreRequest := activities.StoreLatestDeploymentRequest{
		DeploymentInfo: &deployment.Info{
			Version:  deployment.InfoSchemaVersion,
			ID:       requested.ID.String(),
			Revision: requested.Commit.Revision,
			Branch:   requested.Commit.Branch,
			Root: deployment.Root{
				Name:        requested.Root.Name,
				ManualRerun: true,
			},
			Repo: deployment.Repo{
				Owner: requested.Repo.Owner,
				Name:  requested.Repo.Name,
			},
		},
	}

	env.OnActivity(deployActivity.StoreLatestDeployment, mock.Anything, wantStoreRequest).Return(nil)

	env.ExecuteWorkflow(testDeployerWorkflow, deployerRequest{Info: requested})

	env.AssertExpectations(t)

	var got *deployResponse
	resultErr := env.GetWorkflowResult(&got)
	assert.NoError(t, resultErr)

	assert.Equal(t, wantInfo, got.Info)
}

func TestDeployer_CompareCommit_DeployAhead(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()
Expand Down
5 changes: 0 additions & 5 deletions server/neptune/workflows/internal/deploy/terraform/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
)

const DivergedMetric = "diverged"
const PlanRejected = "planrejected"

type PlanRejectionError struct {
msg string
Expand Down Expand Up @@ -120,10 +119,6 @@ func (r *WorkflowRunner) awaitWorkflow(ctx workflow.Context, future workflow.Chi
msg = "plan has been rejected"
}
if appErr.Type() == terraform.PlanRejectedErrorType {
v := workflow.GetVersion(ctx, PlanRejected, workflow.DefaultVersion, workflow.Version(1))
if v == workflow.DefaultVersion {
return PlanRejectionError{msg: msg}
}
return NewPlanRejectionError(msg)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,6 @@ func TestWorkflowRunner_RunWithDivergedCommit(t *testing.T) {
func TestWorkflowRunner_PlanRejected(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()

env.OnGetVersion(internalTerraform.PlanRejected, workflow.DefaultVersion, workflow.Version(1)).Return(workflow.Version(1))

env.RegisterWorkflow(testTerraformWorklfowWithPlanRejectionError)

env.ExecuteWorkflow(parentWorkflow, request{
Expand Down
19 changes: 3 additions & 16 deletions server/neptune/workflows/internal/deploy/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ import (
)

const (
TaskQueue = "deploy"
AddNotifierVersion = "add-notifier"
TaskQueue = "deploy"

RevisionReceiveTimeout = 60 * time.Minute

QueueStatusNotifierHourPST = 10
QueueStatusNotifierHourUTC = 17

ActiveDeployWorkflowStat = "active"
Expand Down Expand Up @@ -200,19 +198,8 @@ func (r *Runner) Run(ctx workflow.Context) error {

action = OnNotify
}

v := workflow.GetVersion(ctx, AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2))

var notifierPeriod time.Duration
if v == workflow.Version(1) {
notifierPeriod = r.NotifierPeriod(ctx, QueueStatusNotifierHourPST)
} else if v == workflow.Version(2) {
notifierPeriod = r.NotifierPeriod(ctx, QueueStatusNotifierHourUTC)
}

if v > workflow.DefaultVersion {
s.AddTimeout(ctx, notifierPeriod, notifyTimerFunc)
}
notifierPeriod := r.NotifierPeriod(ctx, QueueStatusNotifierHourUTC)
s.AddTimeout(ctx, notifierPeriod, notifyTimerFunc)

// main loop which handles external signals
// and in turn signals the queue worker
Expand Down
10 changes: 3 additions & 7 deletions server/neptune/workflows/internal/deploy/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,19 @@ func TestRunner(t *testing.T) {
t.Run("cancels waiting worker", func(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()
env.OnGetVersion(deploy.AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)).Return(workflow.DefaultVersion)

// should timeout since we're not sending any signal
env.ExecuteWorkflow(testWorkflow, request{})

var resp response
err := env.GetWorkflowResult(&resp)
assert.NoError(t, err)
assert.Equal(t, response{WorkerCtxCancelled: true}, resp)
assert.Equal(t, response{WorkerCtxCancelled: true, NotifierCalled: true}, resp)
})

t.Run("doesn't cancel if queue has items", func(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()
env.OnGetVersion(deploy.AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)).Return(workflow.DefaultVersion)

// should timeout since we're not sending any signal
env.ExecuteWorkflow(testWorkflow, request{
Expand All @@ -138,13 +136,12 @@ func TestRunner(t *testing.T) {
var resp response
err := env.GetWorkflowResult(&resp)
assert.NoError(t, err)
assert.Equal(t, response{WorkerCtxCancelled: true}, resp)
assert.Equal(t, response{WorkerCtxCancelled: true, NotifierCalled: true}, resp)
})

t.Run("receives signal and then times out", func(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()
env.OnGetVersion(deploy.AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)).Return(workflow.DefaultVersion)

env.RegisterDelayedCallback(func() {
env.SignalWorkflow(testSignalID, "")
Expand All @@ -156,13 +153,12 @@ func TestRunner(t *testing.T) {
var resp response
err := env.GetWorkflowResult(&resp)
assert.NoError(t, err)
assert.Equal(t, response{WorkerCtxCancelled: true, ReceiverCalled: true}, resp)
assert.Equal(t, response{WorkerCtxCancelled: true, ReceiverCalled: true, NotifierCalled: true}, resp)
})

t.Run("receives signal and then times out new version", func(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()
env.OnGetVersion(deploy.AddNotifierVersion, workflow.DefaultVersion, workflow.Version(2)).Return(workflow.Version(1))

env.RegisterDelayedCallback(func() {
env.SignalWorkflow(testSignalID, "")
Expand Down

0 comments on commit 268ec39

Please sign in to comment.