Skip to content

Commit

Permalink
Support flattened data_stream.* fields under Elastic-Agent
Browse files Browse the repository at this point in the history
When an input configuration comes from Elastic-Agent, the data_stream.*
fields are sometimes flattened (e.g., data_stream.dataset: mystuff);
this was not supported by Beats.

This commit enables support for it.
  • Loading branch information
belimawr committed Sep 6, 2023
1 parent 1209bca commit fc0a3fd
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix ndjson parser to store JSON fields correctly under `target` {issue}29395[29395]
- Support build of projects outside of beats directory {pull}36126[36126]
- Fix environment capture by `add_process_metadata` processor. {issue}36469[36469] {pull}36471[36471]
- Support flattened `data_stream` object when running under Elastic-Agent {pull}36516[36516]


*Auditbeat*
Expand Down
79 changes: 78 additions & 1 deletion x-pack/libbeat/management/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
package management

import (
"errors"
"fmt"

"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -82,10 +85,78 @@ func handleSimpleConfig(raw *proto.UnitExpectedConfig) (map[string]any, error) {
return m, nil
}

// dataStreamAndSource is a generic way to represent proto mesages
// that contain a source field and a datastream field.
type dataStreamAndSource interface {
GetDataStream() *proto.DataStream
GetSource() *structpb.Struct
}

// deDotDataStream builds a *proto.DataStream for raw out of the two places
// where data stream settings can live: the DataStream message returned by
// raw.GetDataStream() and the `data_stream.*` keys (dotted notation) found
// in raw.GetSource().
//
// For each of dataset, type and namespace the value may be set in at most
// one of the two places. If both are non-empty an error is returned;
// otherwise whichever one is non-empty (if any) is used. A new DataStream
// is returned and raw is only read, never modified.
//
// NOTE(review): only Dataset, Type and Namespace are carried over to the
// returned value; any other field set on the original DataStream is not
// copied. Confirm that this is intended.
func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
	// Treat a missing DataStream as an empty one so the field reads below
	// need no nil checks.
	current := raw.GetDataStream()
	if current == nil {
		current = &proto.DataStream{}
	}

	// Target for unpacking the data_stream settings from the source field.
	var unpacked struct {
		DataStream struct {
			Dataset   string `config:"dataset" yaml:"dataset"`
			Type      string `config:"type" yaml:"type"`
			Namespace string `config:"namespace" yaml:"namespace"`
		} `config:"data_stream" yaml:"data_stream"`
	}

	cfg, err := conf.NewConfigFrom(raw.GetSource().AsMap())
	if err != nil {
		return nil, fmt.Errorf("cannot generate config from source field: %w", err)
	}
	if err = cfg.Unpack(&unpacked); err != nil {
		return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
	}
	flat := unpacked.DataStream

	// A value present in both places is ambiguous, so it is rejected.
	switch {
	case current.Dataset != "" && flat.Dataset != "":
		return nil, errors.New("duplicated key 'datastream.dataset'")
	case current.Type != "" && flat.Type != "":
		return nil, errors.New("duplicated key 'datastream.type'")
	case current.Namespace != "" && flat.Namespace != "":
		return nil, errors.New("duplicated key 'datastream.namespace'")
	}

	// After the checks above at most one side of each pair is non-empty.
	return &proto.DataStream{
		Dataset:   merge(flat.Dataset, current.Dataset),
		Type:      merge(flat.Type, current.Type),
		Namespace: merge(flat.Namespace, current.Namespace),
	}, nil
}

// merge picks the first non-empty string of its two arguments: a when a
// is non-empty, b otherwise (which may itself be empty).
func merge(a, b string) string {
	if a != "" {
		return a
	}
	return b
}

// CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values
// that can later be formatted into the reloader's ConfigWithMetaData and sent to an individual beat.
// This also performs the basic task of inserting module-level add_field processors into the inputs/modules.
func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) {
ds, err := deDotDataStream(raw)
if err != nil {
return nil, fmt.Errorf("could not read 'data_stream': %w", err)
}
raw.DataStream = ds

// If there are no streams, we fall into the 'simple input config' case,
// this means the key configuration values are on the root level instead of
// an element in the `streams` array.
Expand All @@ -106,8 +177,14 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamTyp
inputs := make([]map[string]interface{}, len(raw.GetStreams()))

for iter, stream := range raw.GetStreams() {
ds, err := deDotDataStream(stream)
if err != nil {
return nil, fmt.Errorf("could not read 'data_stream' from stream ID '%s': %w",
stream.GetId(), err)
}
stream.DataStream = ds
streamSource := raw.GetStreams()[iter].GetSource().AsMap()
streamSource, err := createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...)
streamSource, err = createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...)
if err != nil {
return nil, fmt.Errorf("error creating stream rules: %w", err)
}
Expand Down
84 changes: 84 additions & 0 deletions x-pack/libbeat/management/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ package management
import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
Expand Down Expand Up @@ -230,3 +232,85 @@ func buildConfigMap(t *testing.T, unitRaw *proto.UnitExpectedConfig, agentInfo *
require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config)
return cfgMap
}

// TestDeDotDataStream checks how deDotDataStream combines the flattened
// `data_stream.*` keys of a source with an already populated DataStream:
// values coming from only one side are kept, values coming from both
// sides are merged when they do not overlap, and a key present on both
// sides is reported as an error.
func TestDeDotDataStream(t *testing.T) {
	type testCase struct {
		src      map[string]any    // content of the Source field
		existing *proto.DataStream // content of the DataStream field
		wantErr  bool
		want     *proto.DataStream
	}

	cases := map[string]testCase{
		"all data is flattened": {
			src: map[string]any{
				"data_stream.dataset":   "my dataset",
				"data_stream.namespace": "my namespace",
				"data_stream.type":      "my type",
			},
			want: &proto.DataStream{
				Dataset:   "my dataset",
				Namespace: "my namespace",
				Type:      "my type",
			},
		},
		"no data is flattened": {
			existing: &proto.DataStream{
				Dataset:   "my dataset",
				Namespace: "my namespace",
				Type:      "my type",
			},
			want: &proto.DataStream{
				Dataset:   "my dataset",
				Namespace: "my namespace",
				Type:      "my type",
			},
		},
		"mix of flattened and data_stream": {
			existing: &proto.DataStream{
				Dataset: "my dataset",
				Type:    "my type",
			},
			src: map[string]any{
				"data_stream.namespace": "my namespace",
			},
			want: &proto.DataStream{
				Dataset:   "my dataset",
				Namespace: "my namespace",
				Type:      "my type",
			},
		},
		"duplicated keys generate error": {
			existing: &proto.DataStream{
				Dataset: "my dataset",
			},
			src: map[string]any{
				"data_stream.dataset": "another dataset",
			},
			wantErr: true,
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			got, err := deDotDataStream(&proto.UnitExpectedConfig{
				Source:     requireNewStruct(t, tc.src),
				DataStream: tc.existing,
			})

			switch {
			case tc.wantErr && err == nil:
				t.Error("expecting an error")
				return
			case tc.wantErr:
				// The expected error was returned, nothing else to check.
				return
			case err != nil:
				t.Fatalf("deDotDataStream returned an error: %s", err)
			}

			// cmp.Diff is empty exactly when cmp.Equal holds for the same
			// values and options, so one call covers check and report.
			if diff := cmp.Diff(got, tc.want, protocmp.Transform()); diff != "" {
				t.Errorf("expecting a different value: --got/++want\n'%s'", diff)
			}
		})
	}
}
130 changes: 130 additions & 0 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,136 @@ func TestErrorPerUnit(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond, "desired state, was not reached")
}

