Skip to content

Commit

Permalink
Asynchronous Aggregation storage (open-telemetry#1232)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored and DebajitDas committed Mar 21, 2022
1 parent fc82009 commit 52fadef
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 68 deletions.
4 changes: 2 additions & 2 deletions api/include/opentelemetry/metrics/observer_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ class ObserverResult
public:
virtual void Observe(T value) noexcept = 0;

virtual void Observer(T value, const common::KeyValueIterable &attributes) noexcept = 0;
virtual void Observe(T value, const common::KeyValueIterable &attributes) noexcept = 0;

template <class U,
nostd::enable_if_t<common::detail::is_key_value_iterable<U>::value> * = nullptr>
void Observe(T value, const U &attributes) noexcept
{
this->Observe(value, common::KeyValueIterableView<T>{attributes});
this->Observe(value, common::KeyValueIterableView<U>{attributes});
}

void Observe(T value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,49 @@ class DefaultAggregation
return std::move(std::unique_ptr<Aggregation>(new DropAggregation()));
};
}

static std::unique_ptr<Aggregation> CreateAggregation(AggregationType aggregation_type,
InstrumentDescriptor instrument_descriptor)
{
switch (aggregation_type)
{
case AggregationType::kDrop:
return std::unique_ptr<Aggregation>(new DropAggregation());
break;
case AggregationType::kHistogram:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongHistogramAggregation());
}
else
{
return std::unique_ptr<Aggregation>(new DoubleHistogramAggregation());
}
break;
case AggregationType::kLastValue:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongLastValueAggregation());
}
else
{
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation());
}
break;
case AggregationType::kSum:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongSumAggregation(true));
}
else
{
return std::unique_ptr<Aggregation>(new DoubleSumAggregation(true));
}
break;
default:
return DefaultAggregation::CreateAggregation(instrument_descriptor);
}
}
};

} // namespace metrics
Expand Down
6 changes: 3 additions & 3 deletions sdk/include/opentelemetry/sdk/metrics/measurement_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
#pragma once
#ifndef ENABLE_METRICS_PREVIEW

# include <map>
# include "opentelemetry/common/key_value_iterable_view.h"
# include "opentelemetry/sdk/metrics/instruments.h"
# include "opentelemetry/sdk/metrics/metric_reader.h"
# include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"

# include <map>
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand Down Expand Up @@ -50,7 +50,7 @@ class DefaultMeasurementProcessor : public MeasurementProcessor
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};
metric_storages_[MakeKey(reader)] = std::unique_ptr<SyncMetricStorage>(
new SyncMetricStorage(instr_desc, AggregationType::kSum));
new SyncMetricStorage(instr_desc, AggregationType::kSum, new DefaultAttributesProcessor()));
return true;
}

Expand Down
48 changes: 48 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/observer_result.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/common/key_value_iterable.h"
# include "opentelemetry/metrics/observer_result.h"
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"

# include <map>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{
template <class T>
class ObserverResult final : public opentelemetry::metrics::ObserverResult<T>
{
public:
ObserverResult(const AttributesProcessor *attributes_processor)
: attributes_processor_(attributes_processor)
{}

void Observe(T value) noexcept override { data_.insert({{}, value}); }

void Observe(T value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override
{
auto attr = attributes_processor_->process(attributes);
data_.insert({attr, value});
}

const std::unordered_map<MetricAttributes, T, AttributeHashGenerator> &GetMeasurements()
{
return data_;
}

private:
std::unordered_map<MetricAttributes, T, AttributeHashGenerator> data_;

const AttributesProcessor *attributes_processor_;
};
} // namespace metrics
} // namespace sdk

OPENTELEMETRY_END_NAMESPACE
#endif
80 changes: 80 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/common/attributemap_hash.h"
# include "opentelemetry/sdk/metrics/instruments.h"

# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h"
# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
# include "opentelemetry/sdk/metrics/state/metric_storage.h"
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"
# include "opentelemetry/sdk/resource/resource.h"

