Skip to content

Commit

Permalink
added replication lag as gauge
Browse files Browse the repository at this point in the history
  • Loading branch information
sinamna committed Apr 29, 2024
1 parent c8b33d6 commit 1faad80
Showing 1 changed file with 57 additions and 16 deletions.
73 changes: 57 additions & 16 deletions surveyor/jetstream_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,30 @@ import (

// Label-name slices handed to the Prometheus gauge vectors built in this
// file, plus the default scrape interval.
// NOTE(review): this span comes from a rendered diff, so the same identifiers
// appear twice (presumably the pre-change lines first, then the re-aligned
// post-change lines) — confirm against the real file before applying.
var (
//JSStreamList = `$JS.API.STREAM.LIST`
streamConfigLabels = []string{"discard_policy", "storage_type", "replica_number", "stream_name"}
consumerConfigLabels = []string{"stream_name", "max_pending_ack", "ack_policy", "is_pull", "consumer_name"}
streamRaftInfoLabels = []string{"stream_name", "leader", "replica_count"}
streamRaftPeerInfoLabels = []string{"stream_name", "peer_name", "offline", "current", "leader", "lag"}
consumerRaftInfoLabels = []string{"consumer_name", "leader", "replica_count", "stream_name"}
consumerStateLabels = []string{"consumer_name", "stream_name", "last_delivered_message_consumer", "last_delivered_message_stream", "ack_floor_consumer", "ack_floor_stream"}
consumerRaftPeerInfoLabels = []string{"stream_name", "consumer_name", "peer_name", "offline", "current", "leader", "lag"}

DefaultScrapeInterval = 10 * time.Second
streamConfigLabels = []string{"discard_policy", "storage_type", "replica_number", "stream_name"}
consumerConfigLabels = []string{"stream_name", "max_pending_ack", "ack_policy", "is_pull", "consumer_name"}
streamRaftInfoLabels = []string{"stream_name", "leader", "replica_count"}
streamRaftPeerInfoLabels = []string{"stream_name", "peer_name", "offline", "current", "leader", "lag"}
consumerRaftInfoLabels = []string{"consumer_name", "leader", "replica_count", "stream_name"}
consumerStateLabels = []string{"consumer_name", "stream_name", "last_delivered_message_consumer", "last_delivered_message_stream", "ack_floor_consumer", "ack_floor_stream"}
consumerRaftPeerInfoLabels = []string{"stream_name", "consumer_name", "peer_name", "offline", "current", "leader", "lag"}
// Variable labels of the stream replication lag gauge: one series per
// (stream, peer) pair.
streamReplicationLagLabels = []string{"stream_name", "peer_name"}
// Variable labels of the consumer replication lag gauge: one series per
// (stream, consumer, peer) triple.
consumerReplicationLagLabels = []string{"stream_name", "consumer_name", "peer_name"}
// DefaultScrapeInterval is 10 seconds.
DefaultScrapeInterval = 10 * time.Second
//DefaultListenerID = "default_listener"
)

// JSStreamConfigMetrics groups the Prometheus gauge vectors for JetStream
// streams and consumers. NewJetStreamConfigListMetrics builds the gauge
// vectors and registers them on the supplied registry.
// NOTE(review): this span comes from a rendered diff, so fields appear twice
// (presumably old alignment first, then new) — confirm against the real file.
type JSStreamConfigMetrics struct {
jsStreamConfig *prometheus.GaugeVec
jsStreamRaftInfo *prometheus.GaugeVec
jsStreamRaftPeerInfo *prometheus.GaugeVec
jsStreamConfig *prometheus.GaugeVec
jsStreamRaftInfo *prometheus.GaugeVec
jsStreamRaftPeerInfo *prometheus.GaugeVec
// jsStreamReplicationLag is set to float64(peer.Lag) for each stream peer
// in StreamHandler, labelled by stream_name and peer_name.
jsStreamReplicationLag *prometheus.GaugeVec

jsConsumerConfig *prometheus.GaugeVec
jsConsumerState *prometheus.GaugeVec
jsConsumerRaftInfo *prometheus.GaugeVec
jsConsumerRaftPeerInfo *prometheus.GaugeVec
jsConsumerConfig *prometheus.GaugeVec
jsConsumerState *prometheus.GaugeVec
jsConsumerRaftInfo *prometheus.GaugeVec
jsConsumerRaftPeerInfo *prometheus.GaugeVec
// jsConsumerReplicationLag is set to float64(peer.Lag) for each consumer
// peer in ConsumerHandler, labelled by stream_name, consumer_name and
// peer_name.
jsConsumerReplicationLag *prometheus.GaugeVec
}

func NewJetStreamConfigListMetrics(registry *prometheus.Registry, constLabels prometheus.Labels) *JSStreamConfigMetrics {
Expand Down Expand Up @@ -89,6 +92,16 @@ func NewJetStreamConfigListMetrics(registry *prometheus.Registry, constLabels pr
Help: "raft peer info for consumer",
ConstLabels: constLabels,
}, consumerRaftPeerInfoLabels),
jsStreamReplicationLag: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: prometheus.BuildFQName("nats", "jetstream", "stream_replication_lag"),
Help: "replication lag of stream peers",
ConstLabels: constLabels,
}, streamReplicationLagLabels),
jsConsumerReplicationLag: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: prometheus.BuildFQName("nats", "jetstream", "consumer_replication_lag"),
Help: "replication lag of consumer peers",
ConstLabels: constLabels,
}, consumerReplicationLagLabels),
}

registry.MustRegister(metrics.jsStreamConfig)
Expand All @@ -98,6 +111,8 @@ func NewJetStreamConfigListMetrics(registry *prometheus.Registry, constLabels pr
registry.MustRegister(metrics.jsStreamRaftPeerInfo)
registry.MustRegister(metrics.jsConsumerRaftInfo)
registry.MustRegister(metrics.jsConsumerRaftPeerInfo)
registry.MustRegister(metrics.jsStreamReplicationLag)
registry.MustRegister(metrics.jsConsumerReplicationLag)
return metrics
}

Expand Down Expand Up @@ -209,6 +224,18 @@ func (o *jsConfigListListener) StreamHandler(streamInfo *nats.StreamInfo) {
"lag": strconv.FormatUint(peer.Lag, 10),
},
).Set(1)
o.metrics.jsStreamReplicationLag.DeletePartialMatch(
prometheus.Labels{
"stream_name": streamInfo.Config.Name,
"peer_name": peer.Name,
},
)
o.metrics.jsStreamReplicationLag.With(
prometheus.Labels{
"stream_name": streamInfo.Config.Name,
"peer_name": peer.Name,
},
).Set(float64(peer.Lag))
}
}
func convertBoolToString(value bool) string {
Expand Down Expand Up @@ -280,6 +307,20 @@ func (o *jsConfigListListener) ConsumerHandler(consumerInfo *nats.ConsumerInfo)
"lag": strconv.FormatUint(peer.Lag, 10),
},
).Set(1)
o.metrics.jsConsumerReplicationLag.DeletePartialMatch(
prometheus.Labels{
"stream_name": consumerInfo.Stream,
"consumer_name": consumerInfo.Name,
"peer_name": peer.Name,
},
)
o.metrics.jsConsumerReplicationLag.With(
prometheus.Labels{
"stream_name": consumerInfo.Stream,
"consumer_name": consumerInfo.Name,
"peer_name": peer.Name,
},
).Set(float64(peer.Lag))
}
}
func IsPullBased(info *nats.ConsumerInfo) string {
Expand Down

0 comments on commit 1faad80

Please sign in to comment.