feat: aggregate metrics #41

Merged · 1 commit · Mar 29, 2024

1 change: 1 addition & 0 deletions migrations/20240325124203_aggregates.down.sql
@@ -0,0 +1 @@
DROP TABLE IF EXISTS indexer_aggregates;
9 changes: 9 additions & 0 deletions migrations/20240325124203_aggregates.up.sql
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS indexer_aggregates
(
id SERIAL PRIMARY KEY,
timestamp BIGINT NOT NULL,
graph_account VARCHAR(255) NOT NULL,
message_count BIGINT NOT NULL,
subgraphs_count BIGINT NOT NULL,
UNIQUE(graph_account, timestamp)
);
89 changes: 83 additions & 6 deletions src/db/resolver.rs
@@ -1,6 +1,6 @@
use async_graphql::{OutputType, SimpleObject};
use chrono::Utc;
-use serde::{de::DeserializeOwned, Serialize};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sqlx::{postgres::PgQueryResult, types::Json, FromRow, PgPool, Row as SqliteRow};
use std::ops::Deref;
use tracing::trace;
@@ -21,11 +21,11 @@ pub struct MessageID {
}

#[allow(dead_code)]
-#[derive(FromRow, SimpleObject, Serialize, Debug, Clone)]
+#[derive(FromRow, SimpleObject, Serialize, Deserialize, Debug, Clone)]
pub struct IndexerStats {
-    graph_account: String,
-    message_count: i64,
-    subgraphs_count: i64,
+    pub graph_account: String,
+    pub message_count: i64,
+    pub subgraphs_count: i64,
}

// Define graphql type for the Row in Messages
@@ -315,7 +315,7 @@ pub async fn get_indexer_stats(
SELECT
message->>'graph_account' as graph_account,
COUNT(*) as message_count,
-    COUNT(DISTINCT message->>'identifier') as subgraphs_count -- Updated field name
+    COUNT(DISTINCT message->>'identifier') as subgraphs_count
FROM messages
WHERE (CAST(message->>'nonce' AS BIGINT)) > $1";

@@ -352,6 +352,83 @@ pub async fn get_indexer_stats(
Ok(stats)
}

pub async fn insert_aggregate(
pool: &PgPool,
timestamp: i64,
graph_account: String,
message_count: i64,
subgraphs_count: i64,
) -> anyhow::Result<()> {
let _ = sqlx::query!(
"INSERT INTO indexer_aggregates (timestamp, graph_account, message_count, subgraphs_count) VALUES ($1, $2, $3, $4)",
timestamp,
graph_account,
message_count,
subgraphs_count
)
.execute(pool)
.await?;

Ok(())
}
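
Since the migration declares UNIQUE(graph_account, timestamp), a retried or double-scheduled daily run that hits the same (graph_account, timestamp) pair will make this plain INSERT fail. A minimal sketch of an idempotent variant, assuming overwriting the earlier snapshot is acceptable (this is not part of the PR):

// Hypothetical upsert variant; leans on the UNIQUE(graph_account, timestamp)
// constraint from 20240325124203_aggregates.up.sql.
sqlx::query!(
    "INSERT INTO indexer_aggregates (timestamp, graph_account, message_count, subgraphs_count)
     VALUES ($1, $2, $3, $4)
     ON CONFLICT (graph_account, timestamp) DO UPDATE
     SET message_count = EXCLUDED.message_count,
         subgraphs_count = EXCLUDED.subgraphs_count",
    timestamp,
    graph_account,
    message_count,
    subgraphs_count
)
.execute(pool)
.await?;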

pub async fn fetch_aggregates(
pool: &PgPool,
since_timestamp: i64,
) -> Result<Vec<IndexerStats>, anyhow::Error> {
let aggregates = sqlx::query_as!(
IndexerStats,
"SELECT graph_account, message_count, subgraphs_count FROM indexer_aggregates WHERE timestamp > $1",
since_timestamp
)
.fetch_all(pool)
.await
.map_err(anyhow::Error::new)?;

let results: Vec<IndexerStats> = aggregates
.clone()
.into_iter()
.map(|agg| IndexerStats {
graph_account: agg.graph_account,
message_count: agg.message_count,
subgraphs_count: agg.subgraphs_count,
})
.collect();

Ok(results)
}
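
The clone-and-map above is an identity transform, since query_as! already returns fully typed IndexerStats rows; an equivalent, shorter form (behavior unchanged) would be:

let aggregates = sqlx::query_as!(
    IndexerStats,
    "SELECT graph_account, message_count, subgraphs_count FROM indexer_aggregates WHERE timestamp > $1",
    since_timestamp
)
.fetch_all(pool)
.await
.map_err(anyhow::Error::new)?;

Ok(aggregates)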

pub async fn count_distinct_subgraphs(
pool: &PgPool,
from_timestamp: i64,
) -> Result<i64, anyhow::Error> {
let result = sqlx::query!(
"
SELECT COUNT(DISTINCT message->>'identifier') AS count
FROM messages
WHERE (CAST(message->>'nonce' AS BIGINT)) > $1
",
from_timestamp
)
.fetch_one(pool)
.await
.map_err(anyhow::Error::new)?;

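    // sqlx infers the COUNT(...) alias as Option<i64>, since it cannot prove
    // non-nullability through the JSON expression, hence the unwrap_or(0).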
Ok(result.count.unwrap_or(0) as i64)
}

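// Retention note: aggregates older than 90 days are pruned below. chrono's
// `try_days` returns an Option, but it cannot fail for a small constant like
// 90, so the `unwrap()` is safe here.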
pub async fn prune_old_aggregates(pool: &PgPool) -> Result<i64, anyhow::Error> {
let since_timestamp = (Utc::now() - chrono::Duration::try_days(90).unwrap()).timestamp();
let result = sqlx::query!(
"DELETE FROM indexer_aggregates WHERE timestamp < $1",
since_timestamp
)
.execute(pool)
.await?;

Ok(result.rows_affected() as i64)
}

#[cfg(test)]
mod tests {
use crate::message_types::PublicPoiMessage;
37 changes: 35 additions & 2 deletions src/operator/mod.rs
@@ -1,4 +1,5 @@
use anyhow::anyhow;
use chrono::Utc;
use graphcast_sdk::WakuMessage;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
@@ -9,11 +10,14 @@ use std::thread::{self, JoinHandle};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::time::{interval, sleep, timeout};
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, error, info, trace, warn};

use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent};

-use crate::db::resolver::{count_messages, prune_old_messages, retain_max_storage};
+use crate::db::resolver::{
+    count_messages, get_indexer_stats, insert_aggregate, prune_old_aggregates, prune_old_messages,
+    retain_max_storage,
+};
use crate::metrics::{CONNECTED_PEERS, GOSSIP_PEERS, PRUNED_MESSAGES, RECEIVED_MESSAGES};
use crate::{
config::Config,
@@ -110,6 +114,7 @@ impl RadioOperator {

let mut network_update_interval = interval(Duration::from_secs(600));
let mut summary_interval = interval(Duration::from_secs(180));
let mut daily_aggregate_interval = interval(Duration::from_secs(86400)); // 24 hours
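// NOTE: tokio's `interval` completes its first tick immediately, so the first
// daily aggregation pass runs at startup rather than 24h later; if a delayed
// first run is preferred, `tokio::time::interval_at(Instant::now() + period, period)`
// is the usual alternative (assumption: the immediate first run is intentional).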

let iteration_timeout = Duration::from_secs(180);
let update_timeout = Duration::from_secs(5);
@@ -214,6 +219,34 @@ impl RadioOperator {
}
}
},
_ = daily_aggregate_interval.tick() => {
if skip_iteration.load(Ordering::SeqCst) {
skip_iteration.store(false, Ordering::SeqCst);
continue;
}

let pool = &self.db;

match prune_old_aggregates(pool).await {
Ok(deleted_count) => trace!("Pruned {} old aggregate entries.", deleted_count),
Err(e) => error!("Failed to prune old aggregates: {:?}", e),
}

let from_timestamp = (Utc::now() - Duration::from_secs(86400)).timestamp();

match get_indexer_stats(pool, None, from_timestamp).await {
Ok(stats) => {
for stat in stats {
match insert_aggregate(pool, Utc::now().timestamp(), stat.graph_account, stat.message_count, stat.subgraphs_count).await {
Ok(_) => trace!("Successfully inserted daily aggregate."),
Err(e) => error!("Failed to insert daily aggregate: {:?}", e),
}
}
},
Err(e) => error!("Failed to fetch Indexer stats: {:?}", e),
}
},

else => break,
}

71 changes: 68 additions & 3 deletions src/server/model/mod.rs
@@ -3,14 +3,16 @@ use async_graphql::{Context, EmptySubscription, Object, OutputType, Schema, SimpleObject};
use chrono::Utc;
use serde::{de::DeserializeOwned, Serialize};
use sqlx::{Pool, Postgres};
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use thiserror::Error;
use tracing::error;

use crate::{
config::Config,
db::resolver::{
-        delete_message_all, delete_message_by_id, get_indexer_stats, list_active_indexers,
-        list_messages, list_rows, message_by_id, IndexerStats,
+        count_distinct_subgraphs, delete_message_all, delete_message_by_id, fetch_aggregates,
+        get_indexer_stats, list_active_indexers, list_messages, list_rows, message_by_id,
+        IndexerStats,
},
operator::radio_types::RadioPayloadMessage,
};
@@ -35,6 +37,13 @@ impl RadioContext {
}
}

#[derive(Serialize, SimpleObject)]
pub struct Summary {
    total_message_count: HashMap<String, i64>,
    average_subgraphs_count: HashMap<String, i64>,
    total_subgraphs_covered: i64,
}

Review discussion on Summary:

Contributor (reviewer): I'm also interested in additional summary data, like total topics covered and average message count (or some of the other things mentioned in the message propagation issues), but some of those are not so easy to get from the existing code. It looks like the two HashMaps are indexed by indexer address, so having a Vec for active_indexers doesn't seem so useful when they can grab the keys from one of the maps. I also think it might be easier to simply return the Vec of IndexerStats (same thing as AggregatedIndexerStats?) so that users can do manipulation independently, with greater flexibility.

pete-eiger (author, Mar 27, 2024):
> total topics covered
You mean all distinct IPFS hashes that Listener Radio has received messages for?
> average message count
I don't quite get this.

Contributor (reviewer):
> total topics covered
Yep.
> average message count
You already have the total_message_count for each indexer; I'm interested in the average of that across all indexers. But, as I alluded to in the previous comment, it is something the client can do once they receive the Vec of IndexerStats.

pete-eiger (author): I agree, let's leave it to the client.

pete-eiger (author, Mar 28, 2024): I will add one for distinct IPFS hashes though.

Contributor (reviewer): Cool, thanks. What do you think about returning Vec<IndexerStats> directly as part of Summary, instead of having to calculate total_message_count and average_subgraphs_count? I think that allows more flexibility on the client side, like calculating averages; but I can see an argument for providing that functionality out-of-the-box... a bit of a tricky balance.
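
A minimal sketch of the alternative shape discussed above: expose the raw per-indexer rows so clients can compute their own totals and averages. The name SummaryWithStats and the field layout are illustrative only, not from the PR:

// Hypothetical alternative: return the raw rows and leave totals/averages to
// the client. total_subgraphs_covered stays, since it needs its own DISTINCT
// query against the messages table.
#[derive(Serialize, SimpleObject)]
pub struct SummaryWithStats {
    stats: Vec<IndexerStats>,
    total_subgraphs_covered: i64,
}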

// Unified query object for resolvers
#[derive(Default)]
pub struct QueryRoot;
@@ -127,6 +136,62 @@ impl QueryRoot {
message_by_id(pool, id).await?.get_message();
Ok(msg)
}

async fn query_aggregate_stats(
&self,
ctx: &Context<'_>,
days: i32,
) -> Result<Summary, HttpServiceError> {
let pool = ctx.data_unchecked::<Pool<Postgres>>();

let since_timestamp =
(Utc::now() - chrono::Duration::try_days(days.into()).unwrap()).timestamp();
let aggregates = fetch_aggregates(pool, since_timestamp)
.await
.map_err(HttpServiceError::Others)?;

let mut total_message_count: HashMap<String, i64> = HashMap::new();
let mut total_subgraphs_count: HashMap<String, i64> = HashMap::new();

let mut subgraphs_counts = HashMap::new();

for stat in aggregates {
*total_message_count
.entry(stat.graph_account.clone())
.or_default() += stat.message_count;
*total_subgraphs_count
.entry(stat.graph_account.clone())
.or_default() += stat.subgraphs_count;
subgraphs_counts
.entry(stat.graph_account.clone())
.or_insert_with(Vec::new)
.push(stat.subgraphs_count);
}

let average_subgraphs_count: HashMap<String, i64> = total_subgraphs_count
.iter()
.map(|(key, &total_count)| {
let count = subgraphs_counts.get(key).map_or(1, |counts| counts.len());
(
key.clone(),
if count > 0 {
(total_count as f64 / count as f64).ceil() as i64
} else {
0
},
)
})
.collect();

let total_subgraphs_covered = count_distinct_subgraphs(pool, since_timestamp)
.await
.map_err(HttpServiceError::Others)?;
Ok(Summary {
total_message_count,
average_subgraphs_count,
total_subgraphs_covered,
})
}
}

// Unified query object for resolvers