Skip to content

Commit

Permalink
Added solarwindsapmsettingextension
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrytfleung committed Jan 17, 2024
1 parent 2bbbcbb commit 3811585
Show file tree
Hide file tree
Showing 13 changed files with 580 additions and 0 deletions.
1 change: 1 addition & 0 deletions extension/solarwindsapmsettingsextension/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
39 changes: 39 additions & 0 deletions extension/solarwindsapmsettingsextension/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Solarwinds APM Settings extension

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development] |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aextension%2Fsolarwindsapmsettings%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aextension%2Fsolarwindsapmsettings) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aextension%2Fsolarwindsapmsettings%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aextension%2Fsolarwindsapmsettings) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->

## Overview
The Solarwinds APM Settings extension gets Solarwinds APM specific settings from Solarwinds APM collector and outputs `/tmp/solarwinds-apm-settings.json` periodically

## Configuration

Example:

```yaml
extensions:
solarwindsapmsettings:
endpoint: "apm.collector.cloud.solarwinds.com:443"
key: "<token>:<name>"
interval: 1m
```
### endpoint (Optional)
The endpoint which this extension calls `getSettings`.

Default: `apm.collector.cloud.solarwinds.com:443`

### key (Required)
The key in format `<token>:<name>` for `getSettings` from Solarwinds APM collector.

### interval (Optional)
Periodic interval to get Solarwinds APM specific settings from Solarwinds APM collector.

Default: `1m`
39 changes: 39 additions & 0 deletions extension/solarwindsapmsettingsextension/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package solarwindsapmsettingsextension

import (
"fmt"
"strconv"
"strings"
"time"
)

type Config struct {
Endpoint string `mapstructure:"endpoint"`
Key string `mapstructure:"key"`
Interval string `mapstructure:"interval"`
}

