diff --git a/Gopkg.lock b/Gopkg.lock index e7d72ae..c4bb16e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -11,11 +11,11 @@ [[projects]] branch = "monitor-mempool-operations" - digest = "1:3c7349246af8fcbaccd2f7fb8c6b3ffefe8e270d11606b4dd512289147deecfe" + digest = "1:ed4fa2fe2ea95f23e336eb44796e971f5bd285e879dee6f9bee6067604904431" name = "github.com/ecadlabs/go-tezos" packages = ["."] pruneopts = "UT" - revision = "e92de97b5b89346b71af72e79df4a539f1932305" + revision = "fbbb6f7a57d122f85ca042752a4edd5ba8156ebc" [[projects]] digest = "1:db2e37856e0f3e8ad322792ca2ede9bcf821f4d482ca0ef300e6a9d85776a99a" @@ -117,7 +117,7 @@ name = "golang.org/x/sys" packages = ["windows"] pruneopts = "UT" - revision = "d432491b91382bba9c2a91776aa47c9430183a6f" + revision = "c5567b49c5d04a5f83870795b8c0e2df43a8ce32" [solve-meta] analyzer-name = "dep" diff --git a/collector/mempool.go b/collector/mempool.go new file mode 100644 index 0000000..9867c0c --- /dev/null +++ b/collector/mempool.go @@ -0,0 +1,85 @@ +package collector + +import ( + "context" + "sync" + + "github.com/ecadlabs/go-tezos" + "github.com/prometheus/client_golang/prometheus" +) + +type MempoolOperationsCollector struct { + prometheus.Collector // refers to counter + + counter *prometheus.CounterVec + service *tezos.Service + chainID string + wg sync.WaitGroup + cancel context.CancelFunc +} + +func (m *MempoolOperationsCollector) listener(ctx context.Context, pool string) { + ch := make(chan []*tezos.Operation, 100) + defer close(ch) + + go func() { + for ops := range ch { + for _, op := range ops { + for _, elem := range op.Contents { + m.counter.WithLabelValues(pool, op.Protocol, elem.OperationElemKind()).Inc() + } + } + } + }() + + for { + err := m.service.MonitorMempoolOperations(ctx, m.chainID, pool, ch) + if err == context.Canceled { + return + } + } +} + +func NewMempoolOperationsCollectorCollector(service *tezos.Service, chainID string, pools []string) *MempoolOperationsCollector { + counter := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "tezos_node_mempool_operations_total", + Help: "The total number of mempool operations.", + }, + []string{"pool", "proto", "kind"}, + ) + + c := MempoolOperationsCollector{ + Collector: counter, + counter: counter, + service: service, + chainID: chainID, + } + + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + + for _, p := range pools { + c.wg.Add(1) + go c.listener(ctx, p) + } + + return &c +} + +func (m *MempoolOperationsCollector) Shutdown(ctx context.Context) error { + m.cancel() + + sem := make(chan struct{}) + go func() { + m.wg.Wait() + close(sem) + }() + + select { + case <-sem: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/collector/network.go b/collector/network.go index 617537b..66889b7 100644 --- a/collector/network.go +++ b/collector/network.go @@ -49,11 +49,13 @@ var ( nil, nil) - mempoolDesc = prometheus.NewDesc( - "tezos_node_mempool_operations", - "The current number of mempool operations.", - []string{"pool", "proto", "kind"}, - nil) + /* + mempoolDesc = prometheus.NewDesc( + "tezos_node_mempool_operations", + "The current number of mempool operations.", + []string{"pool", "proto", "kind"}, + nil) + */ rpcFailedDesc = prometheus.NewDesc( "tezos_rpc_failed", @@ -70,10 +72,7 @@ type NetworkCollector struct { } // NewNetworkCollector returns a new NetworkCollector. -func NewNetworkCollector( - service *tezos.Service, - timeout time.Duration, - chainID string) *NetworkCollector { +func NewNetworkCollector(service *tezos.Service, timeout time.Duration, chainID string) *NetworkCollector { return &NetworkCollector{ service: service, timeout: timeout, @@ -197,6 +196,7 @@ func getPeerStats(ctx context.Context, service *tezos.Service) (map[string]map[s return peerStats, nil } +/* func getMempoolStats(ctx context.Context, service *tezos.Service, chainID string) (map[string]map[string]map[string]int, error) { buildStats := func(ops []*tezos.Operation) map[string]map[string]int { stats := map[string]map[string]int{} @@ -240,6 +240,7 @@ func getMempoolStats(ctx context.Context, service *tezos.Service, chainID string "unprocessed": buildStats(opsFromOpsAlt(ops.Unprocessed)), }, nil } +*/ // Collect implements prometheus.Collector and is called by the Prometheus registry when collecting metrics. func (c *NetworkCollector) Collect(ch chan<- prometheus.Metric) { @@ -300,14 +301,16 @@ func (c *NetworkCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric(bootstrappedDesc, prometheus.GaugeValue, v) } - mempoolStats, err := getMempoolStats(ctx, &srv, c.chainID) - if err == nil { - for pool, stats := range mempoolStats { - for proto, protoStats := range stats { - for kind, count := range protoStats { - ch <- prometheus.MustNewConstMetric(mempoolDesc, prometheus.GaugeValue, float64(count), pool, proto, kind) + /* + mempoolStats, err := getMempoolStats(ctx, &srv, c.chainID) + if err == nil { + for pool, stats := range mempoolStats { + for proto, protoStats := range stats { + for kind, count := range protoStats { + ch <- prometheus.MustNewConstMetric(mempoolDesc, prometheus.GaugeValue, float64(count), pool, proto, kind) + } } } } - } + */ } diff --git a/main.go b/main.go index 7d48e69..f2ee2ef 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,12 @@ func main() { reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) reg.MustRegister(prometheus.NewGoCollector()) reg.MustRegister(collector.NewNetworkCollector(service, defaultTimeout, *chainID)) + reg.MustRegister(collector.NewMempoolOperationsCollectorCollector(service, *chainID, []string{ + "applied", + "branch_refused", + "refused", + "branch_delayed", + })) http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) if err := http.ListenAndServe(*metricsAddr, nil); err != nil { diff --git a/vendor/github.com/ecadlabs/go-tezos/client.go b/vendor/github.com/ecadlabs/go-tezos/client.go index 99e6241..8ab120d 100644 --- a/vendor/github.com/ecadlabs/go-tezos/client.go +++ b/vendor/github.com/ecadlabs/go-tezos/client.go @@ -63,6 +63,8 @@ type RPCClient struct { UserAgent string // Optional callback for metrics. RPCStatusCallback func(req *http.Request, status int, duration time.Duration, err error) + // Optional callback for metrics. + RPCHeaderCallback func(req *http.Request, resp *http.Response, duration time.Duration) } // NewRPCClient returns a new Tezos RPC client. @@ -133,6 +135,10 @@ func (c *RPCClient) Do(req *http.Request, v interface{}) (err error) { timestamp := time.Now() resp, err := c.Client.Do(req) + if c.RPCHeaderCallback != nil { + duration := time.Since(timestamp) + c.RPCHeaderCallback(req, resp, duration) + } if c.RPCStatusCallback != nil { defer func() {