diff --git a/pkg/api/encode_otlp.go b/pkg/api/encode_otlp.go new file mode 100644 index 000000000..3f5fcfbdf --- /dev/null +++ b/pkg/api/encode_otlp.go @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +type EncodeOtlpLogs struct { + *OtlpConnectionInfo +} + +type EncodeOtlpTraces struct { + *OtlpConnectionInfo +} + +type EncodeOtlpMetrics struct { + *OtlpConnectionInfo + Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"` + Metrics MetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of metric definitions, each includes:"` + PushTimeInterval Duration `yaml:"pushTimeInterval,omitempty" json:"pushTimeInterval,omitempty" doc:"how often should metrics be sent to collector:"` + ExpiryTime Duration `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"time duration of no-flow to wait before deleting data item"` +} + +type OtlpConnectionInfo struct { + Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"endpoint address to expose"` + Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"endpoint port number to expose"` + ConnectionType string `yaml:"connectionType,omitempty" json:"connectionType,omitempty" doc:"interface mechanism: either http or grpc"` + TLS *ClientTLS `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the endpoint"` + Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty" doc:"headers to add to messages (optional)"` +} diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 6350b9039..7baf05102 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -176,7 +176,7 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.MetricsItem return nil, 0 } - entryLabels, key := e.extractLabelsAndKey(flow, info) + entryLabels, key := ExtractLabelsAndKey(flow, info) // Update entry for expiry mechanism (the entry itself is its own cleanup function) _, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) }) if !ok { @@ -197,7 +197,7 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.MetricsIt return nil, nil } - entryLabels, key := e.extractLabelsAndKey(flow, info) + entryLabels, key := ExtractLabelsAndKey(flow, info) // Update entry for expiry mechanism (the entry itself is its own cleanup function) _, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) }) if !ok { @@ -243,7 +243,7 @@ func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.Metri return val } -func (e *EncodeProm) extractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) { +func ExtractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) { entryLabels := make(map[string]string, len(info.Labels)) key := strings.Builder{} key.WriteString(info.Name) diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go new file mode 100644 index 000000000..7126dda90 --- /dev/null +++ b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package opentelemetry + +import ( + "encoding/json" + "testing" + + otel "github.com/agoda-com/opentelemetry-logs-go" + "github.com/agoda-com/opentelemetry-logs-go/logs" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" + "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/stretchr/testify/require" +) + +const testOtlpConfig = `--- +log-level: debug +pipeline: + - name: encode1 +parameters: + - name: encode1 + encode: + type: otlplogs + otlplogs: + address: localhost + port: 4317 + connectionType: grpc +` + +type fakeOltpLoggerProvider struct { +} +type fakeOltpLogger struct { +} + +var receivedData []logs.LogRecord + +func (f *fakeOltpLogger) Emit(msg logs.LogRecord) { + receivedData = append(receivedData, msg) +} + +func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOption) logs.Logger { + return &fakeOltpLogger{} +} + +func initNewEncodeOltp(t *testing.T) encode.Encoder { + receivedData = []logs.LogRecord{} + v, cfg := test.InitConfig(t, testOtlpConfig) + require.NotNil(t, v) + + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) + require.NoError(t, err) + + // intercept the loggerProvider function + loggerProvider := fakeOltpLoggerProvider{} + otel.SetLoggerProvider(&loggerProvider) + return newEncode +} + +func Test_EncodeOtlp(t *testing.T) { + newEncode := initNewEncodeOltp(t) + encodeOtlp := newEncode.(*EncodeOtlpLogs) + require.Equal(t, "localhost", encodeOtlp.cfg.OtlpConnectionInfo.Address) + require.Equal(t, 4317, encodeOtlp.cfg.OtlpConnectionInfo.Port) + require.Equal(t, "grpc", encodeOtlp.cfg.ConnectionType) + require.Nil(t, encodeOtlp.cfg.TLS) + require.Empty(t, encodeOtlp.cfg.Headers) + + entry1 := test.GetIngestMockEntry(true) + entry2 := test.GetIngestMockEntry(false) + newEncode.Encode(entry1) + newEncode.Encode(entry2) + // verify contents of the output + require.Len(t, receivedData, 2) + expected1, _ := json.Marshal(entry1) + expected2, _ := json.Marshal(entry2) + require.Equal(t, string(expected1), *receivedData[0].Body()) + require.Equal(t, string(expected2), *receivedData[1].Body()) +} + +func Test_TLSConfigEmpty(t *testing.T) { + cfg := config.StageParam{ + Encode: &config.Encode{ + OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{ + Address: "1.2.3.4", + Port: 999, + TLS: nil, + ConnectionType: "grpc", + Headers: nil, + }}}, + } + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg) + require.NoError(t, err) + require.NotNil(t, newEncode) +} + +func Test_TLSConfigNotEmpty(t *testing.T) { + cfg := config.StageParam{ + Encode: &config.Encode{ + OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{ + Address: "1.2.3.4", + Port: 999, + ConnectionType: "grpc", + TLS: &api.ClientTLS{InsecureSkipVerify: true}, + }}}, + } + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg) + require.NoError(t, err) + require.NotNil(t, newEncode) +} + +func Test_HeadersNotEmpty(t *testing.T) { + headers := make(map[string]string) + headers["key1"] = "value1" + headers["key2"] = "value2" + cfg := config.StageParam{ + Encode: &config.Encode{ + OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{ + Address: "1.2.3.4", + Port: 999, + ConnectionType: "grpc", + Headers: headers, + }}}, + } + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg) + require.NoError(t, err) + require.NotNil(t, newEncode) +} + +func Test_NoConnectionType(t *testing.T) { + cfg := config.StageParam{ + Encode: &config.Encode{ + OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{ + Address: "1.2.3.4", + Port: 999, + }}}, + } + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg) + require.Error(t, err) + require.Nil(t, newEncode) +} diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go b/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go new file mode 100644 index 000000000..c849625ca --- /dev/null +++ b/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package opentelemetry + +import ( + "context" + + sdklog "github.com/agoda-com/opentelemetry-logs-go/sdk/logs" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/sdk/resource" +) + +type EncodeOtlpLogs struct { + cfg api.EncodeOtlpLogs + ctx context.Context + res *resource.Resource + lp *sdklog.LoggerProvider +} + +// Encode encodes a log entry to be exported +func (e *EncodeOtlpLogs) Encode(entry config.GenericMap) { + log.Tracef("entering EncodeOtlpLogs. entry = %v", entry) + e.LogWrite(entry) +} + +func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { + log.Tracef("entering NewEncodeOtlpLogs \n") + cfg := api.EncodeOtlpLogs{} + if params.Encode != nil && params.Encode.OtlpLogs != nil { + cfg = *params.Encode.OtlpLogs + } + log.Debugf("NewEncodeOtlpLogs cfg = %v \n", cfg) + + ctx := context.Background() + res := newResource() + + lp, err := NewOtlpLoggerProvider(ctx, params, res) + if err != nil { + log.Fatal(err) + return nil, err + } + + w := &EncodeOtlpLogs{ + cfg: cfg, + ctx: ctx, + res: res, + lp: lp, + } + return w, nil +} diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go new file mode 100644 index 000000000..2c871fc94 --- /dev/null +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package opentelemetry + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" + putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + "k8s.io/utils/strings/slices" +) + +const ( + defaultExpiryTime = time.Duration(2 * time.Minute) + flpMeterName = "flp_meter" +) + +type counterInfo struct { + counter *metric.Float64Counter + info api.MetricsItem +} + +type gaugeInfo struct { + gauge *metric.Float64ObservableGauge + info api.MetricsItem +} + +type histoInfo struct { + histo *metric.Float64Histogram + info api.MetricsItem +} + +type EncodeOtlpMetrics struct { + cfg api.EncodeOtlpMetrics + ctx context.Context + res *resource.Resource + mp *sdkmetric.MeterProvider + counters []counterInfo + gauges []gaugeInfo + histos []histoInfo + expiryTime time.Duration + mCache *putils.TimedCache + exitChan <-chan struct{} +} + +// Encode encodes a metric to be exported +func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) { + log.Tracef("entering EncodeOtlpMetrics. entry = %v", metricRecord) + + // Process counters + for _, mInfo := range e.counters { + labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.counter) + if labels == nil { + continue + } + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + (*mInfo.counter).Add(e.ctx, value, metric.WithAttributes(attributes...)) + } +} + +func (e *EncodeOtlpMetrics) prepareMetric(flow config.GenericMap, info *api.MetricsItem, m *metric.Float64Counter) (map[string]string, float64) { + val := e.extractGenericValue(flow, info) + if val == nil { + return nil, 0 + } + floatVal, err := utils.ConvertToFloat64(val) + if err != nil { + return nil, 0 + } + + entryLabels, key := encode.ExtractLabelsAndKey(flow, info) + // Update entry for expiry mechanism (the entry itself is its own cleanup function) + _, ok := e.mCache.UpdateCacheEntry(key, entryLabels) + if !ok { + return nil, 0 + } + return entryLabels, floatVal +} + +func (e *EncodeOtlpMetrics) extractGenericValue(flow config.GenericMap, info *api.MetricsItem) interface{} { + for _, filter := range info.GetFilters() { + val, found := flow[filter.Key] + switch filter.Value { + case "nil": + if found { + return nil + } + case "!nil": + if !found { + return nil + } + default: + if found { + sVal, ok := val.(string) + if !ok { + sVal = fmt.Sprint(val) + } + if !slices.Contains(strings.Split(filter.Value, "|"), sVal) { + return nil + } + } + } + } + if info.ValueKey == "" { + // No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1 + return 1 + } + val, found := flow[info.ValueKey] + if !found { + return nil + } + return val +} + +func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { + log.Tracef("entering NewEncodeOtlpMetrics \n") + cfg := api.EncodeOtlpMetrics{} + if params.Encode != nil && params.Encode.OtlpMetrics != nil { + cfg = *params.Encode.OtlpMetrics + } + log.Debugf("NewEncodeOtlpMetrics cfg = %v \n", cfg) + + ctx := context.Background() + res := newResource() + + mp, err := NewOtlpMetricsProvider(ctx, params, res) + if err != nil { + log.Fatal(err) + return nil, err + } + + expiryTime := cfg.ExpiryTime + if expiryTime.Duration == 0 { + expiryTime.Duration = defaultExpiryTime + } + + meter := otel.Meter(flpMeterName) + counters := []counterInfo{} + for _, mInfo := range cfg.Metrics { + fullMetricName := cfg.Prefix + mInfo.Name + labels := mInfo.Labels + log.Debugf("fullMetricName = %v", fullMetricName) + log.Debugf("Labels = %v", labels) + switch mInfo.Type { + case api.PromEncodeOperationName("Counter"): + counter, err := meter.Float64Counter(fullMetricName) + if err != nil { + log.Errorf("error during counter creation: %v", err) + return nil, err + } + counters = append(counters, counterInfo{ + counter: &counter, + info: mInfo, + }) + case "default": + log.Errorf("invalid metric type = %v, skipping", mInfo.Type) + continue + } + } + + w := &EncodeOtlpMetrics{ + cfg: cfg, + ctx: ctx, + res: res, + mp: mp, + counters: counters, + expiryTime: expiryTime.Duration, + mCache: putils.NewTimedCache(0, nil), + exitChan: putils.ExitChannel(), + } + go w.cleanupExpiredEntriesLoop() + return w, nil +} + +// callback function from lru cleanup +func (e *EncodeOtlpMetrics) Cleanup(cleanupFunc interface{}) { + // nothing more to do +} + +func (e *EncodeOtlpMetrics) cleanupExpiredEntriesLoop() { + ticker := time.NewTicker(e.expiryTime) + for { + select { + case <-e.exitChan: + log.Debugf("exiting cleanupExpiredEntriesLoop because of signal") + return + case <-ticker.C: + e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup) + } + } +} diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go b/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go new file mode 100644 index 000000000..f8d2fb55c --- /dev/null +++ b/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package opentelemetry + +import ( + "context" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" +) + +const ( + flpTracerName = "flp_tracer" + flpEncodeSpanName = "flp_encode" +) + +type EncodeOtlpTrace struct { + cfg api.EncodeOtlpTraces + ctx context.Context + res *resource.Resource + tp *sdktrace.TracerProvider + tracer trace.Tracer +} + +// Encode encodes a metric to be exported +func (e *EncodeOtlpTrace) Encode(metricRecord config.GenericMap) { + log.Tracef("entering EncodeOtlpTrace. entry = %v", metricRecord) + _, span := e.tracer.Start(e.ctx, flpEncodeSpanName) + defer span.End() + attributes := obtainAttributesFromEntry(metricRecord) + span.SetAttributes(*attributes...) +} + +func NewEncodeOtlpTraces(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { + log.Tracef("entering NewEncodeOtlpTraces \n") + cfg := api.EncodeOtlpTraces{} + if params.Encode != nil && params.Encode.OtlpTraces != nil { + cfg = *params.Encode.OtlpTraces + } + log.Debugf("NewEncodeOtlpTraces cfg = %v \n", cfg) + + ctx := context.Background() + res := newResource() + tracer := otel.Tracer(flpTracerName) + + tp, err := NewOtlpTracerProvider(ctx, params, res) + if err != nil { + log.Fatal(err) + return nil, err + } + + w := &EncodeOtlpTrace{ + cfg: cfg, + ctx: ctx, + res: res, + tp: tp, + tracer: tracer, + } + return w, nil +} diff --git a/pkg/pipeline/encode/opentelemetry/opentelemetry.go b/pkg/pipeline/encode/opentelemetry/opentelemetry.go new file mode 100644 index 000000000..f84f546c0 --- /dev/null +++ b/pkg/pipeline/encode/opentelemetry/opentelemetry.go @@ -0,0 +1,327 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package opentelemetry + +import ( + "context" + "encoding/json" + "fmt" + "time" + + otel2 "github.com/agoda-com/opentelemetry-logs-go" + "github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs" + "github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogsgrpc" + "github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogshttp" + "github.com/agoda-com/opentelemetry-logs-go/logs" + sdklog2 "github.com/agoda-com/opentelemetry-logs-go/sdk/logs" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "google.golang.org/grpc/credentials" +) + +// Note: +// As of the writing of this module, go.opentelemetry.io does not provide interfaces for logs. +// We therefore temporarily use agoda-com/opentelemetry-logs-go for logs. +// When go.opentelemetry.io provides interfaces for logs, the code here should be updated to use those interfaces. + +const ( + flpOtlpLoggerName = "flp-otlp-logger" + defaultTimeInterval = time.Duration(20 * time.Second) + flpOtlpResourceVersion = "v0.1.0" + flpOtlpResourceName = "flp-otlp" + grpcType = "grpc" + httpType = "http" +) + +func NewOtlpTracerProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdktrace.TracerProvider, error) { + cfg := api.EncodeOtlpTraces{} + if params.Encode != nil && params.Encode.OtlpTraces != nil { + cfg = *params.Encode.OtlpTraces + } + if cfg.OtlpConnectionInfo == nil { + return nil, fmt.Errorf("otlptraces missing connection ino") + } + addr := fmt.Sprintf("%s:%v", cfg.OtlpConnectionInfo.Address, cfg.OtlpConnectionInfo.Port) + var err error + var traceProvider *sdktrace.TracerProvider + var traceExporter *otlptrace.Exporter + if cfg.ConnectionType == grpcType { + var expOption otlptracegrpc.Option + var tlsOption otlptracegrpc.Option + tlsOption = otlptracegrpc.WithInsecure() + if cfg.TLS != nil { + tlsConfig, err := cfg.OtlpConnectionInfo.TLS.Build() + if err != nil { + return nil, err + } + tlsOption = otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)) + } + expOption = otlptracegrpc.WithEndpoint(addr) + traceExporter, err = otlptracegrpc.New(ctx, + expOption, + tlsOption, + otlptracegrpc.WithHeaders(cfg.Headers)) + if err != nil { + return nil, err + } + } else if cfg.ConnectionType == httpType { + var expOption otlptracehttp.Option + var tlsOption otlptracehttp.Option + tlsOption = otlptracehttp.WithInsecure() + if cfg.TLS != nil { + tlsConfig, err := cfg.TLS.Build() + if err != nil { + return nil, err + } + tlsOption = otlptracehttp.WithTLSClientConfig(tlsConfig) + } + expOption = otlptracehttp.WithEndpoint(addr) + traceExporter, err = otlptracehttp.New(ctx, + expOption, + tlsOption, + otlptracehttp.WithHeaders(cfg.Headers)) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("must specify grpcaddress or httpaddress") + } + traceProvider = sdktrace.NewTracerProvider( + sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(traceExporter)), + ) + + otel.SetTracerProvider(traceProvider) + return traceProvider, nil +} + +func NewOtlpMetricsProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdkmetric.MeterProvider, error) { + cfg := api.EncodeOtlpMetrics{} + if params.Encode != nil && params.Encode.OtlpMetrics != nil { + cfg = *params.Encode.OtlpMetrics + } + timeInterval := cfg.PushTimeInterval + if timeInterval.Duration == 0 { + timeInterval.Duration = defaultTimeInterval + } + if cfg.OtlpConnectionInfo == nil { + return nil, fmt.Errorf("otlptraces missing connection ino") + } + addr := fmt.Sprintf("%s:%v", cfg.OtlpConnectionInfo.Address, cfg.OtlpConnectionInfo.Port) + var err error + var meterProvider *sdkmetric.MeterProvider + if cfg.ConnectionType == grpcType { + var metricExporter *otlpmetricgrpc.Exporter + var expOption otlpmetricgrpc.Option + var tlsOption otlpmetricgrpc.Option + tlsOption = otlpmetricgrpc.WithInsecure() + if cfg.TLS != nil { + tlsConfig, err := cfg.TLS.Build() + if err != nil { + return nil, err + } + tlsOption = otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)) + } + expOption = otlpmetricgrpc.WithEndpoint(addr) + metricExporter, err = otlpmetricgrpc.New(ctx, expOption, tlsOption) + if err != nil { + return nil, err + } + meterProvider = sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, + sdkmetric.WithInterval(timeInterval.Duration))), + ) + } else if cfg.ConnectionType == httpType { + var metricExporter *otlpmetrichttp.Exporter + var expOption otlpmetrichttp.Option + var tlsOption otlpmetrichttp.Option + tlsOption = otlpmetrichttp.WithInsecure() + if cfg.TLS != nil { + tlsConfig, err := cfg.TLS.Build() + if err != nil { + return nil, err + } + tlsOption = otlpmetrichttp.WithTLSClientConfig(tlsConfig) + } + expOption = otlpmetrichttp.WithEndpoint(addr) + metricExporter, err = otlpmetrichttp.New(ctx, expOption, tlsOption) + if err != nil { + return nil, err + } + meterProvider = sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, + sdkmetric.WithInterval(timeInterval.Duration))), + ) + } else { + return nil, fmt.Errorf("must specify grpcaddress or httpaddress") + } + + otel.SetMeterProvider(meterProvider) + return meterProvider, nil +} + +func NewOtlpLoggerProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdklog2.LoggerProvider, error) { + cfg := api.EncodeOtlpLogs{} + if params.Encode != nil && params.Encode.OtlpLogs != nil { + cfg = *params.Encode.OtlpLogs + } + if cfg.OtlpConnectionInfo == nil { + return nil, fmt.Errorf("otlptraces missing connection ino") + } + addr := fmt.Sprintf("%s:%v", cfg.OtlpConnectionInfo.Address, cfg.OtlpConnectionInfo.Port) + var expOption otlplogs.ExporterOption + if cfg.ConnectionType == grpcType { + var tlsOption otlplogsgrpc.Option + tlsOption = otlplogsgrpc.WithInsecure() + if params.Encode.OtlpLogs.TLS != nil { + tlsConfig, err := cfg.TLS.Build() + if err != nil { + return nil, err + } + tlsOption = otlplogsgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)) + } + expOption = otlplogs.WithClient(otlplogsgrpc.NewClient( + otlplogsgrpc.WithEndpoint(addr), + tlsOption, + otlplogsgrpc.WithHeaders(cfg.Headers), + )) + } else if cfg.ConnectionType == httpType { + var tlsOption otlplogshttp.Option + tlsOption = otlplogshttp.WithInsecure() + if params.Encode.OtlpLogs.TLS != nil { + tlsConfig, err := cfg.TLS.Build() + if err != nil { + return nil, err + } + tlsOption = otlplogshttp.WithTLSClientConfig(tlsConfig) + } + expOption = otlplogs.WithClient(otlplogshttp.NewClient( + otlplogshttp.WithEndpoint(addr), + tlsOption, + otlplogshttp.WithHeaders(cfg.Headers), + )) + } else { + return nil, fmt.Errorf("must specify grpcaddress or httpaddress") + } + logExporter, err := otlplogs.NewExporter(ctx, expOption) + if err != nil { + return nil, err + } + + loggerProvider := sdklog2.NewLoggerProvider( + sdklog2.WithBatcher(logExporter), + sdklog2.WithResource(res), + ) + otel2.SetLoggerProvider(loggerProvider) + return loggerProvider, nil +} + +func (e *EncodeOtlpLogs) LogWrite(entry config.GenericMap) { + now := time.Now() + sn := logs.INFO + st := "INFO" + msgByteArray, _ := json.Marshal(entry) + msg := string(msgByteArray) + // TODO: Decide whether the content should be delivered as Body or as Attributes + lrc := logs.LogRecordConfig{ + //Timestamp: &now, // take timestamp from entry, if present? + ObservedTimestamp: now, + SeverityNumber: &sn, + SeverityText: &st, + Resource: e.res, + Body: &msg, + Attributes: obtainAttributesFromEntry(entry), + } + logRecord := logs.NewLogRecord(lrc) + + logger := otel2.GetLoggerProvider().Logger( + flpOtlpLoggerName, + logs.WithSchemaURL(semconv.SchemaURL), + ) + logger.Emit(logRecord) +} + +func obtainAttributesFromEntry(entry config.GenericMap) *[]attribute.KeyValue { + // convert the entry fields to Attributes of the message + var att = make([]attribute.KeyValue, len(entry)) + index := 0 + for k, v := range entry { + switch v.(type) { + case string: + valString := v + att[index] = attribute.String(k, valString.(string)) + case int, int32, int64, int16, uint, uint8, uint16, uint32, uint64: + valInt, _ := utils.ConvertToInt64(v) + att[index] = attribute.Int64(k, valInt) + case float32, float64: + valFloat, _ := utils.ConvertToFloat64(v) + att[index] = attribute.Float64(k, valFloat) + case bool: + valBool := v + att[index] = attribute.Bool(k, valBool.(bool)) + case nil: + // skip this field + continue + } + index++ + } + addjustedAtt := att[0:index] + return &addjustedAtt +} + +func obtainAttributesFromLabels(labels map[string]string) []attribute.KeyValue { + // convert the entry fields to Attributes of the message + var att = make([]attribute.KeyValue, len(labels)) + index := 0 + for k, v := range labels { + att[index] = attribute.String(k, v) + index++ + } + return att +} + +func (e *EncodeOtlpMetrics) MetricWrite(entry config.GenericMap) { + // nothing more to do at present +} + +// newResource returns a resource describing this application. +func newResource() *resource.Resource { + r, _ := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(flpOtlpResourceName), + semconv.ServiceVersion(flpOtlpResourceVersion), + ), + ) + return r +}