Skip to content

Commit

Permalink
Add common metric definitions for various kvstores.
Browse files Browse the repository at this point in the history
Adds definitions for:
   /tensorstore/kvstore/driver/read
   /tensorstore/kvstore/driver/list
   /tensorstore/kvstore/driver/write
   /tensorstore/kvstore/driver/delete_range
   /tensorstore/kvstore/driver/batch_read
   /tensorstore/kvstore/driver/bytes_read
   /tensorstore/kvstore/driver/read_latency_ms
   /tensorstore/kvstore/driver/bytes_written
   /tensorstore/kvstore/driver/write_latency_ms

Before, the metric definitions were duplicated independently in each
kvstore driver; now there is a common set of definitions for many kvstores.

Update file, gcs_grpc, gcs, and s3 to consistently export the common metrics.
This adds latency metrics for file.

Update zip, ocdbt, and tsgrpc to more consistently export a subset of the common metrics.

PiperOrigin-RevId: 658546420
Change-Id: I4bd3979e2a14eb99174442ac1912d62f8c91385c
  • Loading branch information
laramiel authored and tkiela1 committed Aug 1, 2024
1 parent 5847d9a commit cc9fb66
Show file tree
Hide file tree
Showing 17 changed files with 309 additions and 267 deletions.
9 changes: 9 additions & 0 deletions tensorstore/kvstore/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,12 @@ tensorstore_cc_library(
"@com_google_absl//absl/time",
],
)

cc_library(
name = "common_metrics",
hdrs = ["common_metrics.h"],
deps = [
"//tensorstore/internal/metrics",
"//tensorstore/internal/metrics:metadata",
],
)
147 changes: 147 additions & 0 deletions tensorstore/kvstore/common_metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2020 The TensorStore Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef TENSORSTORE_KVSTORE_COMMON_METRICS_H_
#define TENSORSTORE_KVSTORE_COMMON_METRICS_H_

#include <cstdint>

#include "tensorstore/internal/metrics/counter.h"
#include "tensorstore/internal/metrics/histogram.h"
#include "tensorstore/internal/metrics/metadata.h" // IWYU pragma: keep

