Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics for memtable #1036

Merged
merged 6 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion analytic_engine/src/memtable/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! MemTable

Expand Down Expand Up @@ -193,6 +193,19 @@ pub trait MemTable {
///
/// If the memtable is empty, then the last sequence is 0.
fn last_sequence(&self) -> SequenceNumber;

/// Metrics of inner state.
fn metrics(&self) -> Metrics;
}

/// Point-in-time snapshot of a memtable's inner counters.
///
/// This is a plain value type: every field is a `usize`, so it is cheap to
/// copy, can be zero-initialised via [`Default`], and can be compared for
/// equality (handy in tests and when diffing two snapshots).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Metrics {
    /// Size of the original (un-encoded) rows.
    pub row_raw_size: usize,
    /// Size of the rows after encoding.
    pub row_encoded_size: usize,
    /// Number of rows.
    pub row_count: usize,
}

/// A reference to memtable
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/memtable/skiplist/factory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Skiplist memtable factory

Expand All @@ -25,6 +25,7 @@ impl Factory for SkiplistMemTableFactory {
schema: opts.schema,
skiplist,
last_sequence: AtomicU64::new(opts.creation_sequence),
metrics: Default::default(),
});

Ok(memtable)
Expand Down
44 changes: 39 additions & 5 deletions analytic_engine/src/memtable/skiplist/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! MemTable based on skiplist

Expand All @@ -8,7 +8,7 @@ pub mod iter;
use std::{
cmp::Ordering,
convert::TryInto,
sync::atomic::{self, AtomicU64},
sync::atomic::{self, AtomicU64, AtomicUsize},
};

use arena::{Arena, BasicStats};
Expand All @@ -26,10 +26,17 @@ use snafu::{ensure, ResultExt};
use crate::memtable::{
key::{ComparableInternalKey, KeySequence},
skiplist::iter::{ColumnarIterImpl, ReversedColumnarIterator},
ColumnarIterPtr, EncodeInternalKey, InvalidPutSequence, InvalidRow, MemTable, PutContext,
Result, ScanContext, ScanRequest,
ColumnarIterPtr, EncodeInternalKey, InvalidPutSequence, InvalidRow, MemTable,
Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest,
};

/// Internal counters of the skiplist memtable.
///
/// Each counter is an atomic, so it can be bumped through a shared reference
/// on the write path and read back when a snapshot is requested.
#[derive(Debug, Default)]
struct Metrics {
    /// Running total of `row.size()` over the rows that have been put.
    row_raw_size: AtomicUsize,
    /// Running total of encoded bytes (internal key length plus encoded row
    /// value length) over the rows that have been put.
    row_encoded_size: AtomicUsize,
    /// Number of rows that have been put.
    row_count: AtomicUsize,
}

/// MemTable implementation based on skiplist
pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone + Sync + Send> {
/// Schema of this memtable, is immutable.
Expand All @@ -38,6 +45,8 @@ pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
/// The last sequence of the rows in this memtable. Updates to this field
/// require external synchronization.
last_sequence: AtomicU64,

metrics: Metrics,
}

impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
Expand Down Expand Up @@ -95,9 +104,20 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
let row_value = &mut ctx.value_buf;
let mut row_writer = ContiguousRowWriter::new(row_value, schema, &ctx.index_in_writer);
row_writer.write_row(row).box_err().context(InvalidRow)?;

let encoded_size = internal_key.len() + row_value.len();
self.skiplist.put(internal_key, row_value);

// Update metrics
self.metrics
.row_raw_size
.fetch_add(row.size(), atomic::Ordering::Relaxed);
self.metrics
.row_count
.fetch_add(1, atomic::Ordering::Relaxed);
self.metrics
.row_encoded_size
.fetch_add(encoded_size, atomic::Ordering::Relaxed);

Ok(())
}

Expand Down Expand Up @@ -147,6 +167,20 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
/// Returns the value currently stored in the `last_sequence` atomic.
fn last_sequence(&self) -> SequenceNumber {
    // A relaxed load: only the value itself is read, no ordering with other
    // memory operations is requested here.
    let sequence = &self.last_sequence;
    sequence.load(atomic::Ordering::Relaxed)
}

