Skip to content

Commit

Permalink
chore(): added snapshot message rate counter
Browse files Browse the repository at this point in the history
  • Loading branch information
le-vlad committed Oct 23, 2024
1 parent b16738d commit 8800444
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions internal/impl/postgresql/input_postgrecdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func newPgStreamInput(conf *service.ParsedConfig, logger *service.Logger, metric

snapsotMetrics := metrics.NewGauge("snapshot_progress")
replicationLag := metrics.NewGauge("replication_lag")
snapshotMessageRate := metrics.NewGauge("snapshot_message_rate")
snapshotRateCounter := NewRateCounter()

return service.AutoRetryNacks(&pgStreamInput{
dbConfig: pgconnConfig,
Expand All @@ -231,6 +233,8 @@ func newPgStreamInput(conf *service.ParsedConfig, logger *service.Logger, metric
streamUncomited: streamUncomited,
temporarySlot: temporarySlot,
snapshotBatchSize: snapshotBatchSize,
snapshotMessageRate: snapshotMessageRate,
snapshotRateCounter: snapshotRateCounter,

logger: logger,
metrics: metrics,
Expand Down Expand Up @@ -270,8 +274,10 @@ type pgStreamInput struct {
logger *service.Logger
metrics *service.Metrics

snapshotMetrics *service.MetricGauge
replicationLag *service.MetricGauge
snapshotRateCounter *RateCounter
snapshotMessageRate *service.MetricGauge
snapshotMetrics *service.MetricGauge
replicationLag *service.MetricGauge
}

func (p *pgStreamInput) Connect(ctx context.Context) error {
Expand Down Expand Up @@ -303,7 +309,6 @@ func (p *pgStreamInput) Connect(ctx context.Context) error {
}

func (p *pgStreamInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {

select {
case snapshotMessage := <-p.pglogicalStream.SnapshotMessageC():
var (
Expand All @@ -322,6 +327,8 @@ func (p *pgStreamInput) Read(ctx context.Context) (*service.Message, service.Ack
p.snapshotMetrics.SetFloat64(*snapshotMessage.Changes[0].TableSnapshotProgress, snapshotMessage.Changes[0].Table)
}

p.snapshotMessageRate.SetFloat64(p.snapshotRateCounter.Rate())

return connectMessage, func(ctx context.Context, err error) error {
// Nacks are retried automatically when we use service.AutoRetryNacks
return nil
Expand Down
File renamed without changes.

0 comments on commit 8800444

Please sign in to comment.