func (cfg *Config) Validate() error {
if len(cfg.Endpoint) == 0 {
return fmt.Errorf("endpoint must not be empty")
}
endpointArr := strings.Split(cfg.Endpoint, ":")
if len(endpointArr) != 2 {
return fmt.Errorf("endpoint should be in \"<host>:<port>\" format")
}
if _, err := strconv.Atoi(endpointArr[1]); err != nil {
return fmt.Errorf("the <port> portion of endpoint has to be an integer")
}
if len(cfg.Key) == 0 {
return fmt.Errorf("key must not be empty")
}
keyArr := strings.Split(cfg.Key, ":")
if len(keyArr) != 2 {
return fmt.Errorf("key should be in \"<token>:<service_name>\" format")
}

if _, err := time.ParseDuration(cfg.Interval); err != nil {
return fmt.Errorf("interval has to be a duration string. Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
return nil
}
62 changes: 62 additions & 0 deletions extension/solarwindsapmsettingsextension/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package solarwindsapmsettingsextension

import (
"fmt"
"github.com/stretchr/testify/require"
"testing"
)

func TestValidate(t *testing.T) {
tests := []struct {
name string
cfg *Config
err error
}{
{
name: "nothing",
cfg: &Config{},
err: fmt.Errorf("endpoint must not be empty"),
},
{
name: "empty key",
cfg: &Config{
Endpoint: "host:12345",
},
err: fmt.Errorf("key must not be empty"),
},
{
name: "invalid endpoint",
cfg: &Config{
Endpoint: "invalid",
Key: "token:name",
},
err: fmt.Errorf("endpoint should be in \"<host>:<port>\" format"),
},
{
name: "invalid endpoint format but port is not an integer",
cfg: &Config{
Endpoint: "host:abc",
Key: "token:name",
},
err: fmt.Errorf("the <port> portion of endpoint has to be an integer"),
},
{
name: "invalid key",
cfg: &Config{
Endpoint: "host:12345",
Key: "invalid",
},
err: fmt.Errorf("key should be in \"<token>:<service_name>\" format"),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.Validate()
if tc.err != nil {
require.EqualError(t, err, tc.err.Error())
} else {
require.NoError(t, err)
}
})
}
}
3 changes: 3 additions & 0 deletions extension/solarwindsapmsettingsextension/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//go:generate mdatagen metadata.yaml

package solarwindsapmsettingsextension
218 changes: 218 additions & 0 deletions extension/solarwindsapmsettingsextension/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package solarwindsapmsettingsextension

import (
"context"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/solarwindscloud/apm-proto/go/collectorpb"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/encoding/protojson"
"math"
"os"
"strconv"
"time"
)

const (
RawOutputFile = "/tmp/solarwinds-apm-settings-raw"
JSONOutputFile = "/tmp/solarwinds-apm-settings.json"
)

type solarwindsapmSettingsExtension struct {
logger *zap.Logger
config *Config
cancel context.CancelFunc
conn *grpc.ClientConn
client collectorpb.TraceCollectorClient
}

func Refresh(extension *solarwindsapmSettingsExtension) {
extension.logger.Info("Time to refresh from " + extension.config.Endpoint)
if hostname, err := os.Hostname(); err != nil {
extension.logger.Fatal("Unable to call os.Hostname() " + err.Error())
} else {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

request := &collectorpb.SettingsRequest{
ApiKey: extension.config.Key,
Identity: &collectorpb.HostID{
Hostname: hostname,
},
ClientVersion: "2",
}
if response, err := extension.client.GetSettings(ctx, request); err != nil {
extension.logger.Fatal("Unable to getSettings from " + extension.config.Endpoint + " " + err.Error())
} else {
switch result := response.GetResult(); result {
case collectorpb.ResultCode_OK:
if bytes, err := proto.Marshal(response); err != nil {
extension.logger.Error("Unable to marshal response to bytes " + err.Error())
} else {
// Output in raw format
if err := os.WriteFile(RawOutputFile, bytes, 0644); err != nil {
extension.logger.Error("Unable to write " + RawOutputFile + " " + err.Error())
} else {
extension.logger.Info(RawOutputFile + " is refreshed")
}
}
// Output in human-readable format
var settings []map[string]interface{}
for _, item := range response.GetSettings() {

marshalOptions := protojson.MarshalOptions{
UseEnumNumbers: true,
EmitUnpopulated: true,
}
if settingBytes, err := marshalOptions.Marshal(item); err != nil {
extension.logger.Warn("Error to marshal setting JSON[] byte from response.GetSettings() " + err.Error())
} else {
setting := make(map[string]interface{})
if err := json.Unmarshal(settingBytes, &setting); err != nil {
extension.logger.Warn("Error to unmarshal setting JSON object from setting JSON[]byte " + err.Error())
} else {
if value, ok := setting["value"].(string); ok {
if num, e := strconv.ParseInt(value, 10, 0); e != nil {
extension.logger.Warn("Unable to parse value " + value + " as number " + e.Error())
} else {
setting["value"] = num
}
}
if timestamp, ok := setting["timestamp"].(string); ok {
if num, e := strconv.ParseInt(timestamp, 10, 0); e != nil {
extension.logger.Warn("Unable to parse timestamp " + timestamp + " as number " + e.Error())
} else {
setting["timestamp"] = num
}
}
if ttl, ok := setting["ttl"].(string); ok {
if num, e := strconv.ParseInt(ttl, 10, 0); e != nil {
extension.logger.Warn("Unable to parse ttl " + ttl + " as number " + e.Error())
} else {
setting["ttl"] = num
}
}
if _, ok := setting["flags"]; ok {
setting["flags"] = string(item.Flags)
}
if arguments, ok := setting["arguments"].(map[string]interface{}); ok {
if value, ok := item.Arguments["BucketCapacity"]; ok {
arguments["BucketCapacity"] = math.Float64frombits(binary.LittleEndian.Uint64(value))
}
if value, ok := item.Arguments["BucketRate"]; ok {
arguments["BucketRate"] = math.Float64frombits(binary.LittleEndian.Uint64(value))
}
if value, ok := item.Arguments["TriggerRelaxedBucketCapacity"]; ok {
arguments["TriggerRelaxedBucketCapacity"] = math.Float64frombits(binary.LittleEndian.Uint64(value))
}
if value, ok := item.Arguments["TriggerRelaxedBucketRate"]; ok {
arguments["TriggerRelaxedBucketRate"] = math.Float64frombits(binary.LittleEndian.Uint64(value))
}
if value, ok := item.Arguments["TriggerStrictBucketCapacity"]; ok {
arguments["TriggerStrictBucketCapacity"] = math.Float64frombits(binary.LittleEndian.Uint64(value))
}
if value, ok := item.Arguments["TriggerStrictBucketRate"]; ok {
arguments["TriggerStrictBucketRate"] = math.Float64frombits(binary.LittleEndian.Uint64(value))
}
if value, ok := item.Arguments["MetricsFlushInterval"]; ok {
arguments["MetricsFlushInterval"] = int32(binary.LittleEndian.Uint32(value))
}
if value, ok := item.Arguments["MaxTransactions"]; ok {
arguments["MaxTransactions"] = int32(binary.LittleEndian.Uint32(value))
}
if value, ok := item.Arguments["MaxCustomMetrics"]; ok {
arguments["MaxCustomMetrics"] = int32(binary.LittleEndian.Uint32(value))
}
if value, ok := item.Arguments["EventsFlushInterval"]; ok {
arguments["EventsFlushInterval"] = int32(binary.LittleEndian.Uint32(value))
}
if value, ok := item.Arguments["ProfilingInterval"]; ok {
arguments["ProfilingInterval"] = int32(binary.LittleEndian.Uint32(value))
}
/**
* We don't want to expose SignatureKey now
*/
//if value, ok := item.Arguments["SignatureKey"]; ok {
// arguments["SignatureKey"] = string(value)
//}
}
settings = append(settings, setting)
}
}
}
if content, err := json.Marshal(settings); err != nil {
extension.logger.Warn("Error to marshal setting JSON[] byte from settings " + err.Error())
} else {
if err := os.WriteFile(JSONOutputFile, content, 0644); err != nil {
extension.logger.Error("Unable to write " + JSONOutputFile + " " + err.Error())
} else {
extension.logger.Info(JSONOutputFile + " is refreshed")
extension.logger.Info(string(content))
}
}
case collectorpb.ResultCode_TRY_LATER:
extension.logger.Warn("GetSettings returned TRY_LATER " + response.GetWarning())
case collectorpb.ResultCode_INVALID_API_KEY:
extension.logger.Warn("GetSettings returned INVALID_API_KEY " + response.GetWarning())
case collectorpb.ResultCode_LIMIT_EXCEEDED:
extension.logger.Warn("GetSettings returned LIMIT_EXCEEDED " + response.GetWarning())
case collectorpb.ResultCode_REDIRECT:
extension.logger.Warn("GetSettings returned REDIRECT " + response.GetWarning())
default:
extension.logger.Warn("Unknown ResultCode from GetSettings " + response.GetWarning())
}
}
}
}

func (extension *solarwindsapmSettingsExtension) Start(ctx context.Context, _ component.Host) error {
extension.logger.Debug("Starting up solarwinds apm settings extension")
ctx = context.Background()
ctx, extension.cancel = context.WithCancel(ctx)

var err error
extension.conn, err = grpc.Dial(extension.config.Endpoint, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
if err != nil {
return fmt.Errorf("Failed to dial: " + err.Error())
} else {
extension.logger.Info("Dailed to " + extension.config.Endpoint)
}
extension.client = collectorpb.NewTraceCollectorClient(extension.conn)

// Refresh immediately
Refresh(extension)

// setup lightweight thread to refresh
var interval time.Duration
interval, err = time.ParseDuration(extension.config.Interval)
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// Refresh at each ticker event
Refresh(extension)
case <-ctx.Done():
extension.logger.Info("Received ctx.Done() from ticker")
return
}
}
}()

return nil
}

func (extension *solarwindsapmSettingsExtension) Shutdown(_ context.Context) error {
extension.logger.Debug("Shutting down solarwinds apm settings extension")
extension.conn.Close()
extension.cancel()
return nil
}
1 change: 1 addition & 0 deletions extension/solarwindsapmsettingsextension/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package solarwindsapmsettingsextension
Loading

0 comments on commit 3811585

Please sign in to comment.