Skip to content

Commit

Permalink
Adding prometheus support for telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
maxpert committed Nov 7, 2023
1 parent 1b32206 commit e47d83a
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 12 deletions.
15 changes: 15 additions & 0 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ type LoggingConfiguration struct {
Format string `toml:"format"`
}

type PrometheusConfiguration struct {
Bind string `toml:"bind"`
Enable bool `toml:"enable"`
Namespace string `toml:"namespace"`
Subsystem string `toml:"subsystem"`
}

type Configuration struct {
SeqMapPath string `toml:"seq_map_path"`
DBPath string `toml:"db_path"`
Expand All @@ -102,6 +109,7 @@ type Configuration struct {
ReplicationLog ReplicationLogConfiguration `toml:"replication_log"`
NATS NATSConfiguration `toml:"nats"`
Logging LoggingConfiguration `toml:"logging"`
Prometheus PrometheusConfiguration `toml:"prometheus"`
}

var ConfigPathFlag = flag.String("config", "", "Path to configuration file")
Expand Down Expand Up @@ -160,6 +168,13 @@ var Config = &Configuration{
Verbose: false,
Format: "console",
},

Prometheus: PrometheusConfiguration{
Bind: ":3010",
Enable: false,
Namespace: "marmot",
Subsystem: "",
},
}

func init() {
Expand Down
9 changes: 9 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ connect_retries=5
# Wait time between NATS reconnect attempts (will only be used if URLs array is not empty)
reconnect_wait_seconds=2

[prometheus]
# Enable/Disable prometheus telemetry collection
enable=false
# HTTP endpoint to expose for prometheus matrix collection
# bind=":3010"
# Namespace for prometheus (default: `marmot`), applies to all counters, gaugues, histograms
# namespace=""
# Subsystem for prometheus (default: empty), applies to all counters, gauges, histograms
# subsystem=""

# Console STDOUT configurations
[logging]
Expand Down
31 changes: 30 additions & 1 deletion db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string)

func (conn *SqliteStreamDB) getGlobalChanges(limit uint32) ([]globalChangeLogEntry, error) {
sw := utils.NewStopWatch("scan_changes")
defer sw.Log(log.Debug())
defer sw.Log(log.Debug(), conn.stats.scanChanges)

sqlConn, err := conn.pool.Borrow()
if err != nil {
Expand All @@ -331,13 +331,40 @@ func (conn *SqliteStreamDB) getGlobalChanges(limit uint32) ([]globalChangeLogEnt
return entries, nil
}

func (conn *SqliteStreamDB) countChanges() (int64, error) {
sw := utils.NewStopWatch("count_changes")
defer sw.Log(log.Debug(), conn.stats.countChanges)

sqlConn, err := conn.pool.Borrow()
if err != nil {
return -1, err
}
defer sqlConn.Return()

return sqlConn.DB().
From(conn.globalMetaTable()).
Count()
}

func (conn *SqliteStreamDB) publishChangeLog() {
if !conn.publishLock.TryLock() {
log.Warn().Msg("Publish in progress skipping...")
return
}
defer conn.publishLock.Unlock()

cnt, err := conn.countChanges()
if err != nil {
log.Error().Err(err).Msg("Unable to count global changes")
return
}

conn.stats.pendingPublish.Set(float64(cnt))
if cnt <= 0 {
log.Debug().Msg("no new rows")
return
}

changes, err := conn.getGlobalChanges(cfg.Config.ScanMaxChanges)
if err != nil {
log.Error().Err(err).Msg("Unable to scan global changes")
Expand Down Expand Up @@ -379,6 +406,8 @@ func (conn *SqliteStreamDB) publishChangeLog() {
if err != nil {
log.Error().Err(err).Msg("Unable to cleanup change log")
}

conn.stats.published.Inc()
}
}

Expand Down
15 changes: 15 additions & 0 deletions db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/mattn/go-sqlite3"
"github.com/maxpert/marmot/pool"
"github.com/maxpert/marmot/telemetry"
"github.com/rs/zerolog/log"
)

Expand All @@ -22,6 +23,13 @@ const snapshotTransactionMode = "exclusive"
var PoolSize = 4
var MarmotPrefix = "__marmot__"

type statsSqliteStreamDB struct {
published telemetry.Counter
pendingPublish telemetry.Gauge
countChanges telemetry.Histogram
scanChanges telemetry.Histogram
}

type SqliteStreamDB struct {
OnChange func(event *ChangeLogEvent) error
pool *pool.SQLitePool
Expand All @@ -31,6 +39,7 @@ type SqliteStreamDB struct {
dbPath string
prefix string
watchTablesSchema map[string][]*ColumnInfo
stats *statsSqliteStreamDB
}

type ColumnInfo struct {
Expand Down Expand Up @@ -142,6 +151,12 @@ func OpenStreamDB(path string) (*SqliteStreamDB, error) {
prefix: MarmotPrefix,
publishLock: &sync.Mutex{},
watchTablesSchema: map[string][]*ColumnInfo{},
stats: &statsSqliteStreamDB{
published: telemetry.NewCounter("published", "number of rows published"),
pendingPublish: telemetry.NewGauge("pending_publish", "rows pending publishing"),
countChanges: telemetry.NewHistogram("count_changes", "latency counting changes in microseconds"),
scanChanges: telemetry.NewHistogram("scan_changes", "latency scanning change rows in DB"),
},
}

return ret, nil
Expand Down
13 changes: 10 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@ require (
github.com/nats-io/nats-server/v2 v2.9.20
github.com/nats-io/nats.go v1.28.0
github.com/pkg/sftp v1.13.5
github.com/prometheus/client_golang v1.17.0
github.com/rs/zerolog v1.29.1
github.com/samber/lo v1.38.1
github.com/studio-b12/gowebdav v0.9.0
golang.org/x/crypto v0.11.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/lib/pq v1.10.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
Expand All @@ -39,15 +43,18 @@ require (
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
28 changes: 23 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20O
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -22,10 +26,12 @@ github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrt
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand All @@ -51,6 +57,8 @@ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
Expand Down Expand Up @@ -79,6 +87,14 @@ github.com/pkg/sftp v1.13.5 h1:a3RLUqkyjYRtBTZJZ1VRrKbN3zhuPLlUc3sphVz81go=
github.com/pkg/sftp v1.13.5/go.mod h1:wHDZ0IZX6JcBYRK1TH9bcVq8G7TLpVHYIGJRFnmPfxg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
Expand Down Expand Up @@ -110,6 +126,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -124,8 +141,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -137,8 +154,9 @@ golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
Expand Down
4 changes: 4 additions & 0 deletions marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"time"

"github.com/maxpert/marmot/telemetry"
"github.com/maxpert/marmot/utils"

"github.com/maxpert/marmot/cfg"
Expand Down Expand Up @@ -43,6 +44,9 @@ func main() {
log.Logger = gLog.Level(zerolog.InfoLevel)
}

log.Debug().Msg("Initializing telemetry")
telemetry.InitializeTelemetry()

log.Debug().Str("path", cfg.Config.DBPath).Msg("Opening database")
streamDB, err := db.OpenStreamDB(cfg.Config.DBPath)
if err != nil {
Expand Down
Loading

0 comments on commit e47d83a

Please sign in to comment.