namespace tensorstore {
namespace internal_kvstore {

// Holds references to the common read metrics for a kvstore driver.
// /tensorstore/kvstore/driver/read
// /tensorstore/kvstore/driver/list
struct CommonReadMetrics {
internal_metrics::Counter<int64_t>& read;
internal_metrics::Counter<int64_t>& list;
};

// Holds references to the common write metrics for a kvstore driver.
// /tensorstore/kvstore/driver/write
// /tensorstore/kvstore/driver/delete_range
struct CommonWriteMetrics {
internal_metrics::Counter<int64_t>& write;
internal_metrics::Counter<int64_t>& delete_range;
};

// Holds references to the common read metrics for a kvstore driver.
// /tensorstore/kvstore/driver/batch_read
// /tensorstore/kvstore/driver/bytes_read
// /tensorstore/kvstore/driver/read_latency_ms
struct DetailedReadMetrics {
internal_metrics::Counter<int64_t>& batch_read;
internal_metrics::Counter<int64_t>& bytes_read;
internal_metrics::Histogram<internal_metrics::DefaultBucketer>&
read_latency_ms;
};

// Holds references to the common write metrics for a kvstore driver.
// /tensorstore/kvstore/driver/bytes_written
// /tensorstore/kvstore/driver/write_latency_ms
struct DetailedWriteMetrics {
internal_metrics::Counter<int64_t>& bytes_written;
internal_metrics::Histogram<internal_metrics::DefaultBucketer>&
write_latency_ms;
};

// Holds references to the common read and write metrics for a kvstore driver.
// /tensorstore/kvstore/driver/read
// /tensorstore/kvstore/driver/list
// /tensorstore/kvstore/driver/write
// /tensorstore/kvstore/driver/delete_range
// /tensorstore/kvstore/driver/batch_read
// /tensorstore/kvstore/driver/bytes_read
// /tensorstore/kvstore/driver/read_latency_ms
// /tensorstore/kvstore/driver/bytes_written
// /tensorstore/kvstore/driver/write_latency_ms
//
// Example:
// namespace {
// auto my_metrics = TENSORSTORE_KVSTORE_COMMON_METRICS(driver);
// } // namespace
//
// my_metrics.read.Increment();
// my_metrics.bytes_read.IncrementBy(100);
//
struct CommonMetrics : public CommonReadMetrics,
public CommonWriteMetrics,
public DetailedReadMetrics,
public DetailedWriteMetrics {
// no additional members
};

#define TENSORSTORE_KVSTORE_COUNTER_IMPL(KVSTORE, NAME, DESC, ...) \
internal_metrics::Counter<int64_t>::New( \
"/tensorstore/kvstore/" #KVSTORE "/" #NAME, \
internal_metrics::MetricMetadata(#KVSTORE " " DESC, ##__VA_ARGS__))

#define TENSORSTORE_KVSTORE_LATENCY_IMPL(KVSTORE, NAME, METRIC_FN) \
internal_metrics::Histogram<internal_metrics::DefaultBucketer>::New( \
"/tensorstore/kvstore/" #KVSTORE "/" #NAME, \
internal_metrics::MetricMetadata( \
#KVSTORE " kvstore::" #METRIC_FN " latency (ms)", \
internal_metrics::Units::kMilliseconds))

#define TENSORSTORE_KVSTORE_COMMON_READ_METRICS(KVSTORE) \
[]() -> ::tensorstore::internal_kvstore::CommonReadMetrics { \
return {TENSORSTORE_KVSTORE_COUNTER_IMPL(KVSTORE, read, \
"kvstore::Read calls"), \
TENSORSTORE_KVSTORE_COUNTER_IMPL(KVSTORE, list, \
"kvstore::List calls")}; \
}()

#define TENSORSTORE_KVSTORE_COMMON_WRITE_METRICS(KVSTORE) \
[]() -> ::tensorstore::internal_kvstore::CommonWriteMetrics { \
return {TENSORSTORE_KVSTORE_COUNTER_IMPL(KVSTORE, write, \
"kvstore::Write calls"), \
TENSORSTORE_KVSTORE_COUNTER_IMPL(KVSTORE, delete_range, \
"kvstore::DeleteRange calls")}; \
}()

#define TENSORSTORE_KVSTORE_DETAILED_READ_METRICS(KVSTORE) \
[]() -> ::tensorstore::internal_kvstore::DetailedReadMetrics { \
return { \
TENSORSTORE_KVSTORE_COUNTER_IMPL(KVSTORE, batch_read, \
"kvstore::Read after batching"), \
TENSORSTORE_KVSTORE_COUNTER_IMPL(KVSTORE, bytes_read, "bytes read", \
internal_metrics::Units::kBytes), \
TENSORSTORE_KVSTORE_LATENCY_IMPL(KVSTORE, read_latency_ms, Read)}; \
}()

#define TENSORSTORE_KVSTORE_DETAILED_WRITE_METRICS(KVSTORE) \
[]() -> ::tensorstore::internal_kvstore::DetailedWriteMetrics { \
return { \
TENSORSTORE_KVSTORE_COUNTER_IMPL(KVSTORE, bytes_written, \
"bytes written", \
internal_metrics::Units::kBytes), \
TENSORSTORE_KVSTORE_LATENCY_IMPL(KVSTORE, write_latency_ms, Write)}; \
}()

#define TENSORSTORE_KVSTORE_COMMON_METRICS(KVSTORE) \
[]() -> ::tensorstore::internal_kvstore::CommonMetrics { \
return {TENSORSTORE_KVSTORE_COMMON_READ_METRICS(KVSTORE), \
TENSORSTORE_KVSTORE_COMMON_WRITE_METRICS(KVSTORE), \
TENSORSTORE_KVSTORE_DETAILED_READ_METRICS(KVSTORE), \
TENSORSTORE_KVSTORE_DETAILED_WRITE_METRICS(KVSTORE)}; \
}()

} // namespace internal_kvstore
} // namespace tensorstore

#endif // TENSORSTORE_KVSTORE_COMMON_METRICS_H_
1 change: 1 addition & 0 deletions tensorstore/kvstore/file/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tensorstore_cc_library(
"//tensorstore/kvstore",
"//tensorstore/kvstore:batch_util",
"//tensorstore/kvstore:byte_range",
"//tensorstore/kvstore:common_metrics",
"//tensorstore/kvstore:generation",
"//tensorstore/kvstore:key_range",
"//tensorstore/util:executor",
Expand Down
84 changes: 34 additions & 50 deletions tensorstore/kvstore/file/file_key_value_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
#include "tensorstore/internal/uri_utils.h"
#include "tensorstore/kvstore/batch_util.h"
#include "tensorstore/kvstore/byte_range.h"
#include "tensorstore/kvstore/common_metrics.h"
#include "tensorstore/kvstore/file/util.h"
#include "tensorstore/kvstore/generation.h"
#include "tensorstore/kvstore/key_range.h"
Expand Down Expand Up @@ -154,7 +155,6 @@
using ::tensorstore::internal::OsErrorCode;
using ::tensorstore::internal_file_util::IsKeyValid;
using ::tensorstore::internal_file_util::LongestDirectoryPrefix;
using ::tensorstore::internal_metrics::MetricMetadata;
using ::tensorstore::internal_os::FileDescriptor;
using ::tensorstore::internal_os::FileInfo;
using ::tensorstore::internal_os::kLockSuffix;
Expand All @@ -167,45 +167,22 @@ using ::tensorstore::kvstore::SupportedFeatures;
namespace tensorstore {
namespace internal_file_kvstore {
namespace {
namespace jb = tensorstore::internal_json_binding;

auto& file_bytes_read = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/bytes_read",
MetricMetadata("Bytes read by the file kvstore driver",
internal_metrics::Units::kBytes));

auto& file_bytes_written = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/bytes_written",
MetricMetadata("Bytes written by the file kvstore driver",
internal_metrics::Units::kBytes));

auto& file_read = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/read",
MetricMetadata("file driver kvstore::Read calls"));

auto& file_open_read = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/open_read",
MetricMetadata("Number of times a file is opened for reading"));

auto& file_batch_read = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/batch_read",
MetricMetadata("file driver reads after batching"));

auto& file_write = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/write",
MetricMetadata("file driver kvstore::Write calls"));

