Skip to content

Commit

Permalink
[cmd/opampsupervisor] Implement Collector bootstrapping (#29848)
Browse files Browse the repository at this point in the history
**Description:**

Utilize the OpAMP extension to get identifying attributes from the
Collector.

A few things I want to call out:
* I moved the Supervisor's various config fragments into separate files
that are embedded into the binary. I think this makes them easier to
edit. I can also move the changes for the existing fragments to a
separate PR if it adds too much to the diff.
* I opted to use the OTLP receiver instead of the filelog receiver
because it is included in both existing upstream distributions and I
expect it is slightly more common. Ideally we should look at other
approaches to solve this.

**Link to tracking Issue:**

Resolves
#21071

**Testing:**

Added an integration test.

---------

Co-authored-by: Antoine Toulme <antoine@toulme.name>
Co-authored-by: Evan Bradley <evan-bradley@users.noreply.github.com>
  • Loading branch information
3 people committed Jan 12, 2024
1 parent 593ea9a commit cee8ccd
Show file tree
Hide file tree
Showing 13 changed files with 500 additions and 108 deletions.
27 changes: 27 additions & 0 deletions .chloggen/supervisor-bootstrapping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use a bootstrapping flow to get the Collector's agent description.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21071]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
92 changes: 88 additions & 4 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path"
"runtime"
"strings"
Expand All @@ -23,12 +24,18 @@ import (
"text/template"
"time"

"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/knadh/koanf/v2"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
"github.com/open-telemetry/opamp-go/server/types"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -122,6 +129,14 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
}

func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor {
cfgFile := getSupervisorConfig(t, configType, extraConfigData)
s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name())
require.NoError(t, err)

return s
}

func getSupervisorConfig(t *testing.T, configType string, extraConfigData map[string]string) *os.File {
tpl, err := os.ReadFile(path.Join("testdata", "supervisor", "supervisor_"+configType+".yaml"))
require.NoError(t, err)

Expand All @@ -148,10 +163,7 @@ func newSupervisor(t *testing.T, configType string, extraConfigData map[string]s
_, err = cfgFile.Write(buf.Bytes())
require.NoError(t, err)

s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name())
require.NoError(t, err)

return s
return cfgFile
}

func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
Expand Down Expand Up @@ -323,6 +335,78 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {
}, 5*time.Second, 250*time.Millisecond)
}

func TestSupervisorBootstrapsCollector(t *testing.T) {
agentDescription := atomic.Value{}

// Load the Supervisor config so we can get the location of
// the Collector that will be run.
var cfg config.Supervisor
cfgFile := getSupervisorConfig(t, "nocap", map[string]string{})
k := koanf.New("::")
err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser())
require.NoError(t, err)
err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{
Tag: "mapstructure",
})
require.NoError(t, err)

// Get the binary name and version from the Collector binary
// using the `components` command that prints a YAML-encoded
// map of information about the Collector build. Some of this
// information will be used as defaults for the telemetry
// attributes.
agentPath := cfg.Agent.Executable
componentsInfo, err := exec.Command(agentPath, "components").Output()
require.NoError(t, err)
k = koanf.New("::")
err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser())
require.NoError(t, err)
buildinfo := k.StringMap("buildinfo")
command := buildinfo["command"]
version := buildinfo["version"]

server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.AgentDescription != nil {
agentDescription.Store(message.AgentDescription)
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "nocap", map[string]string{"url": server.addr})
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

require.Eventually(t, func() bool {
ad, ok := agentDescription.Load().(*protobufs.AgentDescription)
if !ok {
return false
}

var agentName, agentVersion string
identAttr := ad.IdentifyingAttributes
for _, attr := range identAttr {
switch attr.Key {
case semconv.AttributeServiceName:
agentName = attr.Value.GetStringValue()
case semconv.AttributeServiceVersion:
agentVersion = attr.Value.GetStringValue()
}
}

// By default the Collector should report its name and version
// from the component.BuildInfo struct built into the Collector
// binary.
return agentName == command && agentVersion == version
}, 5*time.Second, 250*time.Millisecond)
}

// Creates a Collector config that reads and writes logs to files and provides
// file descriptors for I/O operations to those files. The files are placed
// in a unique temp directory that is cleaned up after the test's completion.
Expand Down
1 change: 1 addition & 0 deletions cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/open-telemetry/opamp-go v0.10.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/config/configtls v0.92.1-0.20240110091511-bf804d6c4ecc
go.opentelemetry.io/collector/semconv v0.92.1-0.20240110091511-bf804d6c4ecc
go.uber.org/zap v1.26.0
)

Expand Down
2 changes: 2 additions & 0 deletions cmd/opampsupervisor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ configuration.
To overcome this problem the Supervisor starts the Collector with an
"noop" configuration that collects nothing but allows the opamp
extension to be started. The "noop" configuration is a single pipeline
with a filelog receiver that points to a non-existing file and a logging
exporter and the opamp extension. The purpose of the "noop"
with an OTLP receiver that listens on a random port and a debug
exporter, and the opamp extension. The purpose of the "noop"
configuration is to make sure the Collector starts and the opamp
extension communicates with the Supervisor.
Expand Down
45 changes: 45 additions & 0 deletions cmd/opampsupervisor/supervisor/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package supervisor

import (
"net/http"

"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
serverTypes "github.com/open-telemetry/opamp-go/server/types"
)

type flattenedSettings struct {
onMessageFunc func(conn serverTypes.Connection, message *protobufs.AgentToServer)
onConnectingFunc func(request *http.Request)
endpoint string
}

func newServerSettings(fs flattenedSettings) server.StartSettings {
return server.StartSettings{
Settings: server.Settings{
Callbacks: server.CallbacksStruct{
OnConnectingFunc: func(request *http.Request) serverTypes.ConnectionResponse {
if fs.onConnectingFunc != nil {
fs.onConnectingFunc(request)
}
return serverTypes.ConnectionResponse{
Accept: true,
ConnectionCallbacks: server.ConnectionCallbacksStruct{
OnMessageFunc: func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if fs.onMessageFunc != nil {
fs.onMessageFunc(conn, message)
}

return &protobufs.ServerToAgent{}
},
},
}
},
},
},
ListenEndpoint: fs.endpoint,
}
}
Loading

0 comments on commit cee8ccd

Please sign in to comment.