Add Resiliency Metrics #226

Merged: 1 commit, merged Aug 19, 2024
18 changes: 18 additions & 0 deletions internal/aws/containerinsight/const.go
@@ -146,6 +146,11 @@ const (
GpuRequest = "gpu_request"
GpuReservedCapacity = "gpu_reserved_capacity"

HyperPodUnschedulablePendingReplacement = "unschedulable_pending_replacement"
HyperPodUnschedulablePendingReboot = "unschedulable_pending_reboot"
HyperPodSchedulable = "schedulable"
HyperPodUnschedulable = "unschedulable"

// Define the metric types
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
@@ -179,6 +184,7 @@ const (
TypeContainerEFA = "ContainerEFA"
TypePodEFA = "PodEFA"
TypeNodeEFA = "NodeEFA"
TypeHyperPodNode = "HyperPodNode"

// unit
UnitBytes = "Bytes"
@@ -202,6 +208,13 @@ var WaitingReasonLookup = map[string]string{
"StartError": StatusContainerWaitingReasonStartError,
}

var HyperPodConditionToMetric = map[string]string{
"UnschedulablePendingReplacement": HyperPodUnschedulablePendingReplacement,
"UnschedulablePendingReboot": HyperPodUnschedulablePendingReboot,
"Schedulable": HyperPodSchedulable,
"Unschedulable": HyperPodUnschedulable,
}
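For illustration only (this helper is not part of the PR), a minimal sketch of how the lookup above could turn a node's current health-status condition into the count fields exercised in the new test further down: the active condition reports 1 and every other known condition reports 0.

// hyperPodFields is a hypothetical helper: given the node's health-status
// condition string (e.g. "Schedulable"), it emits one field per known
// condition, 1 for the active condition and 0 for the rest.
func hyperPodFields(condition string) map[string]any {
	fields := make(map[string]any, len(HyperPodConditionToMetric))
	for _, metric := range HyperPodConditionToMetric {
		fields[metric] = 0
	}
	if metric, ok := HyperPodConditionToMetric[condition]; ok {
		fields[metric] = 1
	}
	return fields
}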

var metricToUnitMap map[string]string

func init() {
@@ -330,5 +343,10 @@ func init() {
GpuUsageTotal: UnitCount,
GpuRequest: UnitCount,
GpuReservedCapacity: UnitPercent,

HyperPodUnschedulablePendingReplacement: UnitCount,
HyperPodUnschedulablePendingReboot: UnitCount,
HyperPodSchedulable: UnitCount,
HyperPodUnschedulable: UnitCount,
}
}
6 changes: 5 additions & 1 deletion internal/aws/containerinsight/utils.go
@@ -58,7 +58,8 @@ func IsNode(mType string) bool {
TypeNodeEFA,
TypeNodeFS,
TypeNodeGPU,
TypeNodeNet:
TypeNodeNet,
TypeHyperPodNode:
return true
}
return false
@@ -107,6 +108,7 @@ func getPrefixByMetricType(mType string) string {
instanceNetPrefix := "instance_interface_"
nodeNetPrefix := "node_interface_"
nodeEfaPrefix := "node_efa_"
hyperPodNodeHealthStatus := "hyper_pod_node_health_status_"
podPrefix := "pod_"
podNetPrefix := "pod_interface_"
podEfaPrefix := "pod_efa_"
@@ -169,6 +171,8 @@ func getPrefixByMetricType(mType string) string {
prefix = statefulSet
case TypeClusterReplicaSet:
prefix = replicaSet
case TypeHyperPodNode:
prefix = hyperPodNodeHealthStatus
default:
log.Printf("E! Unexpected MetricType: %s", mType)
}
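With this prefix wired in, a HyperPodNode field such as "schedulable" would be emitted as the metric "hyper_pod_node_health_status_schedulable", and likewise for the other three conditions.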
37 changes: 37 additions & 0 deletions internal/aws/containerinsight/utils_test.go
@@ -79,6 +79,7 @@ func TestIsNode(t *testing.T) {
assert.Equal(t, true, IsNode(TypeNodeGPU))
assert.Equal(t, true, IsNode(TypeNodeNet))
assert.Equal(t, false, IsNode(TypePod))
assert.Equal(t, true, IsNode(TypeHyperPodNode))
}

func TestIsInstance(t *testing.T) {
@@ -929,3 +930,39 @@ func TestConvertToOTLPMetricsForPodEfaMetrics(t *testing.T) {
md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
checkMetricsAreExpected(t, md, fields, tags, expectedUnits)
}

func TestConvertToOTLPMetricsForHyperPodNodeMetrics(t *testing.T) {
var fields map[string]any
var expectedUnits map[string]string
var tags map[string]string
var md pmetric.Metrics
now := time.Now()
timestamp := strconv.FormatInt(now.UnixNano(), 10)

fields = map[string]any{
"unschedulable_pending_replacement": 0,
"unschedulable_pending_reboot": 0,
"schedulable": 1,
"unschedulable": 0,
}
expectedUnits = map[string]string{
"unschedulable_pending_replacement": UnitCount,
"unschedulable_pending_reboot": UnitCount,
"schedulable": UnitCount,
"unschedulable": UnitCount,
}
tags = map[string]string{
"ClusterName": "eks-aoc",
"InstanceId": "i-01bf9fb097cbf3205",
"InstanceType": "t2.xlarge",
"Namespace": "amazon-cloudwatch",
"NodeName": "hyperpod-ip-192-168-12-170.ec2.internal",
"PodName": "cloudwatch-agent",
"ContainerName": "cloudwatch-agent",
"Type": "HyperPodNode",
"Version": "0",
"Timestamp": timestamp,
}
md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
checkMetricsAreExpected(t, md, fields, tags, expectedUnits)
}
19 changes: 16 additions & 3 deletions internal/aws/k8s/k8sclient/clientset.go
@@ -138,6 +138,17 @@ func CaptureNodeLevelInfo(captureNodeLevelInfo bool) Option {
}
}

// CaptureOnlyNodeLabelsInfo allows one to specify whether node labels
// should be captured and retained in memory
func CaptureOnlyNodeLabelsInfo(captureOnlyNodeLabelInfo bool) Option {
return Option{
name: "captureOnlyNodeLabelInfo:" + strconv.FormatBool(captureOnlyNodeLabelInfo),
set: func(kc *K8sClient) {
kc.captureOnlyNodeLabelInfo = captureOnlyNodeLabelInfo
},
}
}
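A hypothetical caller-side sketch (not part of this diff) of how the new option might be used, assuming the package's existing Get(logger, options...) constructor:

import (
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
)

// newLabelOnlyNodeClient is a sketch: it builds a client that captures only
// the allowlisted node labels and exposes the node client for reading them.
func newLabelOnlyNodeClient(logger *zap.Logger) k8sclient.NodeClient {
	client := k8sclient.Get(logger, k8sclient.CaptureOnlyNodeLabelsInfo(true))
	return client.GetNodeClient()
}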

func getStringifiedOptions(options ...Option) string {
opts := make([]string, len(options))
for i, option := range options {
@@ -225,8 +236,9 @@ type K8sClient struct {
nodeMu sync.Mutex
node nodeClientWithStopper

nodeSelector fields.Selector
captureNodeLevelInfo bool
nodeSelector fields.Selector
captureNodeLevelInfo bool
captureOnlyNodeLabelInfo bool

jobMu sync.Mutex
job jobClientWithStopper
@@ -326,7 +338,8 @@ func (c *K8sClient) ShutdownPodClient() {
func (c *K8sClient) GetNodeClient() NodeClient {
c.nodeMu.Lock()
if c.node == nil {
opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo)}
opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo),
captureOnlyNodeLabelInfoOption(c.captureOnlyNodeLabelInfo)}
if c.nodeSelector != nil {
opts = append(opts, nodeSelectorOption(c.nodeSelector))
}
95 changes: 82 additions & 13 deletions internal/aws/k8s/k8sclient/node.go
@@ -16,6 +16,13 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
)

const (
instanceTypeLabelKey = "node.kubernetes.io/instance-type"
instanceTypeLabelKeyBeta = "beta.kubernetes.io/instance-type"
)

// This needs to be reviewed for newer versions of k8s.
@@ -27,13 +34,15 @@ var failedNodeConditions = map[v1.NodeConditionType]bool{
}

type NodeClient interface {
NodeInfos() map[string]*NodeInfo
// Get the number of failed nodes for current cluster
ClusterFailedNodeCount() int
// Get the number of nodes for current cluster
ClusterNodeCount() int
NodeToCapacityMap() map[string]v1.ResourceList
NodeToAllocatableMap() map[string]v1.ResourceList
NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus
NodeToLabelsMap() map[string]map[Label]int8
}

type nodeClientOption func(*nodeClient)
@@ -56,6 +65,12 @@ func captureNodeLevelInfoOption(captureNodeLevelInfo bool) nodeClientOption {
}
}

func captureOnlyNodeLabelInfoOption(captureOnlyNodeLabelInfo bool) nodeClientOption {
return func(n *nodeClient) {
n.captureOnlyNodeLabelInfo = captureOnlyNodeLabelInfo
}
}

type nodeClient struct {
stopChan chan struct{}
store *ObjStore
@@ -69,14 +84,26 @@ type nodeClient struct {
// The node client can be used in several places, including code paths that execute on both leader and non-leader nodes.
// But for logic on the leader node (for ex in k8sapiserver.go), there is no need to obtain node level info since only cluster
// level info is needed there. Hence, this optimization allows us to save on memory by not capturing node level info when not needed.
captureNodeLevelInfo bool
captureNodeLevelInfo bool
captureOnlyNodeLabelInfo bool

mu sync.RWMutex
nodeInfos map[string]*NodeInfo
clusterFailedNodeCount int
clusterNodeCount int
nodeToCapacityMap map[string]v1.ResourceList
nodeToAllocatableMap map[string]v1.ResourceList
nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus
nodeToLabelsMap map[string]map[Label]int8
}

func (c *nodeClient) NodeInfos() map[string]*NodeInfo {
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.nodeInfos
}

func (c *nodeClient) ClusterFailedNodeCount() int {
Expand All @@ -97,6 +124,18 @@ func (c *nodeClient) ClusterNodeCount() int {
return c.clusterNodeCount
}

func (c *nodeClient) NodeToLabelsMap() map[string]map[Label]int8 {
if !c.captureOnlyNodeLabelInfo {
c.logger.Warn("trying to access node label info when captureOnlyNodeLabelInfo is not set, will return empty data")
}
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.nodeToLabelsMap
}
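To illustrate the shape of the returned data (hypothetical helper, not in this PR): the outer key is the node name and the inner map holds only the allowlisted SageMaker health-status label, so non-HyperPod nodes are simply absent.

// hyperPodHealthStatus is a hypothetical convenience wrapper: it reports the
// parsed health-status value for one node and whether that node carried the
// sagemaker label at all. Indexing a missing node yields a nil inner map,
// which the comma-ok lookup handles safely.
func hyperPodHealthStatus(nodes NodeClient, nodeName string) (int8, bool) {
	status, ok := nodes.NodeToLabelsMap()[nodeName][SageMakerNodeHealthStatus]
	return status, ok
}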

func (c *nodeClient) NodeToCapacityMap() map[string]v1.ResourceList {
if !c.captureNodeLevelInfo {
c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data")
@@ -144,25 +183,37 @@ func (c *nodeClient) refresh() {
nodeToCapacityMap := make(map[string]v1.ResourceList)
nodeToAllocatableMap := make(map[string]v1.ResourceList)
nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus)
nodeToLabelsMap := make(map[string]map[Label]int8)

nodeInfos := map[string]*NodeInfo{}

for _, obj := range objsList {
node := obj.(*nodeInfo)
node := obj.(*NodeInfo)
nodeInfos[node.Name] = node

if c.captureNodeLevelInfo {
nodeToCapacityMap[node.name] = node.capacity
nodeToAllocatableMap[node.name] = node.allocatable
nodeToCapacityMap[node.Name] = node.Capacity
nodeToAllocatableMap[node.Name] = node.Allocatable
conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus)
for _, condition := range node.conditions {
for _, condition := range node.Conditions {
conditionsMap[condition.Type] = condition.Status
}
nodeToConditionsMap[node.name] = conditionsMap
nodeToConditionsMap[node.Name] = conditionsMap
}

if c.captureOnlyNodeLabelInfo {
labelsMap := make(map[Label]int8)
if HyperPodLabel, ok := node.Labels[SageMakerNodeHealthStatus]; ok {
labelsMap[SageMakerNodeHealthStatus] = HyperPodLabel
nodeToLabelsMap[node.Name] = labelsMap
}
}
clusterNodeCountNew++

failed := false

Loop:
for _, condition := range node.conditions {
for _, condition := range node.Conditions {
if _, ok := failedNodeConditions[condition.Type]; ok {
// match the failedNodeConditions type we care about
if condition.Status != v1.ConditionFalse {
Expand All @@ -178,11 +229,13 @@ func (c *nodeClient) refresh() {
}
}

c.nodeInfos = nodeInfos
c.clusterFailedNodeCount = clusterFailedNodeCountNew
c.clusterNodeCount = clusterNodeCountNew
c.nodeToCapacityMap = nodeToCapacityMap
c.nodeToAllocatableMap = nodeToAllocatableMap
c.nodeToConditionsMap = nodeToConditionsMap
c.nodeToLabelsMap = nodeToLabelsMap
}

func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...nodeClientOption) *nodeClient {
@@ -222,17 +275,33 @@ func transformFuncNode(obj any) (any, error) {
if !ok {
return nil, fmt.Errorf("input obj %v is not Node type", obj)
}
info := new(nodeInfo)
info.name = node.Name
info.capacity = node.Status.Capacity
info.allocatable = node.Status.Allocatable
info.conditions = []*NodeCondition{}
info := new(NodeInfo)
info.Name = node.Name
info.Capacity = node.Status.Capacity
info.Allocatable = node.Status.Allocatable
if instanceType, ok := node.Labels[instanceTypeLabelKey]; ok {
info.InstanceType = instanceType
} else {
// fallback for compatibility with k8s versions older than v1.17
// https://kubernetes.io/docs/reference/labels-annotations-taints/#beta-kubernetes-io-instance-type-deprecated
if instanceType, ok := node.Labels[instanceTypeLabelKeyBeta]; ok {
info.InstanceType = instanceType
}
}
info.Conditions = []*NodeCondition{}
for _, condition := range node.Status.Conditions {
info.conditions = append(info.conditions, &NodeCondition{
info.Conditions = append(info.Conditions, &NodeCondition{
Type: condition.Type,
Status: condition.Status,
})
}

if sageMakerHealthStatus, ok := node.Labels[SageMakerNodeHealthStatus.String()]; ok {
info.Labels = make(map[Label]int8)
if condition, ok := k8sutil.ParseString(sageMakerHealthStatus); ok {
info.Labels[SageMakerNodeHealthStatus] = condition

Review comment: It looks like this map will hold at most one label (the health status) and no others. Does this even have to be a map in that case? Couldn't it just be a string field, e.g. sageMakerNodeHealthStatus, on NodeInfo that is empty for non-HyperPod nodes? Callers could check for an empty string, or it could be returned as a pointer and checked for nil. You could probably remove captureOnlyNodeLabelInfo as well at that point.

Review comment: Calling it something generic like NodeToLabelsMap gives readers an expectation that all the labels set on the node are accessible via this map.

Reply: The map is part of an allowlist that uses an enum instead of strings to store the label keys/values; any additional labels to be stored will have to be made into an enum, so we will continue to use this pattern for now.
}
}
return info, nil
}

22 changes: 17 additions & 5 deletions internal/aws/k8s/k8sclient/node_info.go
@@ -7,14 +7,26 @@ import (
v1 "k8s.io/api/core/v1"
)

type nodeInfo struct {
name string
conditions []*NodeCondition
capacity v1.ResourceList
allocatable v1.ResourceList
type NodeInfo struct {
Name string
Conditions []*NodeCondition
Capacity v1.ResourceList
Allocatable v1.ResourceList
InstanceType string
Labels map[Label]int8
}

type NodeCondition struct {
Type v1.NodeConditionType
Status v1.ConditionStatus
}

type Label int8

const (
SageMakerNodeHealthStatus Label = iota
)

func (ct Label) String() string {
return [...]string{"sagemaker.amazonaws.com/node-health-status"}[ct]
}
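As the review thread above explains, this map is an allowlist keyed by the enum; a purely hypothetical sketch of how a second label would be added (the extra constant and key are invented for illustration only):

// Hypothetical extension of the allowlist: each new label gets an enum
// constant plus, in the same order, its literal Kubernetes label key in the
// String() lookup slice.
const (
	SageMakerNodeHealthStatus Label = iota
	ExampleExtraLabel // invented for illustration only
)

func (ct Label) String() string {
	return [...]string{
		"sagemaker.amazonaws.com/node-health-status",
		"example.com/extra-label", // invented key
	}[ct]
}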