Skip to content

Commit

Permalink
Check if key is in roots map of Processor (#718)
Browse files Browse the repository at this point in the history
Before notifying, we should ensure the key is in the roots map to
prevent attempts to notify check run on invalid, empty repo metadata.
There is a chance the root map can miss a key if multiple revisions come
in quickly and a new Processor for the latest revision is receiving a
signal on an old revision. We shouldn't care about this signal since we
already perform a clean defer to cancel those check runs on shutdown of
the old revision Processor.
  • Loading branch information
samrabelachew authored Jul 26, 2023
1 parent 268ec39 commit f17c2eb
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
18 changes: 17 additions & 1 deletion server/neptune/workflows/internal/pr/revision/state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package revision

import (
"fmt"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/metrics"
"github.com/runatlantis/atlantis/server/neptune/workflows/internal/notifier"
Expand All @@ -9,6 +10,10 @@ import (
"go.temporal.io/sdk/workflow"
)

const (
CheckBeforeNotify = "checkbeforenotify"
)

type WorkflowNotifier interface {
Notify(workflow.Context, notifier.Info, *state.Workflow) error
}
Expand All @@ -30,7 +35,18 @@ func (s *StateReceiver) Receive(ctx workflow.Context, c workflow.ReceiveChannel,
}

func (s *StateReceiver) Notify(ctx workflow.Context, workflowState *state.Workflow, roots map[string]RootInfo) {
rootInfo := roots[workflowState.ID]
rootInfo, ok := roots[workflowState.ID]

// TODO remove versioning
// there can be an edge case where we've canceled a previous check run to handle a new revision
// but the canceled child terraform workflow has sent over one last update prior to shutdown
// we should avoid notifying on this case
v := workflow.GetVersion(ctx, CheckBeforeNotify, workflow.DefaultVersion, workflow.Version(1))
if v != workflow.DefaultVersion && !ok {
workflow.GetLogger(ctx).Warn(fmt.Sprintf("skipping notifying root %s", workflowState.ID))
return
}

for _, notifier := range s.InternalNotifiers {
if err := notifier.Notify(ctx, rootInfo.ToInternalInfo(), workflowState); err != nil {
workflow.GetMetricsHandler(ctx).Counter("notifier_failure").Inc(1)
Expand Down
26 changes: 25 additions & 1 deletion server/neptune/workflows/internal/pr/revision/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestStateReceive(t *testing.T) {
},
}

t.Run("calls notifiers with state", func(t *testing.T) {
t.Run("calls notifiers with missing root info", func(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()

Expand All @@ -98,6 +98,30 @@ func TestStateReceive(t *testing.T) {
Output: jobOutput,
Status: state.WaitingJobStatus,
},
ID: internalRootInfo.ID.String(),
},
T: t,
})

env.AssertExpectations(t)

var result stateReceiveResponse
err = env.GetWorkflowResult(&result)
assert.False(t, result.NotifierCalled)
assert.NoError(t, err)
})

t.Run("calls notifiers with valid state", func(t *testing.T) {
ts := testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()

env.ExecuteWorkflow(testStateReceiveWorkflow, stateReceiveRequest{
State: &state.Workflow{
Plan: &state.Job{
Output: jobOutput,
Status: state.WaitingJobStatus,
},
ID: internalRootInfo.ID.String(),
},
RootInfo: internalRootInfo,
T: t,
Expand Down

0 comments on commit f17c2eb

Please sign in to comment.