Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Support flattened data_stream.* fields under Elastic-Agent" #36610

Merged
merged 1 commit into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Support build of projects outside of beats directory {pull}36126[36126]
- Add default cgroup regex for add_process_metadata processor {pull}36484[36484] {issue}32961[32961]
- Fix environment capture by `add_process_metadata` processor. {issue}36469[36469] {pull}36471[36471]
- Support fattened `data_stream` object when running under Elastic-Agent {pr}36516[36516]


*Auditbeat*
Expand Down
79 changes: 1 addition & 78 deletions x-pack/libbeat/management/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
package management

import (
"errors"
"fmt"

"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -85,78 +82,10 @@ func handleSimpleConfig(raw *proto.UnitExpectedConfig) (map[string]any, error) {
return m, nil
}

// dataStreamAndSource is a generic way to represent proto mesages
// that contain a source field and a datastream field.
type dataStreamAndSource interface {
GetDataStream() *proto.DataStream
GetSource() *structpb.Struct
}

// deDotDataStream reads any datastream value from the dotted notation
// (data_stream.*) and returns it as a *proto.DataStream. If raw already
// contains a DataStream but no fields are duplicated, then the values are merged.
func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
ds := raw.GetDataStream()
if ds == nil {
ds = &proto.DataStream{}
}

tmp := struct {
DataStream struct {
Dataset string `config:"dataset" yaml:"dataset"`
Type string `config:"type" yaml:"type"`
Namespace string `config:"namespace" yaml:"namespace"`
} `config:"data_stream" yaml:"data_stream"`
}{}

cfg, err := conf.NewConfigFrom(raw.GetSource().AsMap())
if err != nil {
return nil, fmt.Errorf("cannot generate config from source field: %w", err)
}

if err := cfg.Unpack(&tmp); err != nil {
return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
}

if ds.Dataset != "" && tmp.DataStream.Dataset != "" {
return nil, errors.New("duplicated key 'datastream.dataset'")
}

if ds.Type != "" && tmp.DataStream.Type != "" {
return nil, errors.New("duplicated key 'datastream.type'")
}

if ds.Namespace != "" && tmp.DataStream.Namespace != "" {
return nil, errors.New("duplicated key 'datastream.namespace'")
}

ret := &proto.DataStream{
Dataset: merge(tmp.DataStream.Dataset, ds.Dataset),
Type: merge(tmp.DataStream.Type, ds.Type),
Namespace: merge(tmp.DataStream.Namespace, ds.Namespace),
}

return ret, nil
}

// merge returns b if a is an empty string
func merge(a, b string) string {
if a == "" {
return b
}
return a
}

// CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values
// that can later be formatted into the reloader's ConfigWithMetaData and sent to an indvidual beat/
// This also performs the basic task of inserting module-level add_field processors into the inputs/modules.
func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) {
ds, err := deDotDataStream(raw)
if err != nil {
return nil, fmt.Errorf("could not read 'data_stream': %w", err)
}
raw.DataStream = ds

// If there are no streams, we fall into the 'simple input config' case,
// this means the key configuration values are on the root level instead of
// an element in the `streams` array.
Expand All @@ -177,14 +106,8 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamTyp
inputs := make([]map[string]interface{}, len(raw.GetStreams()))

for iter, stream := range raw.GetStreams() {
ds, err := deDotDataStream(stream)
if err != nil {
return nil, fmt.Errorf("could not read 'data_stream' from stream ID '%s': %w",
stream.GetId(), err)
}
stream.DataStream = ds
streamSource := raw.GetStreams()[iter].GetSource().AsMap()
streamSource, err = createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...)
streamSource, err := createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...)
if err != nil {
return nil, fmt.Errorf("error creating stream rules: %w", err)
}
Expand Down
84 changes: 0 additions & 84 deletions x-pack/libbeat/management/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ package management
import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
Expand Down Expand Up @@ -232,85 +230,3 @@ func buildConfigMap(t *testing.T, unitRaw *proto.UnitExpectedConfig, agentInfo *
require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config)
return cfgMap
}