# include <memory>
# include "opentelemetry/sdk/metrics/observer_result.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

template <class T>
class AsyncMetricStorage : public MetricStorage
{
public:
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
void (*measurement_callback)(opentelemetry::metrics::ObserverResult<T> &),
const AttributesProcessor *attributes_processor)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
measurement_collection_callback_{measurement_callback},
attributes_processor_{attributes_processor},
active_attributes_hashmap_(new AttributesHashMap())
{}

bool Collect(
MetricCollector *collector,
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData &)> metric_collection_callback) noexcept override
{
opentelemetry::sdk::metrics::ObserverResult<T> ob_res(attributes_processor_);

// read the measurement using configured callback
measurement_collection_callback_(ob_res);

// process the read measurements - aggregate and store in hashmap
for (auto &measurement : ob_res.GetMeasurements())
{
auto agg = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
agg->Aggregate(measurement.second);
active_attributes_hashmap_->Set(measurement.first, std::move(agg));
}

// TBD -> read aggregation from hashmap, and perform metric collection
MetricData metric_data;
if (metric_collection_callback(metric_data))
{
return true;
}
return false;
}

private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;
void (*measurement_collection_callback_)(opentelemetry::metrics::ObserverResult<T> &);
const AttributesProcessor *attributes_processor_;
std::unique_ptr<AttributesHashMap> active_attributes_hashmap_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
4 changes: 2 additions & 2 deletions sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MetricStorage
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept = 0;
nostd::function_ref<bool(MetricData &)> callback) noexcept = 0;
};

class WritableMetricStorage
Expand All @@ -48,7 +48,7 @@ class NoopMetricStorage : public MetricStorage
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept override
nostd::function_ref<bool(MetricData &)> callback) noexcept override
{
MetricData metric_data;
if (callback(metric_data))
Expand Down
74 changes: 14 additions & 60 deletions sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
{

public:
SyncMetricStorage(
InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor = new DefaultAttributesProcessor())
SyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor}
{
create_default_aggregation_ = [&]() -> std::unique_ptr<Aggregation> {
return std::move(this->create_aggregation());
return std::move(
DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_));
};
}

Expand All @@ -45,8 +45,7 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
{
return;
}
auto aggregation = attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_);
aggregation->Aggregate(value);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
}

void RecordLong(long value,
Expand All @@ -57,9 +56,8 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto attr = attributes_processor_->process(attributes);
auto aggregation = attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_);
aggregation->Aggregate(value);
auto attr = attributes_processor_->process(attributes);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
}

void RecordDouble(double value) noexcept override
Expand All @@ -69,8 +67,7 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto aggregation = attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_);
aggregation->Aggregate(value);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
}

void RecordDouble(double value,
Expand All @@ -81,20 +78,19 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto attr = attributes_processor_->process(attributes);
auto aggregation = attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_);
aggregation->Aggregate(value);
auto attr = attributes_processor_->process(attributes);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
}

bool Collect(
MetricCollector *collector,
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept override
nostd::function_ref<bool(MetricData &)> callback) noexcept override
{

if (callback(MetricData()))
MetricData metric_data;
if (callback(metric_data))
{
return true;
}
Expand All @@ -107,48 +103,6 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
const AttributesProcessor *attributes_processor_;
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;

std::unique_ptr<Aggregation> create_aggregation()
{
switch (aggregation_type_)
{
case AggregationType::kDrop:
return std::move(std::unique_ptr<Aggregation>(new DropAggregation()));
break;
case AggregationType::kHistogram:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongHistogramAggregation()));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleHistogramAggregation()));
}
break;
case AggregationType::kLastValue:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongLastValueAggregation()));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleLastValueAggregation()));
}
break;
case AggregationType::kSum:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(true)));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleSumAggregation(true)));
}
break;
default:
return std::move(DefaultAggregation::CreateAggregation(instrument_descriptor_));
}
}
};

} // namespace metrics
Expand Down
Loading

0 comments on commit 52fadef

Please sign in to comment.