Skip to content

Commit

Permalink
Introduce LogBatch Struct to Replace Vec in LogExporter Interface (#2057
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lalitb authored Aug 30, 2024
1 parent 8e713e4 commit 976bc54
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 60 deletions.
4 changes: 2 additions & 2 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::LogResult;
use opentelemetry::{InstrumentationLibrary, KeyValue};
use opentelemetry_appender_tracing::layer as tracing_layer;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LoggerProvider};
use opentelemetry_sdk::Resource;
use pprof::criterion::{Output, PProfProfiler};
Expand All @@ -34,7 +34,7 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&mut self, _: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
async fn export(&mut self, _: LogBatch<'_>) -> LogResult<()> {
LogResult::Ok(())
}

Expand Down
6 changes: 2 additions & 4 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ use std::sync::Arc;
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::LogRecord;
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};

use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
let client = self
.client
.lock()
Expand Down
8 changes: 3 additions & 5 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,16 @@ use crate::{
OTEL_EXPORTER_OTLP_TIMEOUT,
};
use http::{HeaderName, HeaderValue, Uri};
#[cfg(feature = "logs")]
use opentelemetry::InstrumentationLibrary;
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "trace")]
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
#[cfg(feature = "logs")]
use opentelemetry_sdk::export::logs::LogBatch;
#[cfg(feature = "trace")]
use opentelemetry_sdk::export::trace::SpanData;
#[cfg(feature = "logs")]
use opentelemetry_sdk::logs::LogRecord;
#[cfg(feature = "metrics")]
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use prost::Message;
Expand Down Expand Up @@ -330,7 +328,7 @@ impl OtlpHttpClient {
#[cfg(feature = "logs")]
fn build_logs_export_body(
&self,
logs: Vec<(&LogRecord, &InstrumentationLibrary)>,
logs: LogBatch<'_>,
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
Expand Down
6 changes: 2 additions & 4 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ use opentelemetry::logs::{LogError, LogResult};
use opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
};
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::LogRecord;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
Expand Down Expand Up @@ -56,7 +54,7 @@ impl TonicLogsClient {

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &mut self.inner {
Some(inner) => {
let (m, e, _) = inner
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use async_trait::async_trait;
use std::fmt::Debug;

use opentelemetry::logs::{LogError, LogResult};
use opentelemetry::InstrumentationLibrary;

use opentelemetry_sdk::{logs::LogRecord, runtime::RuntimeChannel, Resource};
use opentelemetry_sdk::export::logs::LogBatch;
use opentelemetry_sdk::{runtime::RuntimeChannel, Resource};

/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION";
Expand Down Expand Up @@ -99,7 +99,7 @@ impl LogExporter {

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
self.client.export(batch).await
}

Expand Down
18 changes: 9 additions & 9 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod tonic {
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
};
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
use opentelemetry_sdk::export::logs::LogBatch;
use std::borrow::Cow;
use std::collections::HashMap;

Expand Down Expand Up @@ -177,10 +178,7 @@ pub mod tonic {
}

pub fn group_logs_by_resource_and_scope(
logs: Vec<(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
)>,
logs: LogBatch<'_>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// Group logs by target or instrumentation name
Expand Down Expand Up @@ -237,7 +235,7 @@ mod tests {
use crate::transform::common::tonic::ResourceAttributesWithSchema;
use opentelemetry::logs::LogRecord as _;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::{logs::LogRecord, Resource};
use opentelemetry_sdk::{export::logs::LogBatch, logs::LogRecord, Resource};
use std::time::SystemTime;

fn create_test_log_data(
Expand All @@ -258,11 +256,12 @@ mod tests {
let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1");
let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2");

let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
let log_batch = LogBatch::new(&logs);
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
Expand All @@ -278,10 +277,11 @@ mod tests {
let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1");
let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2");

let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
let log_batch = LogBatch::new(&logs);
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
Expand Down
13 changes: 11 additions & 2 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- Provide default implementation for `event_enabled` method in `LogProcessor`
trait that returns `true` always.
- **Breaking** [#2041](https://github.com/open-telemetry/opentelemetry-rust/pull/2041)
and [#2057](https://github.com/open-telemetry/opentelemetry-rust/pull/2057)
- The Exporter::export() interface is modified as below:
Previous Signature:
```rust
Expand All @@ -34,9 +35,17 @@

Updated Signature:
```rust
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>;
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>;
```
This change simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures.

where
```rust
pub struct LogBatch<'a> {

data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)],
}
```
This change enhances performance by reducing unnecessary heap allocations and maintains object safety, allowing for more efficient handling of log records. It also simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures.

## v0.24.1

Expand Down
19 changes: 11 additions & 8 deletions opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
RAM: 64.0 GB
| Test | Average time|
|--------------------------------|-------------|
| LogExporterWithFuture | 122 ns |
| LogExporterWithoutFuture | 89 ns |
| LogExporterWithFuture | 111 ns |
| LogExporterWithoutFuture | 92 ns |
*/

use std::sync::Mutex;
Expand All @@ -19,6 +19,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};

use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::export::logs::LogBatch;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::LogRecord;
use opentelemetry_sdk::logs::LoggerProvider;
Expand All @@ -29,25 +30,25 @@ use std::fmt::Debug;
// cargo bench --bench log_exporter
#[async_trait]
pub trait LogExporterWithFuture: Send + Sync + Debug {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>);
async fn export(&mut self, batch: LogBatch<'_>);
}

pub trait LogExporterWithoutFuture: Send + Sync + Debug {
fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>);
fn export(&mut self, batch: LogBatch<'_>);
}

#[derive(Debug)]
struct NoOpExporterWithFuture {}

#[async_trait]
impl LogExporterWithFuture for NoOpExporterWithFuture {
async fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {}
async fn export(&mut self, _batch: LogBatch<'_>) {}
}

#[derive(Debug)]
struct NoOpExporterWithoutFuture {}
impl LogExporterWithoutFuture for NoOpExporterWithoutFuture {
fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {}
fn export(&mut self, _batch: LogBatch<'_>) {}
}

#[derive(Debug)]
Expand All @@ -66,7 +67,8 @@ impl ExportingProcessorWithFuture {
impl LogProcessor for ExportingProcessorWithFuture {
fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
let mut exporter = self.exporter.lock().expect("lock error");
futures_executor::block_on(exporter.export(vec![(record, library)]));
let logs = [(record as &LogRecord, library)];
futures_executor::block_on(exporter.export(LogBatch::new(&logs)));
}

fn force_flush(&self) -> LogResult<()> {
Expand All @@ -93,10 +95,11 @@ impl ExportingProcessorWithoutFuture {

impl LogProcessor for ExportingProcessorWithoutFuture {
fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
let logs = [(record as &LogRecord, library)];
self.exporter
.lock()
.expect("lock error")
.export(vec![(record, library)]);
.export(LogBatch::new(&logs));
}

fn force_flush(&self) -> LogResult<()> {
Expand Down
74 changes: 72 additions & 2 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,81 @@ use opentelemetry::{
};
use std::fmt::Debug;

/// A batch of log records to be exported by a `LogExporter`.
///
/// The `LogBatch` struct holds a collection of log records along with their associated
/// instrumentation libraries. This structure is used to group log records together for efficient
/// export operations.
///
/// # Type Parameters
/// - `'a`: The lifetime of the references to the log records and instrumentation libraries.
///
#[derive(Debug)]
pub struct LogBatch<'a> {
/// The data field contains a slice of tuples, where each tuple consists of a reference to
/// a `LogRecord` and a reference to an `InstrumentationLibrary`.
data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)],
}

impl<'a> LogBatch<'a> {
/// Creates a new instance of `LogBatch`.
///
/// # Arguments
///
/// * `data` - A slice of tuples, where each tuple consists of a reference to a `LogRecord`
/// and a reference to an `InstrumentationLibrary`. These tuples represent the log records
/// and their associated instrumentation libraries to be exported.
///
/// # Returns
///
/// A `LogBatch` instance containing the provided log records and instrumentation libraries.
///
/// Note - this is not a public function, and should not be used directly. This would be
/// made private in the future.

pub fn new(data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)]) -> LogBatch<'a> {
LogBatch { data }
}
}

impl LogBatch<'_> {
/// Returns an iterator over the log records and instrumentation libraries in the batch.
///
/// Each item yielded by the iterator is a tuple containing references to a `LogRecord`
/// and an `InstrumentationLibrary`.
///
/// # Returns
///
/// An iterator that yields references to the `LogRecord` and `InstrumentationLibrary` in the batch.
///
pub fn iter(&self) -> impl Iterator<Item = (&LogRecord, &InstrumentationLibrary)> {
self.data
.iter()
.map(|(record, library)| (*record, *library))
}
}

/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
/// Exports a batch of [`LogRecord`, `InstrumentationLibrary`].
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>;
/// Exports a batch of log records and their associated instrumentation libraries.
///
/// The `export` method is responsible for sending a batch of log records to an external
/// destination. It takes a `LogBatch` as an argument, which contains references to the
/// log records and their corresponding instrumentation libraries. The method returns
/// a `LogResult` indicating the success or failure of the export operation.
///
/// # Arguments
///
/// * `batch` - A `LogBatch` containing the log records and instrumentation libraries
/// to be exported.
///
/// # Returns
///
/// A `LogResult<()>`, which is a result type indicating either a successful export (with
/// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed.
///
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>;
/// Shuts down the exporter.
fn shutdown(&mut self) {}
#[cfg(feature = "logs_level_enabled")]
Expand Down
Loading

0 comments on commit 976bc54

Please sign in to comment.