diff --git a/Cargo.lock b/Cargo.lock index 51e4cc9..9ca64b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2264,8 +2264,8 @@ dependencies = [ [[package]] name = "graphcast-sdk" -version = "0.4.1" -source = "git+https://github.com/graphops/graphcast-sdk/?branch=hope/receiver-and-msg-type#33b67cf54a198313472d971aa389883c0209a618" +version = "0.4.2" +source = "git+https://github.com/graphops/graphcast-sdk/#ad073566ecb84bedefc7b755c96de36f0e674c51" dependencies = [ "anyhow", "async-graphql", diff --git a/Cargo.toml b/Cargo.toml index 2d28ed2..8ea1049 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"] categories = ["network-programming", "web-programming::http-client"] [dependencies] -graphcast-sdk = { git = "https://github.com/graphops/graphcast-sdk/", branch = "hope/receiver-and-msg-type" } +graphcast-sdk = { git = "https://github.com/graphops/graphcast-sdk/" } anyhow = "1.0" axum = { version = "0.5", features = ["headers"] } async-graphql = "4.0.16" diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 3847852..7da9d06 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -4,7 +4,7 @@ use axum::routing::get; use axum::Router; use once_cell::sync::Lazy; use prometheus::{core::Collector, Registry}; -use prometheus::{IntCounterVec, IntGauge, Opts}; +use prometheus::{IntCounterVec, IntGauge, IntCounter, Opts}; use std::{net::SocketAddr, str::FromStr}; use tracing::{debug, info}; @@ -68,19 +68,45 @@ pub static ACTIVE_PEERS: Lazy = Lazy::new(|| { m }); -// /// Number of content topics with traffic -// /// Updated periodically for the recently received messages -// #[allow(dead_code)] -// pub static ACTIVE_CONTENT_TOPICS: Lazy = Lazy::new(|| { -// let m = IntGauge::with_opts( -// Opts::new("active_content_topics", "Number of content topics being gossiped on network") -// .namespace("graphcast") -// .subsystem("listener_radio"), -// ) -// .expect("Failed to create active_content_topics gauges"); -// prometheus::register(Box::new(m.clone())).expect("Failed to register active_content_topics guage"); -// m -// }); +#[allow(dead_code)] +pub static CONNECTED_PEERS: Lazy = Lazy::new(|| { + let m = IntGauge::with_opts( + Opts::new( + "connected_peers", + "Number of Gossip peers connected with Graphcast agent", + ) + .namespace("graphcast") + .subsystem("listener_radio"), + ) + .expect("Failed to create connected_peers gauge"); + prometheus::register(Box::new(m.clone())).expect("Failed to register connected_peers gauge"); + m +}); + +#[allow(dead_code)] +pub static GOSSIP_PEERS: Lazy = Lazy::new(|| { + let m = IntGauge::with_opts( + Opts::new("gossip_peers", "Total number of gossip peers discovered") + .namespace("graphcast") + .subsystem("listener_radio"), + ) + .expect("Failed to create gossip_peers gauge"); + prometheus::register(Box::new(m.clone())).expect("Failed to register gossip_peers gauge"); + m +}); + +#[allow(dead_code)] +pub static RECEIVED_MESSAGES: Lazy = Lazy::new(|| { + let m = IntCounter::with_opts( + Opts::new("received_messages", "Number of messages received in total") + .namespace("graphcast") + .subsystem("listener_radio"), + ) + .expect("Failed to create received_messages counter"); + prometheus::register(Box::new(m.clone())) + .expect("Failed to register received_messages counter"); + m +}); #[allow(dead_code)] pub static REGISTRY: Lazy = Lazy::new(prometheus::Registry::new); @@ -102,6 +128,9 @@ pub fn start_metrics() { Box::new(INVALIDATED_MESSAGES.clone()), Box::new(CACHED_MESSAGES.clone()), Box::new(ACTIVE_PEERS.clone()), + Box::new(CONNECTED_PEERS.clone()), + Box::new(GOSSIP_PEERS.clone()), + Box::new(RECEIVED_MESSAGES.clone()), ], ); } diff --git a/src/operator/mod.rs b/src/operator/mod.rs index 7db3f2f..048ca5f 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -10,8 +10,9 @@ use std::time::Duration; use tokio::time::{interval, sleep, timeout}; use tracing::{debug, info, trace, warn}; -use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent}; +use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, waku_handling::connected_peer_count, GraphcastAgent}; +use crate::metrics::{CONNECTED_PEERS, GOSSIP_PEERS, RECEIVED_MESSAGES}; use crate::{config::Config, db::resolver::{add_message, list_messages}, message_types::{PublicPoiMessage, SimpleMessage, UpgradeIntentMessage}, @@ -139,6 +140,9 @@ impl RadioOperator { trace!("Network update"); let connection = network_check(&self.graphcast_agent().node_handle); debug!(network_check = tracing::field::debug(&connection), "Network condition"); + // Update the number of peers connected + CONNECTED_PEERS.set(connected_peer_count(&self.graphcast_agent().node_handle).unwrap_or_default().try_into().unwrap_or_default()); + GOSSIP_PEERS.set(self.graphcast_agent.number_of_peers().try_into().unwrap_or_default()); if let Some(true) = self.config.filter_protocol { if skip_iteration.load(Ordering::SeqCst) { @@ -196,6 +200,7 @@ impl RadioOperator { tokio::spawn(async move { for msg in receiver { trace!("Message processing"); + RECEIVED_MESSAGES.inc(); let timeout_duration = Duration::from_secs(1); let process_res = timeout(timeout_duration, process_message(&agent_ref, &db_ref, msg)).await;