feat: client-side tracing.

feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions. (#1531)

* feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions.

* Address code review feedback.

feat: Add com.google.cloud.firestore.telemetry package. (#1533)

* feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions.

* feat: Add com.google.cloud.firestore.telemetry package.

* Address code review feedback.

* Factor out the otel version in pom.xml.

fix: Remove OpenCensus tracing code. (#1589)

* feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions.

* feat: Add com.google.cloud.firestore.telemetry package.

* fix: Remove OpenCensus tracing code.

feat: tracing for aggregate queries, bulkwriter, partition queries, a… (#1590)

* feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions.

* feat: Add com.google.cloud.firestore.telemetry package.

* fix: Remove OpenCensus tracing code.

* feat: tracing for aggregate queries, bulkwriter, partition queries, and listDocuments.

* Address code review feedback.

* Address feedback.

* don't use wildcard imports.

feat: trace instrumentation for DocumentReference methods. (#1591)

* feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions.

* feat: Add com.google.cloud.firestore.telemetry package.

* fix: Remove OpenCensus tracing code.

* feat: tracing for aggregate queries, bulkwriter, partition queries, and listDocuments.

* feat: trace instrumentation for DocumentReference methods.

feat: trace instrumentation for queries and transactions. (#1592)

* feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions.

* feat: Add com.google.cloud.firestore.telemetry package.

* fix: Remove OpenCensus tracing code.

* feat: tracing for aggregate queries, bulkwriter, partition queries, and listDocuments.

* feat: trace instrumentation for DocumentReference methods.

* feat: trace instrumentation for queries and transactions.

* test: Adding first e2e client-tracing test w/ Custom Root Span (#1621)

* test: Adding first e2e client-tracing test w/ Custom Root Span

* Roll back E2E tests commit.

* Address feedback.

* Address feedback (better event log message).

* Address feedback.

---------

Co-authored-by: Jimit J Shah <57637300+jimit-j-shah@users.noreply.github.com>

test: End-to-End Integration Test for Client-side Tracing in Firestore Java Server SDK using OpenTelemetry SDK and Cloud Trace Exporter against Cloud Trace. (#1635)

* Adding first e2e client-tracing test w/ Custom Root Span

* test: Adding first e2e client-tracing test w/ Custom Root Span

* Fixing test dependencies and use default GCP testing project.

Fixing

* Fixing test dependencies and use default GCP testing project.

* Fixing formatting

* Add aggregationQueryGet Test

* Add bulkWriterCommitTrace Test

* Fixing running multiple-tests

* Add partitionQuery Test

* Add collectionListDocumentsTrace Test

* Add docRef*Trace Tests

* Add docRefUpdate*Trace and docRefDelete*Trace Tests

* Fixing Trace fetching using retries for missing or incomplete traces due to eventual consistency of Cloud Trace

* Add get/query Trace Tests

* Add Transaction test

* Added TraceContainer to be able to test transaction test-cases

* test: Adding Transaction tests

* test: Adding Transaction tests

* test: Adding TestParameterInjector to run the test for global and non-global opentelemetry SDK instances

* test: formatting and cleanup

* test: Adding first e2e client-tracing test w/ Custom Root Span

* test: Add aggregationQueryGet Test

* test: Add bulkWriterCommitTrace Test and fixed running multiple-tests

* test: Add partitionQuery Test

* test: Add collectionListDocumentsTrace Test

* test: Add docRefUpdate*Trace and docRefDelete*Trace Tests and fixed Trace fetching using retries for missing or incomplete traces due to eventual consistency of Cloud Trace

* test: Add get/query Trace Tests

* test: Added Transaction tests using TraceContainer to verify traces for Transaction ops (BeginTransaction, Rollback etc)

* test: Adding TestParameterInjector to run the test for global and non-global opentelemetry SDK instances

* test: Formatting and cleanup

* test: review comments

* test: fixing dfs to handle case where the compareTo callstack may be shorter than the trace callstack - don't need to throw an exception in that case

* test: Consolidating verification methods

* test: review comments

fix: Make telemetry-related fields transient. (#1638)

fix: Rename 'enabled' to 'tracingEnabled'. (#1639)

* fix: Rename 'enabled' to 'tracingEnabled'.

In the future, FirestoreOpenTelemetryOptions will support enabling/disabling
Logging and Metrics as well. So we should use a better name for this field.

* address feedback.
ehsannas committed Apr 26, 2024
1 parent c2812f7 commit b8bf2aa
Showing 25 changed files with 3,895 additions and 488 deletions.
98 changes: 90 additions & 8 deletions google-cloud-firestore/pom.xml
@@ -16,6 +16,7 @@
</parent>
<properties>
<site.installationModule>google-cloud-firestore</site.installationModule>
<opentelemetry.version>1.29.0</opentelemetry.version>
</properties>
<dependencies>
<dependency>
@@ -39,10 +40,6 @@
<artifactId>grpc-google-cloud-firestore-v1</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-contrib-grpc-util</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
@@ -91,10 +88,6 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
@@ -113,6 +106,23 @@
<artifactId>protobuf-java-util</artifactId>
</dependency>

<!-- OpenTelemetry -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-grpc-1.6</artifactId>
<version>${opentelemetry.version}-alpha</version>
</dependency>
<!-- END OpenTelemetry -->

<!-- Test dependencies -->
<dependency>
@@ -173,6 +183,78 @@
<version>3.14.0</version>
<scope>test</scope>
</dependency>
<!-- OpenTelemetry -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.version}-alpha</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-common</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-trace</artifactId>
<version>0.15.0</version>
<scope>test</scope>
</dependency>
<!-- END OpenTelemetry -->
<!-- Cloud Ops -->
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-trace-v1</artifactId>
<version>1.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-trace</artifactId>
<version>0.15.0</version>
<scope>test</scope>
</dependency>
<!-- END OpenTelemetry -->
<!-- Cloud Ops -->
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-trace-v1</artifactId>
<version>1.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-trace</artifactId>
<version>1.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.testparameterinjector</groupId>
<artifactId>test-parameter-injector</artifactId>
<version>1.15</version>
<scope>test</scope>
</dependency>
<!-- END Cloud Ops -->
</dependencies>

<reporting>
@@ -16,6 +16,8 @@

package com.google.cloud.firestore;

import static com.google.cloud.firestore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.core.SettableApiFuture;
@@ -24,7 +26,10 @@
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.v1.RunAggregationQueryRequest;
import com.google.firestore.v1.RunAggregationQueryResponse;
import com.google.firestore.v1.RunQueryRequest;
@@ -34,6 +39,7 @@
import com.google.firestore.v1.Value;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -59,6 +65,11 @@ public class AggregateQuery {
this.aliasMap = new HashMap<>();
}

@Nonnull
private TraceUtil getTraceUtil() {
return query.getFirestore().getOptions().getTraceUtil();
}

/** Returns the query whose aggregations will be calculated by this object. */
@Nonnull
public Query getQuery() {
@@ -77,17 +88,30 @@ public ApiFuture<AggregateQuerySnapshot> get() {

@Nonnull
ApiFuture<AggregateQuerySnapshot> get(@Nullable final ByteString transactionId) {
AggregateQueryResponseDeliverer responseDeliverer =
new AggregateQueryResponseDeliverer(
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
runQuery(responseDeliverer);
return responseDeliverer.getFuture();
TraceUtil.Span span =
getTraceUtil()
.startSpan(
transactionId == null
? TraceUtil.SPAN_NAME_AGGREGATION_QUERY_GET
: TraceUtil.SPAN_NAME_TRANSACTION_GET_AGGREGATION_QUERY);
try (Scope ignored = span.makeCurrent()) {
AggregateQueryResponseDeliverer responseDeliverer =
new AggregateQueryResponseDeliverer(
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
runQuery(responseDeliverer, /* attempt= */ 0);
ApiFuture<AggregateQuerySnapshot> result = responseDeliverer.getFuture();
span.endAtFuture(result);
return result;
} catch (Exception error) {
span.end(error);
throw error;
}
}

private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
private void runQuery(AggregateQueryResponseDeliverer responseDeliverer, int attempt) {
RunAggregationQueryRequest request = toProto(responseDeliverer.getTransactionId());
AggregateQueryResponseObserver responseObserver =
new AggregateQueryResponseObserver(responseDeliverer);
new AggregateQueryResponseObserver(responseDeliverer, attempt);
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
query.rpcContext.getClient().runAggregationQueryCallable();
query.rpcContext.streamRequest(request, responseObserver, callable);
@@ -138,18 +162,36 @@ private final class AggregateQueryResponseObserver

private final AggregateQueryResponseDeliverer responseDeliverer;
private StreamController streamController;
private int attempt;

AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer) {
AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer, int attempt) {
this.responseDeliverer = responseDeliverer;
this.attempt = attempt;
}

Map<String, Object> getAttemptAttributes() {
ImmutableMap.Builder<String, Object> builder =
new ImmutableMap.Builder<String, Object>().put("isRetryAttempt", attempt > 0);
if (attempt > 0) {
builder.put("attemptNumber", attempt);
}
return builder.build();
}

@Override
public void onStart(StreamController streamController) {
getTraceUtil()
.currentSpan()
.addEvent(SPAN_NAME_RUN_AGGREGATION_QUERY + " Stream started.", getAttemptAttributes());
this.streamController = streamController;
}

@Override
public void onResponse(RunAggregationQueryResponse response) {
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + " Response Received.", getAttemptAttributes());
// Close the stream to avoid it dangling, since we're not expecting any more responses.
streamController.cancel();

@@ -165,8 +207,19 @@ public void onResponse(RunAggregationQueryResponse response) {
@Override
public void onError(Throwable throwable) {
if (shouldRetry(throwable)) {
runQuery(responseDeliverer);
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + ": Retryable Error",
Collections.singletonMap("error.message", throwable.getMessage()));

runQuery(responseDeliverer, attempt + 1);
} else {
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + ": Error",
Collections.singletonMap("error.message", throwable.getMessage()));
responseDeliverer.deliverError(throwable);
}
}
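
Note on the retry attributes in this hunk: `getAttemptAttributes()` always sets `isRetryAttempt` and adds `attemptNumber` only when the attempt is a retry. The following is a minimal, dependency-free sketch of that shape, assuming plain `java.util` maps in place of Guava's `ImmutableMap`. The class name `AttemptAttributesSketch` is hypothetical and not part of the SDK.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the attribute-building logic in the diff above.
final class AttemptAttributesSketch {

  // Builds the event attributes for a given attempt number (0 = first attempt).
  static Map<String, Object> attemptAttributes(int attempt) {
    Map<String, Object> attributes = new LinkedHashMap<>();
    attributes.put("isRetryAttempt", attempt > 0);
    if (attempt > 0) {
      // Only retries carry an attempt number, mirroring the diff.
      attributes.put("attemptNumber", attempt);
    }
    return Collections.unmodifiableMap(attributes);
  }

  public static void main(String[] args) {
    System.out.println(attemptAttributes(0)); // prints {isRetryAttempt=false}
    System.out.println(attemptAttributes(2)); // prints {isRetryAttempt=true, attemptNumber=2}
  }
}
```

Since `onError` calls `runQuery(responseDeliverer, attempt + 1)`, each retry creates a new observer whose events carry the incremented attempt number.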
@@ -21,13 +21,10 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.Timestamp;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import io.grpc.Status;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -69,12 +66,6 @@ ApiFuture<WriteResult> wrapResult(int writeIndex) {
* <p>The writes in the batch are not applied atomically and can be applied out of order.
*/
ApiFuture<Void> bulkCommit() {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_BATCHWRITE,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size())));

final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());

@@ -26,6 +26,9 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Context;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -221,6 +224,8 @@ enum OperationType {
@GuardedBy("lock")
private Executor errorExecutor;

Context traceContext;

/**
* Used to track when writes are enqueued. The user handler executors cannot be changed after a
* write has been enqueued.
@@ -237,6 +242,7 @@ enum OperationType {
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);
this.traceContext = firestore.getOptions().getTraceUtil().currentContext();

if (!options.getThrottlingEnabled()) {
this.rateLimiter =
@@ -899,21 +905,32 @@ private void scheduleCurrentBatchLocked(final boolean flush) {

/** Sends the provided batch once the rate limiter does not require any delay. */
private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
// Send the batch if it is does not require any delay, or schedule another attempt after the
// Send the batch if it does not require any delay, or schedule another attempt after the
// appropriate timeout.
boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize());
if (underRateLimit) {
batch
.bulkCommit()
.addListener(
() -> {
if (flush) {
synchronized (lock) {
scheduleCurrentBatchLocked(/* flush= */ true);
}
TraceUtil.Span span =
firestore
.getOptions()
.getTraceUtil()
.startSpan(TraceUtil.SPAN_NAME_BULK_WRITER_COMMIT, traceContext)
.setAttribute("numDocuments", batch.getWrites().size());
try (Scope ignored = span.makeCurrent()) {
ApiFuture<Void> result = batch.bulkCommit();
result.addListener(
() -> {
if (flush) {
synchronized (lock) {
scheduleCurrentBatchLocked(/* flush= */ true);
}
},
bulkWriterExecutor);
}
},
bulkWriterExecutor);
span.endAtFuture(result);
} catch (Exception error) {
span.end(error);
throw error;
}
} else {
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
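
Note on the span handling in `AggregateQuery.get` and `sendBatchLocked`: both follow the same pattern. They start a span, make it current for the synchronous part of the call, end it when the returned future completes, and end it with the error if the call throws before a future exists. The following is a rough sketch of that pattern using only the JDK. `RecordingSpan`, `Scope`, and `traced` are hypothetical stand-ins for illustration (the real `TraceUtil.Span` API is only partly visible in this diff), and `CompletableFuture` stands in for `ApiFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class SpanLifecycleSketch {

  // Hypothetical scope type whose close() throws no checked exception.
  interface Scope extends AutoCloseable {
    @Override
    void close();
  }

  // Hypothetical span that records lifecycle events instead of exporting them.
  static final class RecordingSpan {
    final List<String> log = new ArrayList<>();

    Scope makeCurrent() {
      log.add("scope-open");
      return () -> log.add("scope-closed");
    }

    void end() {
      log.add("end");
    }

    void end(Throwable error) {
      log.add("end-error:" + error.getMessage());
    }

    // Ends the span once the future completes, successfully or not.
    <T> void endAtFuture(CompletableFuture<T> future) {
      future.whenComplete(
          (value, error) -> {
            if (error == null) {
              end();
            } else {
              end(error);
            }
          });
    }
  }

  // Runs an async operation inside the span's scope, as the diff does.
  static <T> CompletableFuture<T> traced(
      RecordingSpan span, Supplier<CompletableFuture<T>> operation) {
    try (Scope ignored = span.makeCurrent()) {
      CompletableFuture<T> result = operation.get();
      span.endAtFuture(result);
      return result;
    } catch (RuntimeException error) {
      // Synchronous failure: no future exists yet, so end the span here.
      span.end(error);
      throw error;
    }
  }

  public static void main(String[] args) {
    RecordingSpan span = new RecordingSpan();
    CompletableFuture<String> pending = new CompletableFuture<>();
    traced(span, () -> pending);
    pending.complete("done");
    System.out.println(span.log); // prints [scope-open, scope-closed, end]
  }
}
```

In this sketch the try-with-resources block closes the scope before the future completes, so the span outlives the scope. Only the synchronous section runs with the span as current.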