imports and abstract changes
sfc-gh-rcheng committed Jul 24, 2023
1 parent 0f570ee commit bbdf245
Showing 3 changed files with 26 additions and 16 deletions.
SnowflakeTelemetryBasicInfo.java
@@ -9,27 +9,17 @@
public abstract class SnowflakeTelemetryBasicInfo {
  final String tableName;

  /**
   * User created or user visible stage name.
   *
   * <p>It is null for Snowpipe Streaming Telemetry.
   */
  final String stageName;

  static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName());

  /**
   * Base Constructor. Accepts a tableName and StageName.
   *
   * @param tableName Checks for Nullability
   * @param stageName Can be null (In case of Snowpipe Streaming since there is no user visible
   *     Snowflake Stage)
   */
  public SnowflakeTelemetryBasicInfo(final String tableName, final String stageName) {
  public SnowflakeTelemetryBasicInfo(final String tableName) {
    Preconditions.checkArgument(
        !Strings.isNullOrEmpty(tableName), "tableName cannot be null or empty");
    this.tableName = tableName;
    this.stageName = stageName;
  }

  /**
SnowflakeTelemetryPipeCreation.java
@@ -24,11 +24,13 @@ public class SnowflakeTelemetryPipeCreation extends SnowflakeTelemetryBasicInfo
  int fileCountReprocessPurge =
      0; // files on stage that are purged due to reprocessing when cleaner starts
  long startTime; // start time of the pipe
  private final String stageName;
  private final String pipeName;

  public SnowflakeTelemetryPipeCreation(
      final String tableName, final String stageName, final String pipeName) {
    super(tableName, stageName);
    super(tableName);
    this.stageName = stageName;
    this.pipeName = pipeName;
    this.startTime = System.currentTimeMillis();
  }
SnowflakeTelemetryPipeStatus.java
@@ -1,6 +1,10 @@
package com.snowflake.kafka.connector.internal.telemetry;

import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.*;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.FILE_COUNT_SUB_DOMAIN;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.FILE_COUNT_TABLE_STAGE_INGESTION_FAIL;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.LATENCY_SUB_DOMAIN;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.OFFSET_SUB_DOMAIN;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.constructMetricName;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_COMMIT_LAG_FILE_COUNT;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_COMMIT_LAG_MS;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_INGESTION_LAG_FILE_COUNT;
@@ -9,23 +13,34 @@
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_KAFKA_LAG_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.BYTE_NUMBER;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.CLEANER_RESTART_COUNT;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.COMMITTED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.END_TIME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_ON_INGESTION;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_ON_STAGE;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_PURGED;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_BROKEN_RECORD;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_INGEST_FAIL;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FLUSHED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.MEMORY_USAGE;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PIPE_NAME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PROCESSED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PURGED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.RECORD_NUMBER;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.STAGE_NAME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.START_TIME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME;

import com.codahale.metrics.*;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil.EventType;
import java.util.*;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -108,6 +123,7 @@ public class SnowflakeTelemetryPipeStatus extends SnowflakeTelemetryBasicInfo {
  // May not be set if jmx is set to false
  private Meter fileCountTableStageBrokenRecordMeter, fileCountTableStageIngestFailMeter;

  private final String stageName;
  private final String pipeName;

  public SnowflakeTelemetryPipeStatus(
@@ -116,7 +132,8 @@ public SnowflakeTelemetryPipeStatus(
      final String pipeName,
      final boolean enableCustomJMXConfig,
      final MetricsJmxReporter metricsJmxReporter) {
    super(tableName, stageName);
    super(tableName);
    this.stageName = stageName;
    this.pipeName = pipeName;

    // Initial value of processed/flushed/committed/purged offset should be set to -1,
@@ -266,6 +283,7 @@ public void dumpTo(ObjectNode msg) {
    msg.put(STAGE_NAME, stageName);
    msg.put(PIPE_NAME, pipeName);

    // TODO @rcheng: changed from metricsutil to telemetry?
    msg.put(PROCESSED_OFFSET, processedOffset.get());
    msg.put(FLUSHED_OFFSET, flushedOffset.get());
    msg.put(COMMITTED_OFFSET, committedOffset.get());
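
Taken together, the three diffs move the Snowpipe-specific stageName out of the abstract SnowflakeTelemetryBasicInfo base class and into the pipe-level subclasses, and replace the wildcard imports with explicit ones. Below is a minimal sketch of the resulting class shape; it is an illustration only, with logging, metrics, startTime tracking, and the Preconditions null check omitted.

// Simplified sketch of the post-commit hierarchy (not the full connector source).
public abstract class SnowflakeTelemetryBasicInfo {
  final String tableName;

  // The base constructor now takes only tableName; Snowpipe Streaming has no
  // user-visible stage, so stageName no longer belongs here.
  public SnowflakeTelemetryBasicInfo(final String tableName) {
    this.tableName = tableName;
  }
}

class SnowflakeTelemetryPipeCreation extends SnowflakeTelemetryBasicInfo {
  // Stage and pipe names now live in the Snowpipe-specific subclasses.
  private final String stageName;
  private final String pipeName;

  SnowflakeTelemetryPipeCreation(
      final String tableName, final String stageName, final String pipeName) {
    super(tableName);
    this.stageName = stageName;
    this.pipeName = pipeName;
  }
}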
