Skip to content

Commit

Permalink
refactor: use temporality enum for selection
Browse files Browse the repository at this point in the history
  • Loading branch information
pitoniak32 committed Oct 17, 2024
1 parent 4327283 commit 773803c
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 252 deletions.
5 changes: 2 additions & 3 deletions examples/metrics-advanced/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use opentelemetry::global;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::reader::DeltaTemporalitySelector;
use opentelemetry_sdk::metrics::{
Aggregation, Instrument, PeriodicReader, SdkMeterProvider, Stream,
data::Temporality, Aggregation, Instrument, PeriodicReader, SdkMeterProvider, Stream,
};
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;
Expand Down Expand Up @@ -47,7 +46,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {

// Build exporter using Delta Temporality.
let exporter = opentelemetry_stdout::MetricsExporterBuilder::default()
.with_temporality_selector(DeltaTemporalitySelector::new())
.with_temporality(Temporality::Delta)
.build();

let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
Expand Down
14 changes: 5 additions & 9 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
use opentelemetry::global;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::metrics::{data::Temporality, PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;
use std::vec;

fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
let exporter = opentelemetry_stdout::MetricsExporterBuilder::default()
// Build exporter using Default (Cumulative) Temporality Selector.
.with_temporality_selector(
opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector::new(),
)
// Build exporter using Delta Temporality Selector.
// .with_temporality_selector(
// opentelemetry_sdk::metrics::reader::DeltaTemporalitySelector::new(),
// )
// Build exporter using Default (Cumulative) Temporality.
.with_temporality(Temporality::default())
// Build exporter using Delta Temporality.
// .with_temporality(Temporality::Delta)
.build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
let provider = SdkMeterProvider::builder()
Expand Down
14 changes: 5 additions & 9 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_sc
use opentelemetry_sdk::export::logs::LogBatch;
#[cfg(feature = "trace")]
use opentelemetry_sdk::export::trace::SpanData;
#[cfg(feature = "metrics")]
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use prost::Message;
use std::collections::HashMap;
use std::env;
Expand Down Expand Up @@ -77,9 +75,7 @@ impl Default for HttpConfig {
///
/// ```
/// # #[cfg(feature="metrics")]
/// use opentelemetry_sdk::metrics::reader::{
/// DefaultTemporalitySelector,
/// };
/// use opentelemetry_sdk::metrics::data:Temporality;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Create a span exporter you can use to when configuring tracer providers
Expand All @@ -91,7 +87,7 @@ impl Default for HttpConfig {
/// let metrics_exporter = opentelemetry_otlp::new_exporter()
/// .http()
/// .build_metrics_exporter(
/// Box::new(DefaultTemporalitySelector::new()),
/// Temporality::default(),
/// )?;
///
/// // Create a log exporter you can use when configuring logger providers
Expand Down Expand Up @@ -251,7 +247,7 @@ impl HttpExporterBuilder {
#[cfg(feature = "metrics")]
pub fn build_metrics_exporter(
mut self,
temporality_selector: Box<dyn opentelemetry_sdk::metrics::reader::TemporalitySelector>,
temporality: opentelemetry_sdk::metrics::data::Temporality,
) -> opentelemetry::metrics::Result<crate::MetricsExporter> {
use crate::{
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS,
Expand All @@ -265,7 +261,7 @@ impl HttpExporterBuilder {
OTEL_EXPORTER_OTLP_METRICS_HEADERS,
)?;

Ok(crate::MetricsExporter::new(client, temporality_selector))
Ok(crate::MetricsExporter::new(client, temporality))
}
}

Expand Down Expand Up @@ -341,7 +337,7 @@ impl OtlpHttpClient {
#[cfg(feature = "metrics")]
fn build_metrics_export_body(
&self,
metrics: &mut ResourceMetrics,
metrics: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
) -> opentelemetry::metrics::Result<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;

Expand Down
10 changes: 4 additions & 6 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ fn resolve_compression(
///
/// ```no_run
/// # #[cfg(feature="metrics")]
/// use opentelemetry_sdk::metrics::reader::{
/// DefaultTemporalitySelector,
/// };
/// use opentelemetry_sdk::metrics::data::Temporality;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Create a span exporter you can use to when configuring tracer providers
Expand All @@ -110,7 +108,7 @@ fn resolve_compression(
/// let metrics_exporter = opentelemetry_otlp::new_exporter()
/// .tonic()
/// .build_metrics_exporter(
/// Box::new(DefaultTemporalitySelector::new()),
/// Temporality::default(),
/// )?;
///
/// // Create a log exporter you can use when configuring logger providers
Expand Down Expand Up @@ -331,7 +329,7 @@ impl TonicExporterBuilder {
#[cfg(feature = "metrics")]
pub fn build_metrics_exporter(
self,
temporality_selector: Box<dyn opentelemetry_sdk::metrics::reader::TemporalitySelector>,
temporality: opentelemetry_sdk::metrics::data::Temporality,
) -> opentelemetry::metrics::Result<crate::MetricsExporter> {
use crate::MetricsExporter;
use metrics::TonicMetricsClient;
Expand All @@ -345,7 +343,7 @@ impl TonicExporterBuilder {

let client = TonicMetricsClient::new(channel, interceptor, compression);

Ok(MetricsExporter::new(client, temporality_selector))
Ok(MetricsExporter::new(client, temporality))
}

/// Build a new tonic span exporter
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
//! use opentelemetry::{global, KeyValue, trace::Tracer};
//! use opentelemetry_sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource};
//! # #[cfg(feature = "metrics")]
//! use opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector;
//! use opentelemetry_sdk::metrics::data::Temporality;
//! use opentelemetry_otlp::{Protocol, WithExportConfig, ExportConfig};
//! use std::time::Duration;
//! # #[cfg(feature = "grpc-tonic")]
Expand Down Expand Up @@ -184,7 +184,7 @@
//! .with_resource(Resource::new(vec![KeyValue::new("service.name", "example")]))
//! .with_period(Duration::from_secs(3))
//! .with_timeout(Duration::from_secs(10))
//! .with_temporality_selector(DefaultTemporalitySelector::new())
//! .with_temporality(Temporality::default())
//! .build();
//! # }
//!
Expand Down
66 changes: 21 additions & 45 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use opentelemetry_sdk::{
metrics::{
data::{ResourceMetrics, Temporality},
exporter::PushMetricsExporter,
reader::{DefaultTemporalitySelector, DeltaTemporalitySelector, TemporalitySelector},
InstrumentKind, PeriodicReader, SdkMeterProvider,
PeriodicReader, SdkMeterProvider,
},
runtime::Runtime,
Resource,
Expand Down Expand Up @@ -47,7 +46,7 @@ impl OtlpPipeline {
{
OtlpMetricPipeline {
rt,
temporality_selector: None,
temporality: None,
exporter_pipeline: NoExporterConfig(()),
resource: None,
period: None,
Expand Down Expand Up @@ -75,22 +74,15 @@ pub enum MetricsExporterBuilder {

impl MetricsExporterBuilder {
/// Build a OTLP metrics exporter with given configuration.
pub fn build_metrics_exporter(
self,
temporality_selector: Box<dyn TemporalitySelector>,
) -> Result<MetricsExporter> {
pub fn build_metrics_exporter(self, temporality: Temporality) -> Result<MetricsExporter> {
match self {
#[cfg(feature = "grpc-tonic")]
MetricsExporterBuilder::Tonic(builder) => {
builder.build_metrics_exporter(temporality_selector)
}
MetricsExporterBuilder::Tonic(builder) => builder.build_metrics_exporter(temporality),
#[cfg(feature = "http-proto")]
MetricsExporterBuilder::Http(builder) => {
builder.build_metrics_exporter(temporality_selector)
}
MetricsExporterBuilder::Http(builder) => builder.build_metrics_exporter(temporality),
#[cfg(not(any(feature = "http-proto", feature = "grpc-tonic")))]
MetricsExporterBuilder::Unconfigured => {
drop(temporality_selector);
let _ = temporality;
Err(opentelemetry::metrics::MetricsError::Other(
"no configured metrics exporter, enable `http-proto` or `grpc-tonic` feature to configure a metrics exporter".into(),
))
Expand Down Expand Up @@ -119,7 +111,7 @@ impl From<HttpExporterBuilder> for MetricsExporterBuilder {
/// runtime.
pub struct OtlpMetricPipeline<RT, EB> {
rt: RT,
temporality_selector: Option<Box<dyn TemporalitySelector>>,
temporality: Option<Temporality>,
exporter_pipeline: EB,
resource: Option<Resource>,
period: Option<time::Duration>,
Expand Down Expand Up @@ -154,23 +146,13 @@ where
}
}

/// Build with the given temporality selector
pub fn with_temporality_selector<T: TemporalitySelector + 'static>(self, selector: T) -> Self {
/// Set the [Temporality] of the exporter.
pub fn with_temporality(self, temporality: Temporality) -> Self {
OtlpMetricPipeline {
temporality_selector: Some(Box::new(selector)),
temporality: Some(temporality),
..self
}
}

/// Build with delta temporality selector.
///
/// This temporality selector is equivalent to OTLP Metrics Exporter's
/// `Delta` temporality preference (see [its documentation][exporter-docs]).
///
/// [exporter-docs]: https://github.com/open-telemetry/opentelemetry-specification/blob/a1c13d59bb7d0fb086df2b3e1eaec9df9efef6cc/specification/metrics/sdk_exporters/otlp.md#additional-configuration
pub fn with_delta_temporality(self) -> Self {
self.with_temporality_selector(DeltaTemporalitySelector::new())
}
}

impl<RT> OtlpMetricPipeline<RT, NoExporterConfig>
Expand All @@ -185,7 +167,7 @@ where
OtlpMetricPipeline {
exporter_pipeline: pipeline.into(),
rt: self.rt,
temporality_selector: self.temporality_selector,
temporality: self.temporality,
resource: self.resource,
period: self.period,
timeout: self.timeout,
Expand All @@ -199,10 +181,9 @@ where
{
/// Build MeterProvider
pub fn build(self) -> Result<SdkMeterProvider> {
let exporter = self.exporter_pipeline.build_metrics_exporter(
self.temporality_selector
.unwrap_or_else(|| Box::new(DefaultTemporalitySelector::new())),
)?;
let exporter = self
.exporter_pipeline
.build_metrics_exporter(self.temporality.unwrap_or_default())?;

let mut builder = PeriodicReader::builder(exporter, self.rt);

Expand Down Expand Up @@ -247,7 +228,7 @@ pub trait MetricsClient: fmt::Debug + Send + Sync + 'static {
/// Export metrics in OTEL format.
pub struct MetricsExporter {
client: Box<dyn MetricsClient>,
temporality_selector: Box<dyn TemporalitySelector>,
temporality: Temporality,
}

impl Debug for MetricsExporter {
Expand All @@ -256,12 +237,6 @@ impl Debug for MetricsExporter {
}
}

impl TemporalitySelector for MetricsExporter {
fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.temporality_selector.temporality(kind)
}
}

#[async_trait]
impl PushMetricsExporter for MetricsExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
Expand All @@ -276,17 +251,18 @@ impl PushMetricsExporter for MetricsExporter {
fn shutdown(&self) -> Result<()> {
self.client.shutdown()
}

fn temporality(&self) -> Temporality {
self.temporality
}
}

impl MetricsExporter {
/// Create a new metrics exporter
pub fn new(
client: impl MetricsClient,
temporality_selector: Box<dyn TemporalitySelector>,
) -> MetricsExporter {
pub fn new(client: impl MetricsClient, temporality: Temporality) -> MetricsExporter {
MetricsExporter {
client: Box::new(client),
temporality_selector,
temporality,
}
}
}
14 changes: 6 additions & 8 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use opentelemetry_sdk::{
metrics::{
data::{ResourceMetrics, Temporality},
new_view,
reader::{DeltaTemporalitySelector, MetricReader, TemporalitySelector},
reader::MetricReader,
Aggregation, Instrument, InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream,
View,
},
Expand All @@ -20,12 +20,6 @@ use opentelemetry_sdk::{
#[derive(Clone, Debug)]
struct SharedReader(Arc<dyn MetricReader>);

impl TemporalitySelector for SharedReader {
fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.0.temporality(kind)
}
}

impl MetricReader for SharedReader {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
self.0.register_pipeline(pipeline)
Expand All @@ -42,6 +36,10 @@ impl MetricReader for SharedReader {
fn shutdown(&self) -> Result<()> {
self.0.shutdown()
}

fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.0.temporality(kind)
}
}

// * Summary *
Expand Down Expand Up @@ -118,7 +116,7 @@ fn bench_counter(view: Option<Box<dyn View>>, temporality: &str) -> (SharedReade
} else {
SharedReader(Arc::new(
ManualReader::builder()
.with_temporality_selector(DeltaTemporalitySelector::new())
.with_temporality(Temporality::Delta)
.build(),
))
};
Expand Down
8 changes: 7 additions & 1 deletion opentelemetry-sdk/src/metrics/data/temporality.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
/// Defines the window that an aggregation was calculated over.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Temporality {
/// A measurement interval that continues to expand forward in time from a
/// starting point.
///
/// New measurements are added to all previous measurements since a start time.
#[default]
Cumulative,

/// A measurement interval that resets each cycle.
///
/// Measurements from one cycle are recorded independently, measurements from
/// other cycles do not affect them.
Delta,

/// Configures Synchronous Counter and Histogram instruments to use
/// Delta aggregation temporality, which allows them to shed memory
/// following a cardinality explosion, thus use less memory.
LowMemory,
}
Loading

0 comments on commit 773803c

Please sign in to comment.