From 29661c0edca7b1f6b52a4d69d96a96f3a31cc4db Mon Sep 17 00:00:00 2001 From: Palash Nigam Date: Mon, 21 Nov 2022 19:54:32 +0530 Subject: [PATCH] Add Summary metric Signed-off-by: Palash Nigam --- Cargo.toml | 1 + src/encoding/text.rs | 80 ++++++++++++++++++++++ src/metrics.rs | 2 + src/metrics/summary.rs | 151 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 234 insertions(+) create mode 100644 src/metrics/summary.rs diff --git a/Cargo.toml b/Cargo.toml index a42a237f..3da3e9c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ prometheus-client-derive-encode = { version = "0.3.0", path = "derive-encode" } prost = { version = "0.11.0", optional = true } prost-types = { version = "0.11.0", optional = true } void = { version = "1.0", optional = true } +quantiles = "0.7.1" [dev-dependencies] async-std = { version = "1", features = ["attributes"] } diff --git a/src/encoding/text.rs b/src/encoding/text.rs index 754b08a7..f6d61035 100644 --- a/src/encoding/text.rs +++ b/src/encoding/text.rs @@ -29,6 +29,7 @@ use crate::metrics::exemplar::{CounterWithExemplar, Exemplar, HistogramWithExemp use crate::metrics::family::{Family, MetricConstructor}; use crate::metrics::gauge::{self, Gauge}; use crate::metrics::histogram::Histogram; +use crate::metrics::summary::Summary; use crate::metrics::info::Info; use crate::metrics::{MetricType, TypedMetric}; use crate::registry::{Registry, Unit}; @@ -186,6 +187,7 @@ impl Encode for MetricType { MetricType::Histogram => "histogram", MetricType::Info => "info", MetricType::Unknown => "unknown", + MetricType::Summary => "summary", }; writer.write_all(t.as_bytes())?; @@ -323,6 +325,23 @@ impl<'a> BucketEncoder<'a> { }) } + /// Encode a quantile. Used for the [`Summary`] metric type. + pub fn encode_quantile(&mut self, quantile: f64) -> Result { + if self.opened_curly_brackets { + self.writer.write_all(b",")?; + } else { + self.writer.write_all(b"{")?; + } + + self.writer.write_all(b"quantile=\"")?; + quantile.encode(self.writer)?; + self.writer.write_all(b"\"}")?; + + Ok(ValueEncoder { + writer: self.writer, + }) + } + /// Signal that the metric type has no bucket. pub fn no_bucket(&mut self) -> Result { if self.opened_curly_brackets { @@ -579,6 +598,41 @@ fn encode_histogram_with_maybe_exemplars( Ok(()) } + +///////////////////////////////////////////////////////////////////////////////// +// Summary + +impl EncodeMetric for Summary { + fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> { + let (sum, count, quantiles) = self.get(); + + encoder + .encode_suffix("sum")? + .no_bucket()? + .encode_value(sum)? + .no_exemplar()?; + encoder + .encode_suffix("count")? + .no_bucket()? + .encode_value(count)? + .no_exemplar()?; + + for (_, (quantile, result)) in quantiles.iter().enumerate() { + let mut bucket_encoder = encoder.no_suffix()?; + let mut value_encoder = bucket_encoder.encode_quantile(*quantile)?; + let mut exemplar_encoder = value_encoder.encode_value(*result)?; + exemplar_encoder.no_exemplar()? + } + + Result::Ok(()) + } + + fn metric_type(&self) -> MetricType { + Self::TYPE + } +} + + ///////////////////////////////////////////////////////////////////////////////// // Info @@ -818,6 +872,32 @@ mod tests { parse_with_python_client(String::from_utf8(encoded).unwrap()); } + #[test] + fn encode_summary() { + let mut registry = Registry::default(); + let summary = Summary::new(3, 10, vec![0.5, 0.9, 0.99], 0.0); + registry.register("my_summary", "My summary", summary.clone()); + summary.observe(0.10); + summary.observe(0.20); + summary.observe(0.30); + + let mut encoded = Vec::new(); + + encode(&mut encoded, ®istry).unwrap(); + + let expected = "# HELP my_summary My summary.\n".to_owned() + + "# TYPE my_summary summary\n" + + "my_summary_sum 0.6000000000000001\n" + + "my_summary_count 3\n" + + "my_summary{quantile=\"0.5\"} 0.2\n" + + "my_summary{quantile=\"0.9\"} 0.3\n" + + "my_summary{quantile=\"0.99\"} 0.3\n" + + "# EOF\n"; + assert_eq!(expected, String::from_utf8(encoded.clone()).unwrap()); + + parse_with_python_client(String::from_utf8(encoded).unwrap()); + } + fn parse_with_python_client(input: String) { pyo3::prepare_freethreaded_python(); diff --git a/src/metrics.rs b/src/metrics.rs index 647fa5c7..e1a4cb2a 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -5,6 +5,7 @@ pub mod exemplar; pub mod family; pub mod gauge; pub mod histogram; +pub mod summary; pub mod info; /// A metric that is aware of its Open Metrics metric type. @@ -22,6 +23,7 @@ pub enum MetricType { Histogram, Info, Unknown, + Summary, // Not (yet) supported metric types. // // GaugeHistogram, diff --git a/src/metrics/summary.rs b/src/metrics/summary.rs new file mode 100644 index 00000000..b4a30c62 --- /dev/null +++ b/src/metrics/summary.rs @@ -0,0 +1,151 @@ +//! Module implementing an Open Metrics histogram. +//! +//! See [`Summary`] for details. + +use super::{MetricType, TypedMetric}; +//use owning_ref::OwningRef; +//use std::iter::{self, once}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use quantiles::ckms::CKMS; + +/// Open Metrics [`Summary`] to measure distributions of discrete events. +#[derive(Debug)] +pub struct Summary { + target_quantile: Vec, + target_error: f64, + max_age_buckets: u64, + max_age_seconds: u64, + stream_duration: Duration, + inner: Arc>, +} + +impl Clone for Summary { + fn clone(&self) -> Self { + Summary { + target_quantile: self.target_quantile.clone(), + target_error: self.target_error, + max_age_buckets: self.max_age_buckets, + max_age_seconds: self.max_age_seconds, + stream_duration: self.stream_duration, + inner: self.inner.clone(), + } + } +} + +#[derive(Debug)] +pub(crate) struct InnerSummary { + sum: f64, + count: u64, + quantile_streams: Vec>, + // head_stream is like a cursor which carries the index + // of the stream in the quantile_streams that we want to query. + head_stream_idx: u64, + // timestamp at which the head_stream_idx was last rotated. + last_rotated_timestamp: Instant, +} + +impl Summary { + /// Create a new [`Summary`]. + pub fn new(max_age_buckets: u64, max_age_seconds: u64, target_quantile: Vec, target_error: f64) -> Self { + let mut streams: Vec> = Vec::new(); + for _ in 0..max_age_buckets { + streams.push(CKMS::new(target_error)); + } + + let stream_duration = Duration::from_secs(max_age_seconds / max_age_buckets); + let last_rotated_timestamp = Instant::now(); + + if target_quantile.iter().any(|&x| x > 1.0 || x < 0.0) { + panic!("Quantile value out of range"); + } + + Summary{ + max_age_buckets, + max_age_seconds, + stream_duration, + target_quantile, + target_error, + inner: Arc::new(Mutex::new(InnerSummary { + sum: Default::default(), + count: Default::default(), + quantile_streams: streams, + head_stream_idx: 0, + last_rotated_timestamp, + })) + } + } + + /// Observe the given value. + pub fn observe(&self, v: f64) { + self.rotate_buckets(); + + let mut inner = self.inner.lock().unwrap(); + inner.sum += v; + inner.count += 1; + + // insert quantiles into all streams/buckets. + for stream in inner.quantile_streams.iter_mut() { + stream.insert(v); + } + } + + /// Retrieve the values of the summary metric. + pub fn get(&self) -> (f64, u64, Vec<(f64, f64)>) { + self.rotate_buckets(); + + let inner = self.inner.lock().unwrap(); + let sum = inner.sum; + let count = inner.count; + let mut quantile_values: Vec<(f64, f64)> = Vec::new(); + + for q in self.target_quantile.iter() { + match inner.quantile_streams[inner.head_stream_idx as usize].query(*q) { + Some((_, v)) => quantile_values.push((*q, v)), + None => continue, + }; + } + (sum, count, quantile_values) + } + + fn rotate_buckets(&self) { + let mut inner = self.inner.lock().unwrap(); + if inner.last_rotated_timestamp.elapsed() >= self.stream_duration { + inner.last_rotated_timestamp = Instant::now(); + if inner.head_stream_idx == self.max_age_buckets { + inner.head_stream_idx = 0; + } else { + inner.head_stream_idx += 1; + } + }; + } +} + +impl TypedMetric for Summary { + const TYPE: MetricType = MetricType::Summary; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn summary() { + let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01); + summary.observe(1.0); + summary.observe(5.0); + summary.observe(10.0); + + let (s, c, q) = summary.get(); + assert_eq!(16.0, s); + assert_eq!(3, c); + assert_eq!(vec![(0.5, 5.0), (0.9, 10.0), (0.99, 10.0)], q); + } + + #[test] + #[should_panic(expected="Quantile value out of range")] + fn summary_panic() { + Summary::new(5, 10, vec![1.0, 5.0, 9.0], 0.01); + } +}