Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report memory usage by application #7966

Merged
merged 7 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions docs/operating-eck/licensing.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,23 @@ The operator periodically writes the total amount of Elastic resources under man
----
> kubectl -n elastic-system get configmap elastic-licensing -o json | jq .data
{
"apm_memory": "0.50GiB",
"apm_memory_bytes": "536870912",
"eck_license_expiry_date": "2025-01-01T00:59:59+01:00",
"eck_license_level": "enterprise",
"eck_license_expiry_date": "2022-01-01T00:59:59+01:00",
"elasticsearch_memory": "18.00GiB",
"elasticsearch_memory_bytes": "19327352832",
"enterprise_resource_units": "1",
"max_enterprise_resource_units": "10",
"timestamp": "2020-01-03T23:38:20Z",
"total_managed_memory": "64GiB",
"total_managed_memory_bytes": "68719476736"
"enterprise_search_memory": "4.00GiB",
"enterprise_search_memory_bytes": "4294967296",
"kibana_memory": "1.00GiB",
"kibana_memory_bytes": "1073741824",
"logstash_memory": "2.00GiB",
"logstash_memory_bytes": "2147483648",
"max_enterprise_resource_units": "250",
"timestamp": "2024-07-26T12:40:42+02:00",
"total_managed_memory": "25.50GiB",
"total_managed_memory_bytes": "27380416512"
}
----