auto& file_delete_range = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/delete_range",
MetricMetadata("file driver kvstore::DeleteRange calls"));
namespace jb = tensorstore::internal_json_binding;

auto& file_list = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/list",
MetricMetadata("file driver kvstore::List calls"));
struct FileMetrics : public internal_kvstore::CommonMetrics {
internal_metrics::Counter<int64_t>& open_read;
internal_metrics::Counter<int64_t>& lock_contention;
// no additional members
};

auto& file_lock_contention = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/file/lock_contention",
MetricMetadata("file driver write lock contention"));
auto file_metrics = []() -> FileMetrics {
return {TENSORSTORE_KVSTORE_COMMON_METRICS(file),
TENSORSTORE_KVSTORE_COUNTER_IMPL(
file, open_read, "Number of times a file is opened for reading"),
TENSORSTORE_KVSTORE_COUNTER_IMPL(file, lock_contention,
" kvstore::Write lock contention")};
}();

ABSL_CONST_INIT internal_log::VerboseFlag file_logging("file");

Expand Down Expand Up @@ -485,7 +462,7 @@ struct WriteLockHelper {
// Release lock and try again.
lock = FileLock{};
lock_fd = std::move(other_fd);
file_lock_contention.Increment();
file_metrics.lock_contention.Increment();
}
}

Expand All @@ -500,8 +477,9 @@ struct WriteLockHelper {
};

Result<absl::Cord> ReadFromFileDescriptor(FileDescriptor fd,
ByteRange byte_range) {
file_batch_read.Increment();
ByteRange byte_range,
absl::Time start_time) {
file_metrics.batch_read.Increment();
internal::FlatCordBuilder buffer(byte_range.size(), false);
size_t offset = 0;
while (offset < buffer.size()) {
Expand All @@ -510,7 +488,7 @@ Result<absl::Cord> ReadFromFileDescriptor(FileDescriptor fd,
buffer.size() - offset,
byte_range.inclusive_min + offset));
if (n > 0) {
file_bytes_read.IncrementBy(n);
file_metrics.bytes_read.IncrementBy(n);
offset += n;
buffer.set_inuse(offset);
continue;
Expand All @@ -520,6 +498,8 @@ Result<absl::Cord> ReadFromFileDescriptor(FileDescriptor fd,
tensorstore::StrCat("Length changed while reading"));
}
}
file_metrics.read_latency_ms.Observe(
absl::ToInt64Milliseconds(absl::Now() - start_time));
return std::move(buffer).Build();
}

