Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Refactor PVF preparation memory stats #6693

Merged
merged 7 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#![warn(missing_docs)]

use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, Pvf, ValidationError, ValidationHost,
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, ValidationError,
ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
Expand Down Expand Up @@ -654,7 +655,7 @@ trait ValidationBackend {
validation_result
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>;
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError>;
}

#[async_trait]
Expand All @@ -680,7 +681,7 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host.
Expand Down
12 changes: 6 additions & 6 deletions node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
result
}

async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
unreachable!()
}
}
Expand Down Expand Up @@ -894,11 +894,11 @@ fn pov_decompression_failure_is_invalid() {
}

struct MockPreCheckBackend {
result: Result<Duration, PrepareError>,
result: Result<PrepareStats, PrepareError>,
}

impl MockPreCheckBackend {
fn with_hardcoded_result(result: Result<Duration, PrepareError>) -> Self {
fn with_hardcoded_result(result: Result<PrepareStats, PrepareError>) -> Self {
Self { result }
}
}
Expand All @@ -914,7 +914,7 @@ impl ValidationBackend for MockPreCheckBackend {
unreachable!()
}

async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
self.result.clone()
}
}
Expand All @@ -931,7 +931,7 @@ fn precheck_works() {

let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
relay_parent,
validation_code_hash,
)
Expand Down Expand Up @@ -977,7 +977,7 @@ fn precheck_invalid_pvf_blob_compression() {

let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
relay_parent,
validation_code_hash,
)
Expand Down
10 changes: 5 additions & 5 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{error::PrepareError, host::PrepareResultSender};
use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats};
use always_assert::always;
use polkadot_parachain::primitives::ValidationCodeHash;
use std::{
Expand Down Expand Up @@ -101,8 +101,8 @@ pub enum ArtifactState {
/// This is updated when we get the heads up for this artifact or when we just discover
/// this file.
last_time_needed: SystemTime,
/// The CPU time that was taken preparing this artifact.
cpu_time_elapsed: Duration,
/// Stats produced by successful preparation.
prepare_stats: PrepareStats,
},
/// A task to prepare this artifact is scheduled.
Preparing {
Expand Down Expand Up @@ -177,12 +177,12 @@ impl Artifacts {
&mut self,
artifact_id: ArtifactId,
last_time_needed: SystemTime,
cpu_time_elapsed: Duration,
prepare_stats: PrepareStats,
) {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed })
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, prepare_stats })
.is_none());
}

Expand Down
7 changes: 4 additions & 3 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::prepare::PrepareStats;
use parity_scale_codec::{Decode, Encode};
use std::{any::Any, fmt, time::Duration};
use std::{any::Any, fmt};

/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
/// successful
pub type PrepareResult = Result<Duration, PrepareError>;
pub type PrepareResult = Result<PrepareStats, PrepareError>;

/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)]
Expand Down
30 changes: 17 additions & 13 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,9 @@ async fn handle_precheck_pvf(

if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, cpu_time_elapsed } => {
ArtifactState::Prepared { last_time_needed, prepare_stats } => {
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(*cpu_time_elapsed));
let _ = result_sender.send(Ok(prepare_stats.clone()));
},
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
Expand Down Expand Up @@ -725,8 +725,8 @@ async fn handle_prepare_done(
}

*state = match result {
Ok(cpu_time_elapsed) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed },
Ok(prepare_stats) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), prepare_stats },
Err(error) => {
let last_time_failed = SystemTime::now();
let num_failures = *num_failures + 1;
Expand Down Expand Up @@ -834,7 +834,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
#[cfg(test)]
mod tests {
use super::*;
use crate::{InvalidCandidate, PrepareError};
use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError};
use assert_matches::assert_matches;
use futures::future::BoxFuture;

Expand Down Expand Up @@ -1056,8 +1056,12 @@ mod tests {
let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.artifact_ttl = Duration::from_millis(500);
builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default());
builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default());
builder
.artifacts
.insert_prepared(artifact_id(1), mock_now, PrepareStats::default());
builder
.artifacts
.insert_prepared(artifact_id(2), mock_now, PrepareStats::default());
let mut test = builder.build();
let mut host = test.host_handle();

Expand Down Expand Up @@ -1129,7 +1133,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand All @@ -1145,7 +1149,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down Expand Up @@ -1197,7 +1201,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down Expand Up @@ -1304,7 +1308,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down Expand Up @@ -1454,7 +1458,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down Expand Up @@ -1630,7 +1634,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub mod testing;
pub use sp_tracing;

pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats;
pub use priority::Priority;
pub use pvf::Pvf;

Expand Down
37 changes: 22 additions & 15 deletions node/core/pvf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! Prometheus metrics related to the validation host.

use crate::prepare::MemoryStats;
use polkadot_node_metrics::metrics::{self, prometheus};

/// Validation host metrics.
Expand Down Expand Up @@ -73,24 +74,24 @@ impl Metrics {
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
}

/// Observe max_rss for preparation.
pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) {
/// Observe memory stats for preparation.
#[allow(unused_variables)]
pub(crate) fn observe_preparation_memory_metrics(&self, memory_stats: MemoryStats) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_rss.observe(max_rss);
}
}
#[cfg(target_os = "linux")]
if let Some(max_rss) = memory_stats.max_rss {
metrics.preparation_max_rss.observe(max_rss as f64);
}

/// Observe max resident memory for preparation.
pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_resident.observe(max_resident_kb);
}
}
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
let max_resident_kb = (tracker_stats.resident / 1024) as f64;
let max_allocated_kb = (tracker_stats.allocated / 1024) as f64;

/// Observe max allocated memory for preparation.
pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_allocated.observe(max_allocated_kb);
metrics.preparation_max_resident.observe(max_resident_kb);
metrics.preparation_max_allocated.observe(max_allocated_kb);
}
}
}
}
Expand All @@ -106,8 +107,11 @@ struct MetricsInner {
execute_finished: prometheus::Counter<prometheus::U64>,
preparation_time: prometheus::Histogram,
execution_time: prometheus::Histogram,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::Histogram,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::Histogram,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::Histogram,
}

Expand Down Expand Up @@ -226,6 +230,7 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
Expand All @@ -238,6 +243,7 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
Expand All @@ -250,6 +256,7 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
Expand Down
Loading