Expand All @@ -120,12 +130,30 @@ If the operator metrics endpoint is enabled with the `--metrics-port` flag (chec
[source,shell]
----
> curl "$ECK_METRICS_ENDPOINT" | grep elastic_licensing
# HELP elastic_licensing_enterprise_resource_units_max Maximum number of enterprise resource units available
# TYPE elastic_licensing_enterprise_resource_units_max gauge
elastic_licensing_enterprise_resource_units_max{license_level="enterprise"} 250
# HELP elastic_licensing_enterprise_resource_units_total Total enterprise resource units used
# TYPE elastic_licensing_enterprise_resource_units_total gauge
elastic_licensing_enterprise_resource_units_total{license_level="basic"} 6
# HELP elastic_licensing_memory_gigabytes_total Total memory used in GB
# TYPE elastic_licensing_memory_gigabytes_total gauge
elastic_licensing_memory_gigabytes_total{license_level="basic"} 357.01915648
elastic_licensing_enterprise_resource_units_total{license_level="enterprise"} 1
# HELP elastic_licensing_memory_gibibytes_apm Memory used by APM server in GiB
# TYPE elastic_licensing_memory_gibibytes_apm gauge
elastic_licensing_memory_gibibytes_apm{license_level="enterprise"} 0.5
# HELP elastic_licensing_memory_gibibytes_elasticsearch Memory used by Elasticsearch in GiB
# TYPE elastic_licensing_memory_gibibytes_elasticsearch gauge
elastic_licensing_memory_gibibytes_elasticsearch{license_level="enterprise"} 18
# HELP elastic_licensing_memory_gibibytes_enterprise_search Memory used by Enterprise Search in GiB
# TYPE elastic_licensing_memory_gibibytes_enterprise_search gauge
elastic_licensing_memory_gibibytes_enterprise_search{license_level="enterprise"} 4
# HELP elastic_licensing_memory_gibibytes_kibana Memory used by Kibana in GiB
# TYPE elastic_licensing_memory_gibibytes_kibana gauge
elastic_licensing_memory_gibibytes_kibana{license_level="enterprise"} 1
# HELP elastic_licensing_memory_gibibytes_logstash Memory used by Logstash in GiB
# TYPE elastic_licensing_memory_gibibytes_logstash gauge
elastic_licensing_memory_gibibytes_logstash{license_level="enterprise"} 2
# HELP elastic_licensing_memory_gibibytes_total Total memory used in GiB
# TYPE elastic_licensing_memory_gibibytes_total gauge
elastic_licensing_memory_gibibytes_total{license_level="enterprise"} 25.5
----

NOTE: Logstash resources managed by ECK will be counted towards ERU usage for informational purposes. Billable consumption depends on license terms on a per customer basis (See link:https://www.elastic.co/agreements/global/self-managed[Self Managed Subscription Agreement])
58 changes: 29 additions & 29 deletions pkg/license/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ import (
ulog "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"
)

// Aggregator aggregates the total of resources of all Elastic managed components
type Aggregator struct {
// aggregator aggregates the total of resources of all Elastic managed components
type aggregator struct {
client k8s.Client
}

type aggregate func(ctx context.Context) (resource.Quantity, error)
type aggregate func(ctx context.Context) (managedMemory, error)

// AggregateMemory aggregates the total memory of all Elastic managed components
func (a Aggregator) AggregateMemory(ctx context.Context) (resource.Quantity, error) {
var totalMemory resource.Quantity
// aggregateMemory aggregates the total memory of all Elastic managed components
func (a aggregator) aggregateMemory(ctx context.Context) (memoryUsage, error) {
usage := newMemoryUsage()

for _, f := range []aggregate{
a.aggregateElasticsearchMemory,
Expand All @@ -50,19 +50,19 @@ func (a Aggregator) AggregateMemory(ctx context.Context) (resource.Quantity, err
} {
memory, err := f(ctx)
if err != nil {
return resource.Quantity{}, err
return memoryUsage{}, err
}
totalMemory.Add(memory)
usage.add(memory)
}

return totalMemory, nil
return usage, nil
}

func (a Aggregator) aggregateElasticsearchMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateElasticsearchMemory(ctx context.Context) (managedMemory, error) {
var esList esv1.ElasticsearchList
err := a.client.List(context.Background(), &esList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory")
}

var total resource.Quantity
Expand All @@ -75,7 +75,7 @@ func (a Aggregator) aggregateElasticsearchMemory(ctx context.Context) (resource.
nodespec.DefaultMemoryLimits,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory")
}

total.Add(multiply(mem, nodeSet.Count))
Expand All @@ -84,14 +84,14 @@ func (a Aggregator) aggregateElasticsearchMemory(ctx context.Context) (resource.
}
}

return total, nil
return managedMemory{total, elasticsearchKey}, nil
}

func (a Aggregator) aggregateEnterpriseSearchMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateEnterpriseSearchMemory(ctx context.Context) (managedMemory, error) {
var entList entv1.EnterpriseSearchList
err := a.client.List(context.Background(), &entList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Enterprise Search memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Enterprise Search memory")
}

var total resource.Quantity
Expand All @@ -103,22 +103,22 @@ func (a Aggregator) aggregateEnterpriseSearchMemory(ctx context.Context) (resour
enterprisesearch.DefaultMemoryLimits,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Enterprise Search memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Enterprise Search memory")
}

total.Add(multiply(mem, ent.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", ent.Namespace, "ent_name", ent.Name,
"memory", mem.String(), "count", ent.Spec.Count)
}

return total, nil
return managedMemory{total, entSearchKey}, nil
}

func (a Aggregator) aggregateKibanaMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateKibanaMemory(ctx context.Context) (managedMemory, error) {
var kbList kbv1.KibanaList
err := a.client.List(context.Background(), &kbList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Kibana memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Kibana memory")
}

var total resource.Quantity
Expand All @@ -130,22 +130,22 @@ func (a Aggregator) aggregateKibanaMemory(ctx context.Context) (resource.Quantit
kibana.DefaultMemoryLimits,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Kibana memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Kibana memory")
}

total.Add(multiply(mem, kb.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", kb.Namespace, "kibana_name", kb.Name,
"memory", mem.String(), "count", kb.Spec.Count)
}

return total, nil
return managedMemory{total, kibanaKey}, nil
}

func (a Aggregator) aggregateLogstashMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateLogstashMemory(ctx context.Context) (managedMemory, error) {
var lsList lsv1alpha1.LogstashList
err := a.client.List(context.Background(), &lsList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Logstash memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Logstash memory")
}

var total resource.Quantity
Expand All @@ -157,22 +157,22 @@ func (a Aggregator) aggregateLogstashMemory(ctx context.Context) (resource.Quant
logstash.DefaultMemoryLimit,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Logstash memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Logstash memory")
}

total.Add(multiply(mem, ls.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", ls.Namespace, "logstash_name", ls.Name,
"memory", mem.String(), "count", ls.Spec.Count)
}

return total, nil
return managedMemory{total, logstashKey}, nil
}

func (a Aggregator) aggregateApmServerMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateApmServerMemory(ctx context.Context) (managedMemory, error) {
var asList apmv1.ApmServerList
err := a.client.List(context.Background(), &asList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate APM Server memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate APM Server memory")
}

var total resource.Quantity
Expand All @@ -184,15 +184,15 @@ func (a Aggregator) aggregateApmServerMemory(ctx context.Context) (resource.Quan
apmserver.DefaultMemoryLimits,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate APM Server memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate APM Server memory")
}

total.Add(multiply(mem, as.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", as.Namespace, "as_name", as.Name,
"memory", mem.String(), "count", as.Spec.Count)
}

return total, nil
return managedMemory{total, apmKey}, nil
}

// containerMemLimits reads the container memory limits from the resource specification with fallback
Expand Down
15 changes: 12 additions & 3 deletions pkg/license/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,20 @@ func TestMemFromNodeOpts(t *testing.T) {
func TestAggregator(t *testing.T) {
objects := readObjects(t, "testdata/stack.yaml")
client := k8s.NewFakeClient(objects...)
aggregator := Aggregator{client: client}
aggregator := aggregator{client: client}

val, err := aggregator.AggregateMemory(context.Background())
val, err := aggregator.aggregateMemory(context.Background())
require.NoError(t, err)
require.Equal(t, 329.9073486328125, inGiB(val))
for k, v := range map[string]float64{
elasticsearchKey: 294.0,
kibanaKey: 5.9073486328125,
apmKey: 2.0,
entSearchKey: 24.0,
logstashKey: 4.0,
} {
require.Equal(t, v, val.appUsage[k].inGiB(), k)
}
require.Equal(t, 329.9073486328125, val.totalMemory.inGiB(), "total")
}

func readObjects(t *testing.T, filePath string) []client.Object {
Expand Down
75 changes: 63 additions & 12 deletions pkg/license/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,76 @@ const (
Type = "elastic-usage"
// GiB represents the number of bytes for 1 GiB
GiB = 1024 * 1024 * 1024

elasticsearchKey = "elasticsearch"
kibanaKey = "kibana"
apmKey = "apm"
entSearchKey = "enterprise_search"
logstashKey = "logstash"
totalKey = "total_managed"
)

type managedMemory struct {
resource.Quantity
label string
}

func newManagedMemory(binarySI int64, label string) managedMemory {
return managedMemory{
Quantity: *resource.NewQuantity(binarySI, resource.BinarySI),
label: label,
}
}

func (mm managedMemory) inGiB() float64 {
return inGiB(mm.Quantity)
}

func (mm managedMemory) intoMap(m map[string]string) {
m[mm.label+"_memory"] = fmt.Sprintf("%0.2fGiB", inGiB(mm.Quantity))
m[mm.label+"_memory_bytes"] = fmt.Sprintf("%d", mm.Quantity.Value())
}

type memoryUsage struct {
appUsage map[string]managedMemory
totalMemory managedMemory
}

func newMemoryUsage() memoryUsage {
usage := memoryUsage{
appUsage: map[string]managedMemory{},
totalMemory: managedMemory{label: totalKey},
}
return usage
pebrc marked this conversation as resolved.
Show resolved Hide resolved
}

func (mu *memoryUsage) add(memory managedMemory) {
mu.appUsage[memory.label] = memory
mu.totalMemory.Add(memory.Quantity)
}

// LicensingInfo represents information about the operator license including the total memory of all Elastic managed
// components
type LicensingInfo struct {
memoryUsage
Timestamp string
EckLicenseLevel string
EckLicenseExpiryDate *time.Time
TotalManagedMemoryGiB float64
TotalManagedMemoryBytes int64
MaxEnterpriseResourceUnits int64
EnterpriseResourceUnits int64
}

// toMap transforms a LicensingInfo to a map of string, in order to fill in the data of a config map
func (li LicensingInfo) toMap() map[string]string {
m := map[string]string{
"timestamp": li.Timestamp,
"eck_license_level": li.EckLicenseLevel,
"total_managed_memory": fmt.Sprintf("%0.2fGiB", li.TotalManagedMemoryGiB),
"total_managed_memory_bytes": fmt.Sprintf("%d", li.TotalManagedMemoryBytes),
"enterprise_resource_units": strconv.FormatInt(li.EnterpriseResourceUnits, 10),
"timestamp": li.Timestamp,
"eck_license_level": li.EckLicenseLevel,
"enterprise_resource_units": strconv.FormatInt(li.EnterpriseResourceUnits, 10),
}
for _, v := range li.appUsage {
v.intoMap(m)
}
li.totalMemory.intoMap(m)

if li.MaxEnterpriseResourceUnits > 0 {
m["max_enterprise_resource_units"] = strconv.FormatInt(li.MaxEnterpriseResourceUnits, 10)
Expand All @@ -74,7 +121,12 @@ func (li LicensingInfo) toMap() map[string]string {

func (li LicensingInfo) ReportAsMetrics() {
labels := prometheus.Labels{metrics.LicenseLevelLabel: li.EckLicenseLevel}
metrics.LicensingTotalMemoryGauge.With(labels).Set(li.TotalManagedMemoryGiB)
metrics.LicensingTotalMemoryGauge.With(labels).Set(li.totalMemory.inGiB())
metrics.LicensingESMemoryGauge.With(labels).Set(li.appUsage[elasticsearchKey].inGiB())
metrics.LicensingKBMemoryGauge.With(labels).Set(li.appUsage[kibanaKey].inGiB())
metrics.LicensingAPMMemoryGauge.With(labels).Set(li.appUsage[apmKey].inGiB())
metrics.LicensingEntSearchMemoryGauge.With(labels).Set((li.appUsage[entSearchKey].inGiB()))
pebrc marked this conversation as resolved.
Show resolved Hide resolved
metrics.LicensingLogstashMemoryGauge.With(labels).Set(li.appUsage[logstashKey].inGiB())
metrics.LicensingTotalERUGauge.With(labels).Set(float64(li.EnterpriseResourceUnits))

if li.MaxEnterpriseResourceUnits > 0 {
Expand All @@ -89,19 +141,18 @@ type LicensingResolver struct {
}

// ToInfo returns licensing information given the total memory of all Elastic managed components
func (r LicensingResolver) ToInfo(ctx context.Context, totalMemory resource.Quantity) (LicensingInfo, error) {
func (r LicensingResolver) ToInfo(ctx context.Context, memoryUsage memoryUsage) (LicensingInfo, error) {
operatorLicense, err := r.getOperatorLicense(ctx)
if err != nil {
return LicensingInfo{}, err
}

licensingInfo := LicensingInfo{
memoryUsage: memoryUsage,
Timestamp: time.Now().Format(time.RFC3339),
EckLicenseLevel: r.getOperatorLicenseLevel(operatorLicense),
EckLicenseExpiryDate: r.getOperatorLicenseExpiry(operatorLicense),
TotalManagedMemoryGiB: inGiB(totalMemory),
TotalManagedMemoryBytes: totalMemory.Value(),
EnterpriseResourceUnits: inEnterpriseResourceUnits(totalMemory),
EnterpriseResourceUnits: inEnterpriseResourceUnits(memoryUsage.totalMemory.Quantity),
}

// include the max ERUs only for a non trial/basic license
Expand Down
Loading