Expand Down Expand Up @@ -557,14 +537,14 @@ class BatchReadTask final
Result<kvstore::ReadResult> DoByteRangeRead(ByteRange byte_range) {
absl::Cord value;
TENSORSTORE_ASSIGN_OR_RETURN(
value, ReadFromFileDescriptor(fd_.get(), byte_range),
value, ReadFromFileDescriptor(fd_.get(), byte_range, stamp_.time),
tensorstore::MaybeAnnotateStatus(_, "Error reading from open file"));
return kvstore::ReadResult::Value(std::move(value), stamp_);
}

void ProcessBatch() {
stamp_.time = absl::Now();
file_open_read.Increment();
file_metrics.open_read.Increment();
auto& requests = request_batch.requests;
TENSORSTORE_ASSIGN_OR_RETURN(
fd_,
Expand All @@ -587,6 +567,7 @@ class BatchReadTask final
// Perform single read immediately.
byte_range_request.promise.SetResult(
DoByteRangeRead(byte_range_request.byte_range.AsByteRange()));

return;
}

Expand All @@ -596,7 +577,8 @@ class BatchReadTask final
coalescing_options.max_extra_read_bytes = 255;
internal_kvstore_batch::ForEachCoalescedRequest<Request>(
requests, coalescing_options,
[&](ByteRange coalesced_byte_range, span<Request> coalesced_requests) {
[&](ByteRange coalesced_byte_range,
tensorstore::span<Request> coalesced_requests) {
auto self = internal::IntrusivePtr<BatchReadTask>(this);
executor([self = std::move(self), coalesced_byte_range,
coalesced_requests] {
Expand All @@ -607,7 +589,7 @@ class BatchReadTask final
}

void ProcessCoalescedRead(ByteRange coalesced_byte_range,
span<Request> coalesced_requests) {
tensorstore::span<Request> coalesced_requests) {
TENSORSTORE_ASSIGN_OR_RETURN(auto read_result,
DoByteRangeRead(coalesced_byte_range),
internal_kvstore_batch::SetCommonResult(
Expand All @@ -618,7 +600,7 @@ class BatchReadTask final
};

Future<ReadResult> FileKeyValueStore::Read(Key key, ReadOptions options) {
file_read.Increment();
file_metrics.read.Increment();
TENSORSTORE_RETURN_IF_ERROR(ValidateKey(key));
auto [promise, future] = PromiseFuturePair<kvstore::ReadResult>::Make();
BatchReadTask::MakeRequest<BatchReadTask>(
Expand Down Expand Up @@ -670,7 +652,7 @@ struct WriteTask {
MaybeAnnotateStatus(_,
tensorstore::StrCat("Failed writing: ",
QuoteString(lock_path))));
file_bytes_written.IncrementBy(n);
file_metrics.bytes_written.IncrementBy(n);
if (n == value_for_write.size()) break;
value_for_write.RemovePrefix(n);
}
Expand All @@ -696,6 +678,8 @@ struct WriteTask {
// modification time doesn't change afterwards.
FileInfo info;
TENSORSTORE_RETURN_IF_ERROR(internal_os::GetFileInfo(fd, &info));
file_metrics.write_latency_ms.Observe(
absl::ToInt64Milliseconds(absl::Now() - r.time));
return GetFileGeneration(info);
}();

Expand Down Expand Up @@ -767,7 +751,7 @@ struct DeleteTask {

Future<TimestampedStorageGeneration> FileKeyValueStore::Write(
Key key, std::optional<Value> value, WriteOptions options) {
file_write.Increment();
file_metrics.write.Increment();
TENSORSTORE_RETURN_IF_ERROR(ValidateKey(key));
if (value) {
return MapFuture(executor(), WriteTask{std::move(key), std::move(*value),
Expand Down Expand Up @@ -821,7 +805,7 @@ struct DeleteRangeTask {
};

Future<const void> FileKeyValueStore::DeleteRange(KeyRange range) {
file_delete_range.Increment();
file_metrics.delete_range.Increment();
if (range.empty()) return absl::OkStatus(); // Converted to a ReadyFuture.
TENSORSTORE_RETURN_IF_ERROR(ValidateKeyRange(range));
return PromiseFuturePair<void>::Link(
Expand Down Expand Up @@ -872,7 +856,7 @@ struct ListTask {
};

void FileKeyValueStore::ListImpl(ListOptions options, ListReceiver receiver) {
file_list.Increment();
file_metrics.list.Increment();
if (options.range.empty()) {
execution::set_starting(receiver, [] {});
execution::set_done(receiver);
Expand Down
Loading

0 comments on commit cc9fb66

Please sign in to comment.