Skip to content

Commit

Permalink
External time and statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinskiy committed Oct 15, 2024
1 parent 94103f0 commit d420b23
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ edition = "2021"
unsafe_code = "forbid"

[workspace.lints.clippy]
all = "warn"
pedantic = "warn"
all = { level = "warn", priority = -1 }
pedantic = { level = "warn", priority = -1 }
unwrap_used = "forbid"

[workspace.dependencies]
Expand Down
70 changes: 61 additions & 9 deletions statshouse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::cmp;
use std::io::Error;
use std::io::{Error, ErrorKind};
use std::net::{Ipv4Addr, ToSocketAddrs, UdpSocket};
use std::time::{SystemTime, UNIX_EPOCH};

Expand All @@ -26,8 +26,26 @@ pub struct Transport {
tl_buffer: TLBuffer<MAX_DATAGRAM_SIZE>,
batch_count: u32,
last_flush: u32,
stats: Stats,
external_time: bool,
}

#[derive(Default, Copy, Clone)]
pub struct Stats {
pub metrics_overflow: usize,
pub metrics_failed: usize,
pub metrics_too_big: usize,
pub packets_overflow: usize,
pub packets_failed: usize,
}

/// # Examples
///
/// ```
/// let mut t = statshouse::Transport::default();
/// let mut m = Metric::new(t, b"test").tag(b"0", b"staging").tag(b"1", b"test");
/// m.write_count(1.0, 0);
/// ```
impl Transport {
pub fn new<A: ToSocketAddrs>(addr: A) -> Transport {
let mut tl_buffer = TLBuffer::new(BATCH_HEADER_LEN);
Expand All @@ -37,20 +55,39 @@ impl Transport {
tl_buffer,
batch_count: 0,
last_flush: unix_time_now(),
stats: Stats::default(),
external_time: false,
}
}

pub fn metric(&mut self, metric_name: &[u8]) -> Metric {
Metric::new(self, metric_name)
}

#[must_use]
pub fn get_stats(&self) -> Stats {
self.stats
}

pub fn clear_stats(&mut self) {
self.stats = Stats::default();
}

pub fn set_external_time(&mut self, now: u32) {
if self.external_time && self.last_flush == now {
return;
}
self.external_time = true;
self.flush(now);
}

fn write_count(&mut self, builder: &MetricBuilder, count: f64, mut timestamp: u32) -> bool {
if count <= 0. {
return false;
}
let mut len: usize = 4 + builder.tl_buffer.pos + 8; // field mask + header + counter
let mut field_mask: u32 = TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK;
let now = unix_time_now();
let now = self.time_now();
if timestamp == 0 {
timestamp = now;
}
Expand Down Expand Up @@ -82,7 +119,7 @@ impl Transport {
field_mask |= TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK;
len += 8;
}
let now = unix_time_now();
let now = self.time_now();
if timestamp == 0 {
timestamp = now;
}
Expand Down Expand Up @@ -121,7 +158,7 @@ impl Transport {
field_mask |= TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK;
len += 8;
}
let now = unix_time_now();
let now = self.time_now();
if timestamp == 0 {
timestamp = now;
}
Expand Down Expand Up @@ -170,11 +207,26 @@ impl Transport {
}
if let Ok(socket) = self.socket.as_ref() {
write_u32(&mut self.tl_buffer.arr, 8, self.batch_count); // # of batches
let _ = socket.send(&self.tl_buffer.arr[..self.tl_buffer.pos]); // TODO: report error if any
if let Err(ref e) = socket.send(&self.tl_buffer.arr[..self.tl_buffer.pos]) {
if e.kind() == ErrorKind::WouldBlock {
self.stats.packets_overflow += 1;
self.stats.metrics_overflow += self.batch_count as usize;
} else {
self.stats.packets_failed += 1;
self.stats.metrics_failed += self.batch_count as usize;
}
}
self.batch_count = 0;
self.tl_buffer.pos = BATCH_HEADER_LEN;
}
}

fn time_now(&self) -> u32 {
if self.external_time {
return self.last_flush;
}
unix_time_now()
}
}

impl Default for Transport {
Expand All @@ -195,7 +247,7 @@ pub struct Metric<'a> {
}

impl Metric<'_> {
fn new<'a>(transport: &'a mut Transport, metric_name: &[u8]) -> Metric<'a> {
pub fn new<'a>(transport: &'a mut Transport, metric_name: &[u8]) -> Metric<'a> {
Metric {
transport,
builder: MetricBuilder::new(metric_name),
Expand All @@ -209,7 +261,7 @@ impl Metric<'_> {

pub fn write_count(&mut self, counter: f64, timestamp: u32) -> bool {
if self.builder.tl_buffer_overflow {
// TODO: report failure
self.transport.stats.metrics_too_big += 1;
return false;
}
self.transport
Expand All @@ -228,7 +280,7 @@ impl Metric<'_> {

pub fn write_values(&mut self, vals: &[f64], count: f64, timestamp: u32) -> bool {
if self.builder.tl_buffer_overflow {
// TODO: report failure
self.transport.stats.metrics_too_big += 1;
return false;
}
self.transport
Expand All @@ -237,7 +289,7 @@ impl Metric<'_> {

pub fn write_uniques(&mut self, vals: &[u64], count: f64, timestamp: u32) -> bool {
if self.builder.tl_buffer_overflow {
// TODO: report failure
self.transport.stats.metrics_too_big += 1;
return false;
}
self.transport
Expand Down

0 comments on commit d420b23

Please sign in to comment.