Skip to content

Commit

Permalink
Add a warning message when a worker shuts down unexpectedly
Browse files Browse the repository at this point in the history
  • Loading branch information
swlynch99 committed Oct 24, 2024
1 parent 3ef1ef9 commit 4b97b7f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
6 changes: 6 additions & 0 deletions crates/durable-runtime/src/flag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ impl<'a> ShutdownGuard<'a> {

impl<'a> Drop for ShutdownGuard<'a> {
fn drop(&mut self) {
if !self.0.is_raised() {
tracing::warn!(
"durable worker task shutting down without the shutdown flag being raised"
);
}

self.0.raise();
}
}
16 changes: 11 additions & 5 deletions crates/durable-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,17 @@ impl Worker {
self.load_leader_id().await?;

let worker_id = self.worker_id;
let heartbeat = Self::heartbeat(self.shared.clone(), self.worker_id);
let validate = Self::validate_workers(self.shared.clone(), self.worker_id);
let leader = Self::leader(self.shared.clone(), self.worker_id);
let cleanup = Self::task_cleanup(self.shared.clone(), worker_id);
let process = self.process_events();
let heartbeat = Self::heartbeat(self.shared.clone(), self.worker_id)
.instrument(tracing::info_span!("heartbeat"));
let validate = Self::validate_workers(self.shared.clone(), self.worker_id)
.instrument(tracing::info_span!("validate_workers"));
let leader = Self::leader(self.shared.clone(), self.worker_id)
.instrument(tracing::info_span!("leader"));
let cleanup = Self::task_cleanup(self.shared.clone(), worker_id)
.instrument(tracing::info_span!("task_cleanup"));
let process = self
.process_events()
.instrument(tracing::info_span!("process"));

// We want to run these all in the same tokio task so that if it has problems
// then the heartbeat will fail.
Expand Down

0 comments on commit 4b97b7f

Please sign in to comment.