Skip to content

Commit

Permalink
add missing documentation in k8s_observer extension
Browse files Browse the repository at this point in the history
  • Loading branch information
a-thaler committed Nov 8, 2023
1 parent 902b1a9 commit 26b1f85
Show file tree
Hide file tree
Showing 21 changed files with 451 additions and 35 deletions.
37 changes: 37 additions & 0 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
PortType EndpointType = "port"
// PodType is a pod endpoint.
PodType EndpointType = "pod"
// K8sServiceType is a service endpoint.
K8sServiceType EndpointType = "k8s.service"
// K8sNodeType is a Kubernetes Node endpoint.
K8sNodeType EndpointType = "k8s.node"
// HostPortType is a hostport endpoint.
Expand All @@ -34,6 +36,7 @@ const (
var (
_ EndpointDetails = (*Pod)(nil)
_ EndpointDetails = (*Port)(nil)
_ EndpointDetails = (*K8sService)(nil)
_ EndpointDetails = (*K8sNode)(nil)
_ EndpointDetails = (*HostPort)(nil)
_ EndpointDetails = (*Container)(nil)
Expand Down Expand Up @@ -92,6 +95,40 @@ func (e Endpoint) equals(other Endpoint) bool {
}
}

// K8sService is a discovered k8s service.
type K8sService struct {
// Name of the service.
Name string
// UID is the unique ID in the cluster for the service.
UID string
// Labels is a map of user-specified metadata.
Labels map[string]string
// Annotations is a map of user-specified metadata.
Annotations map[string]string
// Namespace must be unique for services with same name.
Namespace string
// ClusterIP is the IP under which the service is reachable within the cluster.
ClusterIP string
// The type of the service: ClusterIP, NodePort, LoadBalancer, ExternalName
ServiceType string
}

func (s *K8sService) Env() EndpointEnv {
return map[string]interface{}{
"uid": s.UID,
"name": s.Name,
"labels": s.Labels,
"annotations": s.Annotations,
"namespace": s.Namespace,
"cluster_ip": s.ClusterIP,
"service_type": s.ServiceType,
}
}

func (p *K8sService) Type() EndpointType {
return K8sServiceType
}

// Pod is a discovered k8s pod.
type Pod struct {
// Name of the pod.
Expand Down
34 changes: 33 additions & 1 deletion extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestEndpointEnv(t *testing.T) {
},
},
{
name: "K8s port",
name: "K8s pod port",
endpoint: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Expand Down Expand Up @@ -90,6 +90,38 @@ func TestEndpointEnv(t *testing.T) {
"transport": ProtocolTCP,
},
},
{
name: "Service",
endpoint: Endpoint{
ID: EndpointID("service_id"),
Target: "192.68.73.2",
Details: &K8sService{
Name: "service_name",
UID: "service-uid",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "service-namespace",
},
},
want: EndpointEnv{
"type": "k8s.service",
"endpoint": "192.68.73.2",
"id": "service_id",
"name": "service_name",
"labels": map[string]string{
"label_key": "label_val",
},
"annotations": map[string]string{
"annotation_1": "value_1",
},
"uid": "service-uid",
"namespace": "service-namespace",
},
},
{
name: "Host port",
endpoint: Endpoint{
Expand Down
4 changes: 3 additions & 1 deletion extension/observer/k8sobserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<!-- end autogenerated section -->

The `k8s_observer` is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
Kubernetes pod, port, and node endpoints via the Kubernetes API.
Kubernetes pod, port, service and node endpoints via the Kubernetes API.

## Example Config

Expand All @@ -26,6 +26,7 @@ extensions:
node: ${env:K8S_NODE_NAME}
observe_pods: true
observe_nodes: true
observe_services: true

receivers:
receiver_creator:
Expand Down Expand Up @@ -71,3 +72,4 @@ All fields are optional.
| node | string | <no value> | The node name to limit the discovery of pod, port, and node endpoints. Providing no value (the default) results in discovering endpoints for all available nodes. |
| observe_pods | bool | `true` | Whether to report observer pod and port endpoints. If `true` and `node` is specified it will only discover pod and port endpoints whose `spec.nodeName` matches the provided node name. If `true` and `node` isn't specified, it will discover all available pod and port endpoints. Please note that Collector connectivity to pods from other nodes is dependent on your cluster configuration and isn't guaranteed. |
| observe_nodes | bool | `false` | Whether to report observer k8s.node endpoints. If `true` and `node` is specified it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and `node` isn't specified, it will discover all available node endpoints. Please note that Collector connectivity to nodes is dependent on your cluster configuration and isn't guaranteed.|
| observe_services | bool | `false` | Whether to report observer k8s.service endpoints.|
6 changes: 4 additions & 2 deletions extension/observer/k8sobserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ type Config struct {
// it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and
// Node isn't specified, it will discover all available node endpoints. `false` by default.
ObserveNodes bool `mapstructure:"observe_nodes"`
// ObserveServices determines whether to report observer service and port endpoints. `false` by default.
ObserveServices bool `mapstructure:"observe_services"`
}

// Validate checks if the extension configuration is valid
func (cfg *Config) Validate() error {
if !cfg.ObservePods && !cfg.ObserveNodes {
return fmt.Errorf("one of observe_pods and observe_nodes must be true")
if !cfg.ObservePods && !cfg.ObserveNodes && !cfg.ObserveServices {
return fmt.Errorf("one of observe_pods, observe_nodes and observe_services must be true")
}
return nil
}
11 changes: 6 additions & 5 deletions extension/observer/k8sobserver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "observe-all"),
expected: &Config{
Node: "",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
ObservePods: true,
ObserveNodes: true,
Node: "",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
ObservePods: true,
ObserveNodes: true,
ObserveServices: true,
},
},
{
Expand All @@ -51,7 +52,7 @@ func TestLoadConfig(t *testing.T) {
},
{
id: component.NewIDWithName(metadata.Type, "invalid_no_observing"),
expectedErr: "one of observe_pods and observe_nodes must be true",
expectedErr: "one of observe_pods, observe_nodes and observe_services must be true",
},
}
for _, tt := range tests {
Expand Down
47 changes: 32 additions & 15 deletions extension/observer/k8sobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ var _ observer.Observable = (*k8sObserver)(nil)

type k8sObserver struct {
*observer.EndpointsWatcher
telemetry component.TelemetrySettings
podListerWatcher cache.ListerWatcher
nodeListerWatcher cache.ListerWatcher
handler *handler
once *sync.Once
stop chan struct{}
config *Config
telemetry component.TelemetrySettings
podListerWatcher cache.ListerWatcher
serviceListerWatcher cache.ListerWatcher
nodeListerWatcher cache.ListerWatcher
handler *handler
once *sync.Once
stop chan struct{}
config *Config
}

// Start will populate the cache.SharedInformers for pods and nodes as configured and run them as goroutines.
Expand All @@ -52,6 +53,14 @@ func (k *k8sObserver) Start(_ context.Context, _ component.Host) error {
}
go podInformer.Run(k.stop)
}
if k.serviceListerWatcher != nil {
k.telemetry.Logger.Debug("creating and starting service informer")
serviceInformer := cache.NewSharedInformer(k.serviceListerWatcher, &v1.Service{}, 0)
if _, err := serviceInformer.AddEventHandler(k.handler); err != nil {
k.telemetry.Logger.Error("error adding event handler to service informer", zap.Error(err))
}
go serviceInformer.Run(k.stop)
}
if k.nodeListerWatcher != nil {
k.telemetry.Logger.Debug("creating and starting node informer")
nodeInformer := cache.NewSharedInformer(k.nodeListerWatcher, &v1.Node{}, 0)
Expand Down Expand Up @@ -90,6 +99,13 @@ func newObserver(config *Config, set extension.CreateSettings) (extension.Extens
podListerWatcher = cache.NewListWatchFromClient(restClient, "pods", v1.NamespaceAll, podSelector)
}

var serviceListerWatcher cache.ListerWatcher
if config.ObserveServices {
var serviceSelector = fields.Everything()
set.Logger.Debug("observing services")
serviceListerWatcher = cache.NewListWatchFromClient(restClient, "services", v1.NamespaceAll, serviceSelector)
}

var nodeListerWatcher cache.ListerWatcher
if config.ObserveNodes {
var nodeSelector fields.Selector
Expand All @@ -103,14 +119,15 @@ func newObserver(config *Config, set extension.CreateSettings) (extension.Extens
}
h := &handler{idNamespace: set.ID.String(), endpoints: &sync.Map{}, logger: set.TelemetrySettings.Logger}
obs := &k8sObserver{
EndpointsWatcher: observer.NewEndpointsWatcher(h, time.Second, set.TelemetrySettings.Logger),
telemetry: set.TelemetrySettings,
podListerWatcher: podListerWatcher,
nodeListerWatcher: nodeListerWatcher,
stop: make(chan struct{}),
config: config,
handler: h,
once: &sync.Once{},
EndpointsWatcher: observer.NewEndpointsWatcher(h, time.Second, set.TelemetrySettings.Logger),
telemetry: set.TelemetrySettings,
podListerWatcher: podListerWatcher,
serviceListerWatcher: serviceListerWatcher,
nodeListerWatcher: nodeListerWatcher,
stop: make(chan struct{}),
config: config,
handler: h,
once: &sync.Once{},
}

return obs, nil
Expand Down
88 changes: 88 additions & 0 deletions extension/observer/k8sobserver/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,94 @@ func TestNewExtension(t *testing.T) {
require.NotNil(t, ext)
}

func TestExtensionObserveServices(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig().(*Config)
mockServiceHost(t, config)

set := extensiontest.NewNopCreateSettings()
set.ID = component.NewID(metadata.Type)
ext, err := newObserver(config, set)
require.NoError(t, err)
require.NotNil(t, ext)

obs := ext.(*k8sObserver)
serviceListerWatcher := framework.NewFakeControllerSource()
obs.serviceListerWatcher = serviceListerWatcher

serviceListerWatcher.Add(serviceWithClusterIP)

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

sink := &endpointSink{}
obs.ListAndWatch(sink)

requireSink(t, sink, func() bool {
return len(sink.added) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/service-1-UID",
Target: "service-1.default.svc.cluster.local",
Details: &observer.K8sService{
Name: "service-1",
Namespace: "default",
UID: "service-1-UID",
Labels: map[string]string{
"env": "prod",
},
ClusterIP: "1.2.3.4",
ServiceType: "ClusterIP",
},
}, sink.added[0])

serviceListerWatcher.Modify(serviceWithClusterIPV2)

requireSink(t, sink, func() bool {
return len(sink.changed) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/service-1-UID",
Target: "service-1.default.svc.cluster.local",
Details: &observer.K8sService{
Name: "service-1",
Namespace: "default",
UID: "service-1-UID",
Labels: map[string]string{
"env": "prod",
"service-version": "2",
},
ClusterIP: "1.2.3.4",
ServiceType: "ClusterIP",
},
}, sink.changed[0])

serviceListerWatcher.Delete(serviceWithClusterIPV2)

requireSink(t, sink, func() bool {
return len(sink.removed) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/service-1-UID",
Target: "service-1.default.svc.cluster.local",
Details: &observer.K8sService{
Name: "service-1",
Namespace: "default",
UID: "service-1-UID",
Labels: map[string]string{
"env": "prod",
"service-version": "2",
},
ClusterIP: "1.2.3.4",
ServiceType: "ClusterIP",
},
}, sink.removed[0])

require.NoError(t, ext.Shutdown(context.Background()))
}

func TestExtensionObservePods(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig().(*Config)
Expand Down
Loading

0 comments on commit 26b1f85

Please sign in to comment.