/// Builds a plain-value snapshot of the internal atomic counters.
///
/// The three counters are read one after another with relaxed loads.
/// NOTE(review): because they are read independently, the returned values
/// are presumably not guaranteed to be mutually consistent while writes are
/// in flight — confirm whether callers rely on that.
fn metrics(&self) -> MemtableMetrics {
    let counters = &self.metrics;
    let order = atomic::Ordering::Relaxed;

    // Field initialisers run in source order: raw size, encoded size, count.
    MemtableMetrics {
        row_raw_size: counters.row_raw_size.load(order),
        row_encoded_size: counters.row_encoded_size.load(order),
        row_count: counters.row_count.load(order),
    }
}
}

#[derive(Debug, Clone)]
Expand Down
9 changes: 4 additions & 5 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,12 @@ impl TableData {

let mutable_usage = self.current_version.mutable_memory_usage();
let total_usage = self.current_version.total_memory_usage();

let in_flush = serial_exec.flush_scheduler().is_in_flush();
// Inspired by https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94
if mutable_usage > mutable_limit && !in_flush {
info!(
"TableData should flush, table:{}, table_id:{}, mutable_usage:{}, mutable_limit: {}, total_usage:{}, max_write_buffer_size:{}",
self.name, self.id, mutable_usage, mutable_limit, total_usage, max_write_buffer_size
"TableData should flush by mutable limit, table:{}, table_id:{}, mutable_usage:{}, mutable_limit: {}, total_usage:{}, max_write_buffer_size:{}.",
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
self.name, self.id, mutable_usage, mutable_limit, total_usage, max_write_buffer_size,
);
return true;
}
Expand All @@ -473,8 +472,8 @@ impl TableData {

if should_flush {
info!(
"TableData should flush, table:{}, table_id:{}, mutable_usage:{}, mutable_limit: {}, total_usage:{}, max_write_buffer_size:{}",
self.name, self.id, mutable_usage, mutable_limit, total_usage, max_write_buffer_size
"TableData should flush by total usage, table:{}, table_id:{}, mutable_usage:{}, mutable_limit: {}, total_usage:{}, max_write_buffer_size:{}.",
self.name, self.id, mutable_usage, mutable_limit, total_usage, max_write_buffer_size,
);
}

Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/table/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ impl fmt::Debug for MemTableState {
f.debug_struct("MemTableState")
.field("time_range", &self.time_range)
.field("id", &self.id)
.field("mem", &self.mem.approximate_memory_usage())
.field("metrics", &self.mem.metrics())
.field("last_sequence", &self.mem.last_sequence())
.finish()
}
Expand Down
22 changes: 22 additions & 0 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,28 @@ impl Datum {
Ok(Datum::Date(days))
}

/// Returns the size reported for this datum.
///
/// `Varbinary` and `String` report the `len()` of their payload; every other
/// variant reports a fixed constant (8, 4, 2 or 1).
/// NOTE(review): the constants presumably mirror the byte width of each
/// variant's underlying primitive — confirm against the `Datum` definition.
pub fn size(&self) -> usize {
    match self {
        // Variable-length payloads.
        Datum::Varbinary(bytes) => bytes.len(),
        Datum::String(text) => text.len(),
        // Fixed-size variants, grouped by the constant they report.
        Datum::Timestamp(_)
        | Datum::Double(_)
        | Datum::UInt64(_)
        | Datum::Int64(_)
        | Datum::Time(_) => 8,
        Datum::Float(_) | Datum::UInt32(_) | Datum::Int32(_) | Datum::Date(_) => 4,
        Datum::UInt16(_) | Datum::Int16(_) => 2,
        Datum::Null | Datum::UInt8(_) | Datum::Int8(_) | Datum::Boolean(_) => 1,
    }
}

#[cfg(test)]
pub fn as_view(&self) -> DatumView {
match self {
Expand Down
4 changes: 4 additions & 0 deletions common_types/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ impl Row {

self.cols[timestamp_index].as_timestamp()
}

/// Returns the total of `size()` over every column value in this row.
///
/// A row without columns yields 0.
pub fn size(&self) -> usize {
    self.cols
        .iter()
        .fold(0, |total, datum| total + datum.size())
}
}

#[derive(Debug)]
Expand Down
Loading