From ec43a2bb1abb73ddaa48b68dce90fa520658bad6 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 15 Aug 2023 11:32:02 -0500 Subject: [PATCH] fix: timeout for message processing --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/main.rs | 15 ++++++-- src/operator/mod.rs | 93 ++++++++++++++++++++++++--------------------- 4 files changed, 64 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5555c74..51e4cc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2264,8 +2264,8 @@ dependencies = [ [[package]] name = "graphcast-sdk" -version = "0.4.0" -source = "git+https://github.com/graphops/graphcast-sdk/#0f367f2140ea8da82bae2a9a52711df68dcfa78a" +version = "0.4.1" +source = "git+https://github.com/graphops/graphcast-sdk/?branch=hope/receiver-and-msg-type#33b67cf54a198313472d971aa389883c0209a618" dependencies = [ "anyhow", "async-graphql", diff --git a/Cargo.toml b/Cargo.toml index 8ea1049..2d28ed2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"] categories = ["network-programming", "web-programming::http-client"] [dependencies] -graphcast-sdk = { git = "https://github.com/graphops/graphcast-sdk/" } +graphcast-sdk = { git = "https://github.com/graphops/graphcast-sdk/", branch = "hope/receiver-and-msg-type" } anyhow = "1.0" axum = { version = "0.5", features = ["headers"] } async-graphql = "4.0.16" diff --git a/src/main.rs b/src/main.rs index 13510c2..3f9a87b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ use dotenv::dotenv; +use graphcast_sdk::{graphcast_agent::GraphcastAgent, WakuMessage}; use listener_radio::{config::Config, operator::RadioOperator}; +use std::sync::mpsc; #[tokio::main] async fn main() { @@ -7,12 +9,19 @@ async fn main() { // Parse basic configurations let radio_config = Config::args(); - + let (sender, receiver) = mpsc::channel::(); // Initialization - let radio_operator = RadioOperator::new(radio_config).await; + let agent = GraphcastAgent::new( + radio_config.to_graphcast_agent_config().await.unwrap(), + sender, + ) + .await + .expect("Initialize Graphcast agent"); + + let radio_operator = RadioOperator::new(radio_config, agent).await; // Start separate processes - radio_operator.prepare().await; + radio_operator.prepare(receiver).await; // Start radio operations radio_operator.run().await; diff --git a/src/operator/mod.rs b/src/operator/mod.rs index 7f38755..7db3899 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -1,11 +1,13 @@ +use anyhow::anyhow; +use graphcast_sdk::WakuMessage; use sqlx::postgres::PgPoolOptions; use sqlx::{Pool, Postgres}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::Receiver; use std::sync::Arc; -use std::thread; use std::time::Duration; use tokio::time::{interval, sleep, timeout}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, trace, warn}; use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent}; @@ -35,14 +37,9 @@ pub struct RadioOperator { impl RadioOperator { /// Create a radio operator with radio configurations, persisted data, /// graphcast agent, and control flow - pub async fn new(config: Config) -> RadioOperator { - debug!("Initializing Graphcast Agent"); - let (agent, receiver) = - GraphcastAgent::new(config.to_graphcast_agent_config().await.unwrap()) - .await - .expect("Initialize Graphcast agent"); - let graphcast_agent = Arc::new(agent); + pub async fn new(config: Config, graphcast_agent: GraphcastAgent) -> RadioOperator { debug!("Set global static instance of graphcast_agent"); + let graphcast_agent = Arc::new(graphcast_agent); _ = GRAPHCAST_AGENT.set(graphcast_agent.clone()); let notifier = Notifier::from_config(&config); @@ -62,40 +59,6 @@ impl RadioOperator { .await .expect("Could not run migration"); - let agent_ref = graphcast_agent.clone(); - let db_ref = db.clone(); - thread::spawn(move || { - tokio::runtime::Runtime::new().unwrap().block_on(async { - for msg in receiver { - if let Ok(msg) = agent_ref.decode::(msg.payload()).await { - if let Err(e) = add_message(&db_ref, msg).await { - warn!( - err = tracing::field::debug(&e), - "Failed to store public POI message" - ); - }; - } else if let Ok(msg) = agent_ref - .decode::(msg.payload()) - .await - { - if let Err(e) = add_message(&db_ref, msg).await { - warn!( - err = tracing::field::debug(&e), - "Failed to store version upgrade message" - ); - }; - } else if let Ok(msg) = agent_ref.decode::(msg.payload()).await { - if let Err(e) = add_message(&db_ref, msg).await { - warn!( - err = tracing::field::debug(&e), - "Failed to store simple test message" - ); - }; - } - } - }) - }); - debug!("Initialized Radio Operator"); RadioOperator { config, @@ -107,7 +70,7 @@ impl RadioOperator { /// Preparation for running the radio applications /// Expose metrics and subscribe to graphcast topics - pub async fn prepare(&self) { + pub async fn prepare(&self, receiver: Receiver) { // Set up Prometheus metrics url if configured if let Some(port) = self.config.metrics_port { debug!("Initializing metrics port"); @@ -126,6 +89,7 @@ impl RadioOperator { .await; } + self.message_processor(receiver).await; GRAPHCAST_AGENT .get() .unwrap() @@ -213,4 +177,45 @@ impl RadioOperator { continue; } } + + pub async fn message_processor(&self, receiver: Receiver) { + let agent_ref = self.graphcast_agent.clone(); + let db_ref = self.db.clone(); + tokio::spawn(async move { + for msg in receiver { + let timeout_duration = Duration::from_secs(1); + let process_res = + timeout(timeout_duration, process_message(&agent_ref, &db_ref, msg)).await; + match process_res { + Ok(Ok(r)) => trace!(msg_row_id = r, "New message added to DB"), + Ok(Err(e)) => { + warn!(err = tracing::field::debug(&e), "Failed to process message"); + } + Err(e) => debug!(error = e.to_string(), "Message processor timed out"), + } + } + }); + } +} + +pub async fn process_message( + graphcast_agent: &Arc, + db: &Pool, + msg: WakuMessage, +) -> Result { + if let Ok(msg) = graphcast_agent + .decode::(msg.payload()) + .await + { + add_message(db, msg).await + } else if let Ok(msg) = graphcast_agent + .decode::(msg.payload()) + .await + { + add_message(db, msg).await + } else if let Ok(msg) = graphcast_agent.decode::(msg.payload()).await { + add_message(db, msg).await + } else { + Err(anyhow!("Message cannot be decoded")) + } }