Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Improve GRPC stop to be more relaxed. #20118

Merged
merged 3 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
- Avoid comparing uncomparable types on enroll {issue}19976[19976]
- Fix issues with merging of elastic-agent.yml and fleet.yml {pull}20026[20026]
- Unzip failures on Windows 8/Windows server 2012 {pull}20088[20088]
- Improve GRPC stop to be more relaxed {pull}20118[20118]

==== New features

Expand Down
7 changes: 5 additions & 2 deletions x-pack/elastic-agent/pkg/core/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func (as *ApplicationState) WriteConnInfo(w io.Writer) error {
// the application times out during stop and ErrApplication
func (as *ApplicationState) Stop(timeout time.Duration) error {
as.checkinLock.Lock()
wasConn := as.checkinDone != nil
cfgIdx := as.statusConfigIdx
as.expected = proto.StateExpected_STOPPING
as.checkinLock.Unlock()
Expand All @@ -548,8 +549,10 @@ func (as *ApplicationState) Stop(timeout time.Duration) error {
s := as.status
doneChan := as.checkinDone
as.checkinLock.RUnlock()
if s == proto.StateObserved_STOPPING && doneChan == nil {
// sent stopping and now is disconnected (so its stopped)
if (wasConn && doneChan == nil) || (!wasConn && s == proto.StateObserved_STOPPING && doneChan == nil) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure i follow this. if it was connected and doneChan is nil means it got disconnected this seems ok.
second part means if status of application is stopping and doneChan is nil (got disconnected) then we;re destroying but only in case doneChan was nil before so nothing changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the second case is only in the case that Stop() was called but the client was disconnected from the GRPC at that time (which is very rare, but possible).

So if the client was disconnected from the GPRC at the time Stop() was called, it needs to know that it did receive the stopping state. So it waits for the client to send that it is actually stopping and then it has disconnected. This requires that the client actually reconnect to get the stopping message or timeout occurs, which ever comes first.

In the normal case the wasConn && doneChan == nil will almost always be used in this loop.

Hopefully that it explains it better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense thanks

// either occurred
// * client was connected then disconnected on stop
// * client was not connected; connected; received stopping; then disconnected
as.Destroy()
return nil
}
Expand Down
51 changes: 51 additions & 0 deletions x-pack/elastic-agent/pkg/core/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,57 @@ func TestServer_Stop(t *testing.T) {
assert.NoError(t, stopErr)
}

func TestServer_StopJustDisconnect(t *testing.T) {
initConfig := "initial_config"
app := &StubApp{}
srv := createAndStartServer(t, &StubHandler{})
defer srv.Stop()
as, err := srv.Register(app, initConfig)
require.NoError(t, err)
cImpl := &StubClientImpl{}
c := newClientFromApplicationState(t, as, cImpl)
require.NoError(t, c.Start(context.Background()))
defer c.Stop()

// clients should get initial check-ins then set as healthy
require.NoError(t, waitFor(func() error {
if cImpl.Config() != initConfig {
return fmt.Errorf("client never got intial config")
}
return nil
}))
c.Status(proto.StateObserved_HEALTHY, "Running", nil)
assert.NoError(t, waitFor(func() error {
if app.Status() != proto.StateObserved_HEALTHY {
return fmt.Errorf("server never updated currect application state")
}
return nil
}))

// send stop to the client
done := make(chan bool)
var stopErr error
go func() {
stopErr = as.Stop(time.Second * 5)
close(done)
}()

// process of testing the flow
// 1. server sends stop
// 2. client disconnects
require.NoError(t, waitFor(func() error {
if cImpl.Stop() == 0 {
return fmt.Errorf("client never got expected stop")
}
return nil
}))
c.Stop()
<-done

// no error on stop
assert.NoError(t, stopErr)
}

func TestServer_StopTimeout(t *testing.T) {
initConfig := "initial_config"
app := &StubApp{}
Expand Down