Skip to content

Commit

Permalink
New monitoring mempool metrics collector #16
Browse files Browse the repository at this point in the history
  • Loading branch information
e-asphyx committed Jun 24, 2019
1 parent d93cbc8 commit fc64d38
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 19 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions collector/mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package collector

import (
"context"
"sync"

"github.com/ecadlabs/go-tezos"
"github.com/prometheus/client_golang/prometheus"
)

type MempoolOperationsCollector struct {
prometheus.Collector // refers to counter

counter *prometheus.CounterVec
service *tezos.Service
chainID string
wg sync.WaitGroup
cancel context.CancelFunc
}

func (m *MempoolOperationsCollector) listener(ctx context.Context, pool string) {
ch := make(chan []*tezos.Operation, 100)
defer close(ch)

go func() {
for ops := range ch {
for _, op := range ops {
for _, elem := range op.Contents {
m.counter.WithLabelValues(pool, op.Protocol, elem.OperationElemKind()).Inc()
}
}
}
}()

for {
err := m.service.MonitorMempoolOperations(ctx, m.chainID, pool, ch)
if err == context.Canceled {
return
}
}
}

func NewMempoolOperationsCollectorCollector(service *tezos.Service, chainID string, pools []string) *MempoolOperationsCollector {
counter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tezos_node_mempool_operations_total",
Help: "The total number of mempool operations.",
},
[]string{"pool", "proto", "kind"},
)

c := MempoolOperationsCollector{
Collector: counter,
counter: counter,
service: service,
chainID: chainID,
}

ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel

for _, p := range pools {
c.wg.Add(1)
go c.listener(ctx, p)
}

return &c
}

func (m *MempoolOperationsCollector) Shutdown(ctx context.Context) error {
m.cancel()

sem := make(chan struct{})
go func() {
m.wg.Wait()
close(sem)
}()

select {
case <-sem:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
35 changes: 19 additions & 16 deletions collector/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ var (
nil,
nil)

mempoolDesc = prometheus.NewDesc(
"tezos_node_mempool_operations",
"The current number of mempool operations.",
[]string{"pool", "proto", "kind"},
nil)
/*
mempoolDesc = prometheus.NewDesc(
"tezos_node_mempool_operations",
"The current number of mempool operations.",
[]string{"pool", "proto", "kind"},
nil)
*/

rpcFailedDesc = prometheus.NewDesc(
"tezos_rpc_failed",
Expand All @@ -70,10 +72,7 @@ type NetworkCollector struct {
}

// NewNetworkCollector returns a new NetworkCollector.
func NewNetworkCollector(
service *tezos.Service,
timeout time.Duration,
chainID string) *NetworkCollector {
func NewNetworkCollector(service *tezos.Service, timeout time.Duration, chainID string) *NetworkCollector {
return &NetworkCollector{
service: service,
timeout: timeout,
Expand Down Expand Up @@ -197,6 +196,7 @@ func getPeerStats(ctx context.Context, service *tezos.Service) (map[string]map[s
return peerStats, nil
}

/*
func getMempoolStats(ctx context.Context, service *tezos.Service, chainID string) (map[string]map[string]map[string]int, error) {
buildStats := func(ops []*tezos.Operation) map[string]map[string]int {
stats := map[string]map[string]int{}
Expand Down Expand Up @@ -240,6 +240,7 @@ func getMempoolStats(ctx context.Context, service *tezos.Service, chainID string
"unprocessed": buildStats(opsFromOpsAlt(ops.Unprocessed)),
}, nil
}
*/

// Collect implements prometheus.Collector and is called by the Prometheus registry when collecting metrics.
func (c *NetworkCollector) Collect(ch chan<- prometheus.Metric) {
Expand Down Expand Up @@ -300,14 +301,16 @@ func (c *NetworkCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(bootstrappedDesc, prometheus.GaugeValue, v)
}

mempoolStats, err := getMempoolStats(ctx, &srv, c.chainID)
if err == nil {
for pool, stats := range mempoolStats {
for proto, protoStats := range stats {
for kind, count := range protoStats {
ch <- prometheus.MustNewConstMetric(mempoolDesc, prometheus.GaugeValue, float64(count), pool, proto, kind)
/*
mempoolStats, err := getMempoolStats(ctx, &srv, c.chainID)
if err == nil {
for pool, stats := range mempoolStats {
for proto, protoStats := range stats {
for kind, count := range protoStats {
ch <- prometheus.MustNewConstMetric(mempoolDesc, prometheus.GaugeValue, float64(count), pool, proto, kind)
}
}
}
}
}
*/
}
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ func main() {
reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
reg.MustRegister(prometheus.NewGoCollector())
reg.MustRegister(collector.NewNetworkCollector(service, defaultTimeout, *chainID))
reg.MustRegister(collector.NewMempoolOperationsCollectorCollector(service, *chainID, []string{
"applied",
"branch_refused",
"refused",
"branch_delayed",
}))

http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
if err := http.ListenAndServe(*metricsAddr, nil); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions vendor/github.com/ecadlabs/go-tezos/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit fc64d38

Please sign in to comment.