Explicitly handle some recoverable sqlx errors instead of failing (#52)
The whole goal of durable is for its workflows to be reliable. While some errors cannot be handled, there is a subset of sqlx errors that are not the fault of the workflow itself and should instead be handled by retrying.

This commit handles those errors and translates them into either retries or runtime-level errors.
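For orientation, here is a minimal Rust sketch of the classification idea. The error variants mirror the match added in the diff below, but the is_retryable helper itself is illustrative and is not part of the commit:

use sqlx::Error as SqlxError;

// Walk an anyhow error chain and report whether it contains a
// connection-level sqlx error that is worth retrying rather than
// treating as a workflow failure.
fn is_retryable(error: &anyhow::Error) -> bool {
    error
        .chain()
        .filter_map(|e| e.downcast_ref::<SqlxError>())
        .any(|e| {
            matches!(
                e,
                SqlxError::PoolTimedOut | SqlxError::WorkerCrashed | SqlxError::Io(_)
            )
        })
}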
swlynch99 authored Sep 25, 2024
1 parent 1e5917f commit 24c19f9
Showing 3 changed files with 133 additions and 16 deletions.

Some generated files are not rendered by default.

119 changes: 103 additions & 16 deletions crates/durable-runtime/src/worker.rs
@@ -13,7 +13,7 @@ use rand::Rng;
use serde_json::value::RawValue;
use sqlx::postgres::PgNotification;
use sqlx::types::Json;
use tokio::sync::{broadcast, Mutex, Notify, Semaphore};
use tokio::sync::{broadcast, mpsc, Mutex, Notify, Semaphore};
use tokio::task::JoinSet;
use tokio::time::Instant;
use tracing::Instrument;
@@ -344,7 +344,8 @@ impl Worker {
//
// How we do things here is that we order workers by id, each worker looks at
// the one just in front of it and schedules a liveness check for just after
// that worker would expire. The exception to this is the
// that worker would expire. The worker with the oldest ID is then responsible
// for checking the one with the newest ID.

let _guard = ShutdownGuard::new(&shared.shutdown);
let mut shutdown = std::pin::pin!(shared.shutdown.wait());
@@ -559,29 +560,60 @@ impl Worker {
let shutdown = self.shared.shutdown.clone();
let _guard = ShutdownGuard::new(&shutdown);
let mut shutdown = std::pin::pin!(shutdown.wait());
let (tx, mut rx) = tokio::sync::mpsc::channel::<i64>(1024);

self.spawn_new_tasks().await?;
self.spawn_new_tasks(&tx).await?;
self.load_leader_id().await?;

'outer: loop {
let event = tokio::select! {
biased;

_ = shutdown.as_mut() => break 'outer,
_ = self.tasks.join_next(), if !self.tasks.is_empty() => None,
event = self.event_source.next() => Some(event?),
_ = self.tasks.join_next(), if !self.tasks.is_empty() => LoopEvent::TaskComplete,
id = rx.recv() => LoopEvent::TaskFailed(id.expect("failed task channel closed unexpectedly")),
event = self.event_source.next() => LoopEvent::Event(event?),
};

// Clean up any tasks that have completed already.
while self.tasks.try_join_next().is_some() {}

let event = match event {
Some(event) => event,
None => {
LoopEvent::Event(event) => event,
LoopEvent::TaskComplete => {
if self.blocked {
self.spawn_new_tasks().await?;
self.spawn_new_tasks(&tx).await?;
}

continue;
}
LoopEvent::TaskFailed(id) => {
let mut failed = vec![id];

let mut count = 0;
while let Ok(id) = rx.try_recv() {
failed.push(id);

count += 1;
if count > 1024 {
break;
}
}

sqlx::query!(
"
UPDATE durable.task
SET state = 'ready',
running_on = NULL
WHERE id = ANY($1::bigint[])
AND running_on = $2
",
&failed,
self.worker_id
)
.execute(&self.shared.pool)
.await?;

continue;
}
};
@@ -595,7 +627,7 @@ impl Worker {
running_on: Some(id),
..
}) if id != self.worker_id => (),
Event::Task(_) => self.spawn_new_tasks().await?,
Event::Task(_) => self.spawn_new_tasks(&tx).await?,
Event::TaskSuspend(_) => {
self.shared.suspend.notify_waiters();
}
@@ -613,7 +645,7 @@ impl Worker {

// We don't know what we missed so do everything.
Event::Lagged => {
self.spawn_new_tasks().await?;
self.spawn_new_tasks(&tx).await?;
self.load_leader_id().await?;
self.shared.suspend.notify_waiters();
}
@@ -647,7 +679,7 @@ impl Worker {

/// Spawn all new tasks that are scheduled on this server and also those
/// that aren't scheduled on any server.
async fn spawn_new_tasks(&mut self) -> anyhow::Result<()> {
async fn spawn_new_tasks(&mut self, failure: &mpsc::Sender<i64>) -> anyhow::Result<()> {
let max_tasks = self.shared.config.max_tasks;
let allowed = max_tasks.saturating_sub(self.tasks.len());
if allowed == 0 {
@@ -709,6 +741,7 @@ impl Worker {
let shared = self.shared.clone();
let engine = self.engine.clone();
let worker_id = self.worker_id;
let failures = failure.clone();

tracing::trace!(
target: "durable_runtime::worker::spawn_new_tasks",
@@ -723,6 +756,10 @@ impl Worker {
.await
{
tracing::error!(task_id, "worker task exited with an error: {e}");

// An error here means we are already shutting down and normal shutdown recovery
// should take care of any remaining tasks.
let _ = failures.send(task_id).await;
}
};

@@ -750,13 +787,52 @@ impl Worker {
) -> anyhow::Result<()> {
let task_id = task.id;

let status =
match AssertUnwindSafe(Self::run_task_impl(shared.clone(), engine, task, worker_id))
.catch_unwind()
.await
{
// We are using the loop here to do some early breaks.
#[allow(clippy::never_loop)]
let status = loop {
let future = Self::run_task_impl(shared.clone(), engine, task, worker_id);
break match AssertUnwindSafe(future).catch_unwind().await {
Ok(Ok(status)) => status,
Ok(Err(error)) => {
match find_sqlx_error(&error) {
// These errors are external to the runtime and should usually be resolvable
// if the workflow is retried somewhere else at a later point in time.
//
// This should also help to reduce the number of workflow aborts that are
// "not the fault" of the workflow itself. These will (eventually) instead
// get turned into worker errors, which can be handled at a higher level.
Some(
sqlx::Error::PoolTimedOut
| sqlx::Error::WorkerCrashed
| sqlx::Error::Io(_),
) => {
// Attempt to reset the task state so it can be picked up again.
//
// If this fails then the task failure gets reported to the main event
// loop which can ensure it gets retried.
sqlx::query!(
"
UPDATE durable.task
SET state = 'ready',
running_on = NULL
WHERE id = $1
AND running_on = $2
",
task_id,
worker_id
)
.execute(&shared.pool)
.await?;

break TaskStatus::Suspend;
}
Some(sqlx::Error::PoolClosed) => {
// Nothing we can do, since we can't make database queries.
break TaskStatus::Suspend;
}
_ => (),
}

let message = format!("{error:?}\n");

let result = sqlx::query!(
@@ -803,6 +879,7 @@ impl Worker {
TaskStatus::ExitFailure
}
};
};

match status {
TaskStatus::NotScheduledOnWorker => {
@@ -1087,3 +1164,13 @@ impl EventSource for PgEventSource {
}
}
}

enum LoopEvent {
Event(Event),
TaskComplete,
TaskFailed(i64),
}

fn find_sqlx_error(error: &anyhow::Error) -> Option<&sqlx::Error> {
error.chain().filter_map(|e| e.downcast_ref()).next()
}
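Stepping back from the diff, the retry path hinges on the new failure channel: a worker task reports its task id when its runner exits with an error, and the main loop drains those ids in batches before flipping the rows back to 'ready'. A hypothetical, self-contained sketch of that pattern (the channel size and batching mirror the diff; the main function and printed output are purely illustrative):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Failed task ids flow from worker tasks to the main event loop.
    let (tx, mut rx) = mpsc::channel::<i64>(1024);

    // A worker task reports its failure. If the receiver is gone the
    // runtime is shutting down and recovery happens elsewhere, so the
    // send error is ignored.
    let reporter = tx.clone();
    tokio::spawn(async move {
        let _ = reporter.send(42).await;
    });

    // Main loop side: wait for one failure, then opportunistically drain
    // whatever else is queued so a single UPDATE can reset them all.
    if let Some(first) = rx.recv().await {
        let mut failed = vec![first];
        while let Ok(id) = rx.try_recv() {
            failed.push(id);
        }
        println!("resetting tasks: {failed:?}");
    }
}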