// TestFlattenedDataStreams starts a ManagerV2 against a mock Elastic-Agent
// server that sends an input unit whose data stream settings are given as
// flattened `data_stream.*` keys in the unit's source. The test passes once
// the input reloader receives a config whose `index` field equals
// "<type>-<dataset>-<namespace>" built from those flattened values.
func TestFlattenedDataStreams(t *testing.T) {
	// Set by the input reload callback once a config with the expected
	// index has been seen; polled at the end of the test.
	stateReached := atomic.Bool{}

	expectedDataset := "my-dataset"
	expectedNamespace := "my-namespace"
	expectedType := "my-type"
	expectedIndex := fmt.Sprintf("%s-%s-%s",
		expectedType, expectedDataset, expectedNamespace)

	r := reload.NewRegistry()

	output := &mockOutput{
		ReloadFn: func(config *reload.ConfigWithMeta) error {
			return nil
		},
	}
	r.MustRegisterOutput(output)

	inputs := &mockReloadable{
		// NOTE(review): this callback is driven by the manager, and the
		// test only observes it by polling stateReached, so it is assumed
		// to run outside the test goroutine. FailNow (and therefore
		// t.Fatalf) must only be called from the goroutine running the
		// test, hence failures are reported with t.Errorf here.
		ReloadFn: func(configs []*reload.ConfigWithMeta) error {
			for _, input := range configs {
				tmp := struct {
					Index string `config:"index" yaml:"index"`
				}{}

				if err := input.Config.Unpack(&tmp); err != nil {
					t.Errorf("error unpacking config: %s", err)
					continue
				}

				if tmp.Index != expectedIndex {
					t.Errorf("expecting index %q, got %q", expectedIndex, tmp.Index)
					continue
				}

				stateReached.Store(true)
			}
			return nil
		},
	}
	r.MustRegisterInput(inputs)

	outputUnit := proto.UnitExpected{
		Id:             "output-unit",
		Type:           proto.UnitType_OUTPUT,
		State:          proto.State_HEALTHY,
		ConfigStateIdx: 1,
		LogLevel:       proto.UnitLogLevel_DEBUG,
		Config: &proto.UnitExpectedConfig{
			Id:   "default",
			Type: "mock",
			Name: "mock",
			Source: integration.RequireNewStruct(t,
				map[string]interface{}{
					"Is":        "this",
					"required?": "Yes!",
				}),
		},
	}

	inputUnit1 := proto.UnitExpected{
		Id:             "input-unit1",
		Type:           proto.UnitType_INPUT,
		State:          proto.State_HEALTHY,
		ConfigStateIdx: 1,
		LogLevel:       proto.UnitLogLevel_DEBUG,
		Config: &proto.UnitExpectedConfig{
			Id:   "input-unit-config-id",
			Type: "filestream",
			Name: "foo",
			// The data stream is only present as flattened keys in the
			// source; the DataStream field is deliberately left unset.
			Source: requireNewStruct(t, map[string]any{
				"data_stream.dataset":   expectedDataset,
				"data_stream.namespace": expectedNamespace,
				"data_stream.type":      expectedType,
			}),
			Streams: []*proto.Stream{
				{
					Id: "filestream-id",
					Source: integration.RequireNewStruct(t, map[string]interface{}{
						"id": "input-unit1",
					}),
				},
			},
		},
	}
	units := []*proto.UnitExpected{
		&outputUnit,
		&inputUnit1,
	}
	server := &mock.StubServerV2{
		CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
			// Nothing to do here, just keep sending the same units.
			return &proto.CheckinExpected{
				Units: units,
			}
		},
		ActionImpl: func(response *proto.ActionResponse) error { return nil },
	}

	if err := server.Start(); err != nil {
		t.Fatalf("could not start mock Elastic-Agent server: %s", err)
	}
	defer server.Stop()

	// Named agentClient so the imported `client` package is not shadowed.
	agentClient := client.NewV2(
		fmt.Sprintf(":%d", server.Port),
		"",
		client.VersionInfo{},
		grpc.WithTransportCredentials(insecure.NewCredentials()))

	m, err := NewV2AgentManagerWithClient(
		&Config{
			Enabled: true,
		},
		r,
		agentClient,
	)
	if err != nil {
		t.Fatalf("could not instantiate ManagerV2: %s", err)
	}

	if err := m.Start(); err != nil {
		t.Fatalf("could not start ManagerV2: %s", err)
	}
	defer m.Stop()

	require.Eventually(t, func() bool {
		return stateReached.Load()
	}, 10*time.Second, 100*time.Millisecond,
		"did not find expected 'index' field on input final config")
}

type reloadable struct {
mx sync.Mutex
config *reload.ConfigWithMeta
Expand Down

0 comments on commit fc0a3fd

Please sign in to comment.