Skip to content

Commit

Permalink
feat(metrics): add metrics for object upload
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Jul 3, 2023
1 parent 01e36d6 commit 10d035f
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,20 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
throws IOException, StorageBackendException {
final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
uploader.upload(sis, fileKey);
final var bytes = uploader.upload(sis, fileKey);
metrics.recordObjectUpload(ObjectKey.Suffix.LOG, bytes);
}
}

private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final InputStream index,
final IndexType indexType)
throws StorageBackendException, IOException {
final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
try (index) {
uploader.upload(index, key);
final var bytes = uploader.upload(index, key);
metrics.recordObjectUpload(suffix, bytes);
}
}

Expand All @@ -252,7 +255,8 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);

try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
uploader.upload(manifestContent, manifestFileKey);
final var bytes = uploader.upload(manifestContent, manifestFileKey);
metrics.recordObjectUpload(ObjectKey.Suffix.MANIFEST, bytes);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.aiven.kafka.tieredstorage.metrics;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -29,6 +31,8 @@
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;

import io.aiven.kafka.tieredstorage.ObjectKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,6 +53,7 @@ public class Metrics {
private final Sensor segmentFetchRequestedBytes;

final String rsmMetricGroup = "remote-storage-manager-metrics";
final String storageBackendMetricGroup = "storage-backend-metrics";

public Metrics(final Time time) {
final JmxReporter reporter = new JmxReporter();
Expand Down Expand Up @@ -93,6 +98,26 @@ public Metrics(final Time time) {
segmentFetchRequestedBytes.add(
metrics.metricName("segment-fetch-requested-bytes-total", rsmMetricGroup),
new CumulativeSum());

final var objectUploadRequestsRate =
new MetricNameTemplate("object-upload-rate", storageBackendMetricGroup, "", "object-type");
final var objectUploadRequestsTotal =
new MetricNameTemplate("object-upload-total", storageBackendMetricGroup, "", "object-type");
final var objectUploadBytesRate =
new MetricNameTemplate("object-upload-bytes-rate", storageBackendMetricGroup, "", "object-type");
final var objectUploadBytesTotal =
new MetricNameTemplate("object-upload-bytes-total", storageBackendMetricGroup, "", "object-type");

for (ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
final var objectUploadRequests = metrics.sensor("object." + suffix.value + ".object-upload");
final var tags = Map.of("object-type", suffix.value);
objectUploadRequests.add(metrics.metricInstance(objectUploadRequestsRate, tags), new Rate());
objectUploadRequests.add(metrics.metricInstance(objectUploadRequestsTotal, tags), new CumulativeCount());

final var objectUploadBytes = metrics.sensor("object." + suffix.value + ".object-upload-bytes");
objectUploadBytes.add(metrics.metricInstance(objectUploadBytesRate, tags), new Rate());
objectUploadBytes.add(metrics.metricInstance(objectUploadBytesTotal, tags), new CumulativeSum());
}
}

public void recordSegmentCopy(final int bytes) {
Expand All @@ -118,6 +143,13 @@ public void recordSegmentFetch(final int bytes) {
segmentFetchRequestedBytes.record(bytes);
}

public void recordObjectUpload(final ObjectKey.Suffix suffix, final long bytes) {
final var sensorRequests = metrics.getSensor("object." + suffix.value + ".object-upload");
sensorRequests.record();
final var sensorBytes = metrics.getSensor("object." + suffix.value + ".object-upload-bytes");
sensorBytes.record(bytes);
}

public void close() {
try {
metrics.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static void setup(@TempDir final Path tmpDir,

logSegmentData = new LogSegmentData(
sourceFile, sourceFile, sourceFile, Optional.empty(), sourceFile,
ByteBuffer.allocate(0)
ByteBuffer.wrap("test".getBytes())
);
}

Expand Down Expand Up @@ -131,6 +131,30 @@ void metricsShouldBeReported() throws RemoteStorageException, JMException {
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-fetch-requested-bytes-total"))
.isEqualTo(10.0);

for (final var suffix : ObjectKey.Suffix.values()) {
final ObjectName storageMetricsName = ObjectName.getInstance(
"aiven.kafka.server.tieredstorage:type=storage-backend-metrics,object-type=" + suffix.value);
if (suffix != ObjectKey.Suffix.TXN_INDEX) {
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-rate"))
.isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(3.0);
assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate"))
.isGreaterThan(0.0);
assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total"))
.isGreaterThan(0.0);
} else {
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(0.0);
assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate"))
.isEqualTo(0.0);
assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total"))
.isEqualTo(0.0);
}
}

rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA);
rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA);

Expand Down

0 comments on commit 10d035f

Please sign in to comment.