func TestDeDotDataStream(t *testing.T) {
testCases := map[string]struct {
source map[string]any
dataStream *proto.DataStream
wantError bool
expectedDataStream *proto.DataStream
}{
"all data is flattened": {
source: map[string]any{
"data_stream.dataset": "my dataset",
"data_stream.namespace": "my namespace",
"data_stream.type": "my type",
},
expectedDataStream: &proto.DataStream{
Dataset: "my dataset",
Namespace: "my namespace",
Type: "my type",
},
},
"no data is flattened": {
dataStream: &proto.DataStream{
Dataset: "my dataset",
Namespace: "my namespace",
Type: "my type",
},
expectedDataStream: &proto.DataStream{
Dataset: "my dataset",
Namespace: "my namespace",
Type: "my type",
},
},
"mix of flattened and data_stream": {
dataStream: &proto.DataStream{
Dataset: "my dataset",
Type: "my type",
},
source: map[string]any{
"data_stream.namespace": "my namespace",
},
expectedDataStream: &proto.DataStream{
Dataset: "my dataset",
Namespace: "my namespace",
Type: "my type",
},
},
"duplicated keys generate error": {
dataStream: &proto.DataStream{
Dataset: "my dataset",
},
source: map[string]any{
"data_stream.dataset": "another dataset",
},
wantError: true,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
raw := &proto.UnitExpectedConfig{
Source: requireNewStruct(t, tc.source),
DataStream: tc.dataStream,
}

final, err := deDotDataStream(raw)
if tc.wantError {
if err == nil {
t.Error("expecting an error")
}
return
}
if err != nil {
t.Fatalf("deDotDataStream returned an error: %s", err)
}

if !cmp.Equal(final, tc.expectedDataStream, protocmp.Transform()) {
t.Errorf("expecting a different value: --got/++want\n'%s'",
cmp.Diff(final, tc.expectedDataStream, protocmp.Transform()))
}
})
}
}
130 changes: 0 additions & 130 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,136 +511,6 @@ func TestErrorPerUnit(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond, "desired state, was not reached")
}

func TestFlattenedDataStreams(t *testing.T) {
stateReached := atomic.Bool{}

expectedDataset := "my-dataset"
expectedNamespace := "my-namespace"
expectedType := "my-type"
expectedIndex := fmt.Sprintf("%s-%s-%s",
expectedType, expectedDataset, expectedNamespace)

r := reload.NewRegistry()

output := &mockOutput{
ReloadFn: func(config *reload.ConfigWithMeta) error {
return nil
},
}
r.MustRegisterOutput(output)

inputs := &mockReloadable{
ReloadFn: func(configs []*reload.ConfigWithMeta) error {
for _, input := range configs {
tmp := struct {
Index string `config:"index" yaml:"index"`
}{}

if err := input.Config.Unpack(&tmp); err != nil {
t.Fatalf("error unpacking config: %s", err)
}

if tmp.Index != expectedIndex {
t.Fatalf("expecting index %q, got %q", expectedIndex, tmp.Index)
}

stateReached.Store(true)
}
return nil
},
}
r.MustRegisterInput(inputs)

outputUnit := proto.UnitExpected{
Id: "output-unit",
Type: proto.UnitType_OUTPUT,
State: proto.State_HEALTHY,
ConfigStateIdx: 1,
LogLevel: proto.UnitLogLevel_DEBUG,
Config: &proto.UnitExpectedConfig{
Id: "default",
Type: "mock",
Name: "mock",
Source: integration.RequireNewStruct(t,
map[string]interface{}{
"Is": "this",
"required?": "Yes!",
}),
},
}

inputUnit1 := proto.UnitExpected{
Id: "input-unit1",
Type: proto.UnitType_INPUT,
State: proto.State_HEALTHY,
ConfigStateIdx: 1,
LogLevel: proto.UnitLogLevel_DEBUG,
Config: &proto.UnitExpectedConfig{
Id: "input-unit-config-id",
Type: "filestream",
Name: "foo",
Source: requireNewStruct(t, map[string]any{
"data_stream.dataset": expectedDataset,
"data_stream.namespace": expectedNamespace,
"data_stream.type": expectedType,
}),
Streams: []*proto.Stream{
{
Id: "filestream-id",
Source: integration.RequireNewStruct(t, map[string]interface{}{
"id": "input-unit1",
}),
},
},
},
}
units := []*proto.UnitExpected{
&outputUnit,
&inputUnit1,
}
server := &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
// Nothing to do here, just keep sending the same units.
return &proto.CheckinExpected{
Units: units,
}
},
ActionImpl: func(response *proto.ActionResponse) error { return nil },
}

if err := server.Start(); err != nil {
t.Fatalf("could not start mock Elastic-Agent server: %s", err)
}
defer server.Stop()

client := client.NewV2(
fmt.Sprintf(":%d", server.Port),
"",
client.VersionInfo{},
grpc.WithTransportCredentials(insecure.NewCredentials()))

m, err := NewV2AgentManagerWithClient(
&Config{
Enabled: true,
},
r,
client,
)
if err != nil {
t.Fatalf("could not instantiate ManagerV2: %s", err)
}

if err := m.Start(); err != nil {
t.Fatalf("could not start ManagerV2: %s", err)
}
defer m.Stop()

require.Eventually(t, func() bool {
return stateReached.Load()
}, 10*time.Second, 100*time.Millisecond,
"did not find expected 'index' field on input final config")
}

type reloadable struct {
mx sync.Mutex
config *reload.ConfigWithMeta
Expand Down
Loading