Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: timeout for message processing #17

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"]
categories = ["network-programming", "web-programming::http-client"]

[dependencies]
graphcast-sdk = { git = "https://github.com/graphops/graphcast-sdk/" }
graphcast-sdk = { git = "https://github.com/graphops/graphcast-sdk/", branch = "hope/receiver-and-msg-type" }
anyhow = "1.0"
axum = { version = "0.5", features = ["headers"] }
async-graphql = "4.0.16"
Expand Down
15 changes: 12 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
use dotenv::dotenv;
use graphcast_sdk::{graphcast_agent::GraphcastAgent, WakuMessage};
use listener_radio::{config::Config, operator::RadioOperator};
use std::sync::mpsc;

#[tokio::main]
async fn main() {
dotenv().ok();

// Parse basic configurations
let radio_config = Config::args();

let (sender, receiver) = mpsc::channel::<WakuMessage>();
// Initialization
let radio_operator = RadioOperator::new(radio_config).await;
let agent = GraphcastAgent::new(
radio_config.to_graphcast_agent_config().await.unwrap(),
sender,
)
.await
.expect("Initialize Graphcast agent");

let radio_operator = RadioOperator::new(radio_config, agent).await;

// Start separate processes
radio_operator.prepare().await;
radio_operator.prepare(receiver).await;

// Start radio operations
radio_operator.run().await;
Expand Down
93 changes: 49 additions & 44 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use anyhow::anyhow;
use graphcast_sdk::WakuMessage;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::time::{interval, sleep, timeout};
use tracing::{debug, info, warn};
use tracing::{debug, info, trace, warn};

use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent};

Expand Down Expand Up @@ -35,14 +37,9 @@ pub struct RadioOperator {
impl RadioOperator {
/// Create a radio operator with radio configurations, persisted data,
/// graphcast agent, and control flow
pub async fn new(config: Config) -> RadioOperator {
debug!("Initializing Graphcast Agent");
let (agent, receiver) =
GraphcastAgent::new(config.to_graphcast_agent_config().await.unwrap())
.await
.expect("Initialize Graphcast agent");
let graphcast_agent = Arc::new(agent);
pub async fn new(config: Config, graphcast_agent: GraphcastAgent) -> RadioOperator {
debug!("Set global static instance of graphcast_agent");
let graphcast_agent = Arc::new(graphcast_agent);
_ = GRAPHCAST_AGENT.set(graphcast_agent.clone());

let notifier = Notifier::from_config(&config);
Expand All @@ -62,40 +59,6 @@ impl RadioOperator {
.await
.expect("Could not run migration");

let agent_ref = graphcast_agent.clone();
let db_ref = db.clone();
thread::spawn(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async {
for msg in receiver {
if let Ok(msg) = agent_ref.decode::<PublicPoiMessage>(msg.payload()).await {
if let Err(e) = add_message(&db_ref, msg).await {
warn!(
err = tracing::field::debug(&e),
"Failed to store public POI message"
);
};
} else if let Ok(msg) = agent_ref
.decode::<VersionUpgradeMessage>(msg.payload())
.await
{
if let Err(e) = add_message(&db_ref, msg).await {
warn!(
err = tracing::field::debug(&e),
"Failed to store version upgrade message"
);
};
} else if let Ok(msg) = agent_ref.decode::<SimpleMessage>(msg.payload()).await {
if let Err(e) = add_message(&db_ref, msg).await {
warn!(
err = tracing::field::debug(&e),
"Failed to store simple test message"
);
};
}
}
})
});

debug!("Initialized Radio Operator");
RadioOperator {
config,
Expand All @@ -107,7 +70,7 @@ impl RadioOperator {

/// Preparation for running the radio applications
/// Expose metrics and subscribe to graphcast topics
pub async fn prepare(&self) {
pub async fn prepare(&self, receiver: Receiver<WakuMessage>) {
// Set up Prometheus metrics url if configured
if let Some(port) = self.config.metrics_port {
debug!("Initializing metrics port");
Expand All @@ -126,6 +89,7 @@ impl RadioOperator {
.await;
}

self.message_processor(receiver).await;
GRAPHCAST_AGENT
.get()
.unwrap()
Expand Down Expand Up @@ -213,4 +177,45 @@ impl RadioOperator {
continue;
}
}

pub async fn message_processor(&self, receiver: Receiver<WakuMessage>) {
let agent_ref = self.graphcast_agent.clone();
let db_ref = self.db.clone();
tokio::spawn(async move {
for msg in receiver {
let timeout_duration = Duration::from_secs(1);
let process_res =
timeout(timeout_duration, process_message(&agent_ref, &db_ref, msg)).await;
match process_res {
Ok(Ok(r)) => trace!(msg_row_id = r, "New message added to DB"),
Ok(Err(e)) => {
warn!(err = tracing::field::debug(&e), "Failed to process message");
}
Err(e) => debug!(error = e.to_string(), "Message processor timed out"),
}
}
});
}
}

pub async fn process_message(
graphcast_agent: &Arc<GraphcastAgent>,
db: &Pool<Postgres>,
msg: WakuMessage,
) -> Result<i64, anyhow::Error> {
if let Ok(msg) = graphcast_agent
.decode::<PublicPoiMessage>(msg.payload())
.await
{
add_message(db, msg).await
} else if let Ok(msg) = graphcast_agent
.decode::<VersionUpgradeMessage>(msg.payload())
.await
{
add_message(db, msg).await
} else if let Ok(msg) = graphcast_agent.decode::<SimpleMessage>(msg.payload()).await {
add_message(db, msg).await
} else {
Err(anyhow!("Message cannot be decoded"))
}
}
Loading