From d420b23f3f2ff5fa38070c8942ab8535038029e9 Mon Sep 17 00:00:00 2001 From: Mikhail Alpinskiy Date: Tue, 15 Oct 2024 12:46:19 +0300 Subject: [PATCH] External time and statistics --- Cargo.toml | 4 +-- statshouse/src/lib.rs | 70 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 099c850..13f3e1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,8 +30,8 @@ edition = "2021" unsafe_code = "forbid" [workspace.lints.clippy] -all = "warn" -pedantic = "warn" +all = { level = "warn", priority = -1 } +pedantic = { level = "warn", priority = -1 } unwrap_used = "forbid" [workspace.dependencies] diff --git a/statshouse/src/lib.rs b/statshouse/src/lib.rs index 0cae1ef..12467b4 100644 --- a/statshouse/src/lib.rs +++ b/statshouse/src/lib.rs @@ -5,7 +5,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use std::cmp; -use std::io::Error; +use std::io::{Error, ErrorKind}; use std::net::{Ipv4Addr, ToSocketAddrs, UdpSocket}; use std::time::{SystemTime, UNIX_EPOCH}; @@ -26,8 +26,26 @@ pub struct Transport { tl_buffer: TLBuffer, batch_count: u32, last_flush: u32, + stats: Stats, + external_time: bool, } +#[derive(Default, Copy, Clone)] +pub struct Stats { + pub metrics_overflow: usize, + pub metrics_failed: usize, + pub metrics_too_big: usize, + pub packets_overflow: usize, + pub packets_failed: usize, +} + +/// # Examples +/// +/// ``` +/// let mut t = statshouse::Transport::default(); +/// let mut m = Metric::new(t, b"test").tag(b"0", b"staging").tag(b"1", b"test"); +/// m.write_count(1.0, 0); +/// ``` impl Transport { pub fn new(addr: A) -> Transport { let mut tl_buffer = TLBuffer::new(BATCH_HEADER_LEN); @@ -37,6 +55,8 @@ impl Transport { tl_buffer, batch_count: 0, last_flush: unix_time_now(), + stats: Stats::default(), + external_time: false, } } @@ -44,13 +64,30 @@ impl Transport { Metric::new(self, metric_name) } + #[must_use] + pub fn get_stats(&self) -> Stats { + self.stats + } + + pub fn clear_stats(&mut self) { + self.stats = Stats::default(); + } + + pub fn set_external_time(&mut self, now: u32) { + if self.external_time && self.last_flush == now { + return; + } + self.external_time = true; + self.flush(now); + } + fn write_count(&mut self, builder: &MetricBuilder, count: f64, mut timestamp: u32) -> bool { if count <= 0. { return false; } let mut len: usize = 4 + builder.tl_buffer.pos + 8; // field mask + header + counter let mut field_mask: u32 = TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK; - let now = unix_time_now(); + let now = self.time_now(); if timestamp == 0 { timestamp = now; } @@ -82,7 +119,7 @@ impl Transport { field_mask |= TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK; len += 8; } - let now = unix_time_now(); + let now = self.time_now(); if timestamp == 0 { timestamp = now; } @@ -121,7 +158,7 @@ impl Transport { field_mask |= TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK; len += 8; } - let now = unix_time_now(); + let now = self.time_now(); if timestamp == 0 { timestamp = now; } @@ -170,11 +207,26 @@ impl Transport { } if let Ok(socket) = self.socket.as_ref() { write_u32(&mut self.tl_buffer.arr, 8, self.batch_count); // # of batches - let _ = socket.send(&self.tl_buffer.arr[..self.tl_buffer.pos]); // TODO: report error if any + if let Err(ref e) = socket.send(&self.tl_buffer.arr[..self.tl_buffer.pos]) { + if e.kind() == ErrorKind::WouldBlock { + self.stats.packets_overflow += 1; + self.stats.metrics_overflow += self.batch_count as usize; + } else { + self.stats.packets_failed += 1; + self.stats.metrics_failed += self.batch_count as usize; + } + } self.batch_count = 0; self.tl_buffer.pos = BATCH_HEADER_LEN; } } + + fn time_now(&self) -> u32 { + if self.external_time { + return self.last_flush; + } + unix_time_now() + } } impl Default for Transport { @@ -195,7 +247,7 @@ pub struct Metric<'a> { } impl Metric<'_> { - fn new<'a>(transport: &'a mut Transport, metric_name: &[u8]) -> Metric<'a> { + pub fn new<'a>(transport: &'a mut Transport, metric_name: &[u8]) -> Metric<'a> { Metric { transport, builder: MetricBuilder::new(metric_name), @@ -209,7 +261,7 @@ impl Metric<'_> { pub fn write_count(&mut self, counter: f64, timestamp: u32) -> bool { if self.builder.tl_buffer_overflow { - // TODO: report failure + self.transport.stats.metrics_too_big += 1; return false; } self.transport @@ -228,7 +280,7 @@ impl Metric<'_> { pub fn write_values(&mut self, vals: &[f64], count: f64, timestamp: u32) -> bool { if self.builder.tl_buffer_overflow { - // TODO: report failure + self.transport.stats.metrics_too_big += 1; return false; } self.transport @@ -237,7 +289,7 @@ impl Metric<'_> { pub fn write_uniques(&mut self, vals: &[u64], count: f64, timestamp: u32) -> bool { if self.builder.tl_buffer_overflow { - // TODO: report failure + self.transport.stats.metrics_too_big += 1; return false; } self.transport