Skip to content

Commit

Permalink
some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
KalmanMeth committed Nov 2, 2023
1 parent e9d8fb2 commit a2aaf87
Show file tree
Hide file tree
Showing 7 changed files with 900 additions and 3 deletions.
42 changes: 42 additions & 0 deletions pkg/api/encode_otlp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

type EncodeOtlpLogs struct {
*OtlpConnectionInfo
}

type EncodeOtlpTraces struct {
*OtlpConnectionInfo
}

type EncodeOtlpMetrics struct {
*OtlpConnectionInfo
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"`
Metrics MetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of metric definitions, each includes:"`
PushTimeInterval Duration `yaml:"pushTimeInterval,omitempty" json:"pushTimeInterval,omitempty" doc:"how often should metrics be sent to collector:"`
ExpiryTime Duration `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"time duration of no-flow to wait before deleting data item"`
}

type OtlpConnectionInfo struct {
Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"endpoint address to expose"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"endpoint port number to expose"`
ConnectionType string `yaml:"connectionType,omitempty" json:"connectionType,omitempty" doc:"interface mechanism: either http or grpc"`
TLS *ClientTLS `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the endpoint"`
Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty" doc:"headers to add to messages (optional)"`
}
6 changes: 3 additions & 3 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.MetricsItem
return nil, 0
}

entryLabels, key := e.extractLabelsAndKey(flow, info)
entryLabels, key := ExtractLabelsAndKey(flow, info)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
Expand All @@ -197,7 +197,7 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.MetricsIt
return nil, nil
}

entryLabels, key := e.extractLabelsAndKey(flow, info)
entryLabels, key := ExtractLabelsAndKey(flow, info)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
Expand Down Expand Up @@ -243,7 +243,7 @@ func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.Metri
return val
}

func (e *EncodeProm) extractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) {
func ExtractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) {
entryLabels := make(map[string]string, len(info.Labels))
key := strings.Builder{}
key.WriteString(info.Name)
Expand Down
158 changes: 158 additions & 0 deletions pkg/pipeline/encode/opentelemetry/encode_otlp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package opentelemetry

import (
"encoding/json"
"testing"

otel "github.com/agoda-com/opentelemetry-logs-go"
"github.com/agoda-com/opentelemetry-logs-go/logs"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

const testOtlpConfig = `---
log-level: debug
pipeline:
- name: encode1
parameters:
- name: encode1
encode:
type: otlplogs
otlplogs:
address: localhost
port: 4317
connectionType: grpc
`

type fakeOltpLoggerProvider struct {
}
type fakeOltpLogger struct {
}

var receivedData []logs.LogRecord

func (f *fakeOltpLogger) Emit(msg logs.LogRecord) {
receivedData = append(receivedData, msg)
}

func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOption) logs.Logger {
return &fakeOltpLogger{}
}

func initNewEncodeOltp(t *testing.T) encode.Encoder {
receivedData = []logs.LogRecord{}
v, cfg := test.InitConfig(t, testOtlpConfig)
require.NotNil(t, v)

newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0])
require.NoError(t, err)

// intercept the loggerProvider function
loggerProvider := fakeOltpLoggerProvider{}
otel.SetLoggerProvider(&loggerProvider)
return newEncode
}

func Test_EncodeOtlp(t *testing.T) {
newEncode := initNewEncodeOltp(t)
encodeOtlp := newEncode.(*EncodeOtlpLogs)
require.Equal(t, "localhost", encodeOtlp.cfg.OtlpConnectionInfo.Address)
require.Equal(t, 4317, encodeOtlp.cfg.OtlpConnectionInfo.Port)
require.Equal(t, "grpc", encodeOtlp.cfg.ConnectionType)
require.Nil(t, encodeOtlp.cfg.TLS)
require.Empty(t, encodeOtlp.cfg.Headers)

entry1 := test.GetIngestMockEntry(true)
entry2 := test.GetIngestMockEntry(false)
newEncode.Encode(entry1)
newEncode.Encode(entry2)
// verify contents of the output
require.Len(t, receivedData, 2)
expected1, _ := json.Marshal(entry1)
expected2, _ := json.Marshal(entry2)
require.Equal(t, string(expected1), *receivedData[0].Body())
require.Equal(t, string(expected2), *receivedData[1].Body())
}

func Test_TLSConfigEmpty(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
TLS: nil,
ConnectionType: "grpc",
Headers: nil,
}}},
}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_TLSConfigNotEmpty(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
ConnectionType: "grpc",
TLS: &api.ClientTLS{InsecureSkipVerify: true},
}}},
}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_HeadersNotEmpty(t *testing.T) {
headers := make(map[string]string)
headers["key1"] = "value1"
headers["key2"] = "value2"
cfg := config.StageParam{
Encode: &config.Encode{
OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
ConnectionType: "grpc",
Headers: headers,
}}},
}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_NoConnectionType(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
}}},
}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.Error(t, err)
require.Nil(t, newEncode)
}
69 changes: 69 additions & 0 deletions pkg/pipeline/encode/opentelemetry/encode_otlplogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package opentelemetry

import (
"context"

sdklog "github.com/agoda-com/opentelemetry-logs-go/sdk/logs"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/sdk/resource"
)

type EncodeOtlpLogs struct {
cfg api.EncodeOtlpLogs
ctx context.Context
res *resource.Resource
lp *sdklog.LoggerProvider
}

// Encode encodes a log entry to be exported
func (e *EncodeOtlpLogs) Encode(entry config.GenericMap) {
log.Tracef("entering EncodeOtlpLogs. entry = %v", entry)
e.LogWrite(entry)
}

func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) {
log.Tracef("entering NewEncodeOtlpLogs \n")
cfg := api.EncodeOtlpLogs{}
if params.Encode != nil && params.Encode.OtlpLogs != nil {
cfg = *params.Encode.OtlpLogs
}
log.Debugf("NewEncodeOtlpLogs cfg = %v \n", cfg)

ctx := context.Background()
res := newResource()

lp, err := NewOtlpLoggerProvider(ctx, params, res)
if err != nil {
log.Fatal(err)
return nil, err
}

w := &EncodeOtlpLogs{
cfg: cfg,
ctx: ctx,
res: res,
lp: lp,
}
return w, nil
}
Loading

0 comments on commit a2aaf87

Please sign in to comment.