Explicitly handle some recoverable sqlx errors instead of failing #52

Merged · 1 commit · Sep 25, 2024

119 changes: 103 additions & 16 deletions crates/durable-runtime/src/worker.rs
@@ -13,7 +13,7 @@ use rand::Rng;
use serde_json::value::RawValue;
use sqlx::postgres::PgNotification;
use sqlx::types::Json;
use tokio::sync::{broadcast, Mutex, Notify, Semaphore};
use tokio::sync::{broadcast, mpsc, Mutex, Notify, Semaphore};
use tokio::task::JoinSet;
use tokio::time::Instant;
use tracing::Instrument;
@@ -344,7 +344,8 @@ impl Worker {
//
// How we do things here is that we order workers by id, each worker looks at
// the one just in front of it and schedules a liveness check for just after
// that worker would expire. The exception to this is the
// that worker would expire. The worker with the oldest ID is then responsible
// for checking the one with the newest ID.

let _guard = ShutdownGuard::new(&shared.shutdown);
let mut shutdown = std::pin::pin!(shared.shutdown.wait());
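
An aside on the scheme described in the comment above: with workers ordered by id, each worker watches its predecessor, and the worker with the oldest id wraps around to watch the newest one. A minimal sketch, assuming integer worker ids and a sorted list of the currently live workers (the `liveness_check_target` helper is illustrative, not part of this PR):

// Sketch only: pick which worker this one should liveness-check.
// `sorted_ids` holds the ids of all live workers in ascending order.
fn liveness_check_target(sorted_ids: &[i64], me: i64) -> Option<i64> {
    let idx = sorted_ids.iter().position(|&id| id == me)?;
    if idx == 0 {
        // The worker with the oldest id is responsible for the newest one.
        sorted_ids.last().copied().filter(|&id| id != me)
    } else {
        // Everyone else checks the worker just in front of it.
        Some(sorted_ids[idx - 1])
    }
}
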
@@ -559,29 +560,60 @@ impl Worker {
let shutdown = self.shared.shutdown.clone();
let _guard = ShutdownGuard::new(&shutdown);
let mut shutdown = std::pin::pin!(shutdown.wait());
let (tx, mut rx) = tokio::sync::mpsc::channel::<i64>(1024);

self.spawn_new_tasks().await?;
self.spawn_new_tasks(&tx).await?;
self.load_leader_id().await?;

'outer: loop {
let event = tokio::select! {
biased;

_ = shutdown.as_mut() => break 'outer,
_ = self.tasks.join_next(), if !self.tasks.is_empty() => None,
event = self.event_source.next() => Some(event?),
_ = self.tasks.join_next(), if !self.tasks.is_empty() => LoopEvent::TaskComplete,
id = rx.recv() => LoopEvent::TaskFailed(id.expect("failed task channel closed unexpectedly")),
event = self.event_source.next() => LoopEvent::Event(event?),
};

// Clean up any tasks that have completed already.
while self.tasks.try_join_next().is_some() {}

let event = match event {
Some(event) => event,
None => {
LoopEvent::Event(event) => event,
LoopEvent::TaskComplete => {
if self.blocked {
self.spawn_new_tasks().await?;
self.spawn_new_tasks(&tx).await?;
}

continue;
}
LoopEvent::TaskFailed(id) => {
let mut failed = vec![id];

let mut count = 0;
while let Ok(id) = rx.try_recv() {
failed.push(id);

count += 1;
if count > 1024 {
break;
}
}

sqlx::query!(
"
UPDATE durable.task
SET state = 'ready',
running_on = NULL
WHERE id = ANY($1::bigint[])
AND running_on = $2
",
&failed,
self.worker_id
)
.execute(&self.shared.pool)
.await?;

continue;
}
};
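
For context, the `TaskFailed` arm above batches whatever failed-task ids are already queued, capped so a burst of failures cannot monopolize the event loop, and resets them all with a single `UPDATE ... WHERE id = ANY($1)`. The drain pattern in isolation looks roughly like this (`drain_ready` is a hypothetical helper, not part of the PR):

use tokio::sync::mpsc;

// Hypothetical helper mirroring the bounded drain above: collect the first
// failed id plus anything else already waiting in the channel, but never more
// than `max` ids in a single pass.
fn drain_ready(rx: &mut mpsc::Receiver<i64>, first: i64, max: usize) -> Vec<i64> {
    let mut ids = vec![first];
    while ids.len() < max {
        match rx.try_recv() {
            Ok(id) => ids.push(id),
            Err(_) => break,
        }
    }
    ids
}
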
@@ -595,7 +627,7 @@
running_on: Some(id),
..
}) if id != self.worker_id => (),
Event::Task(_) => self.spawn_new_tasks().await?,
Event::Task(_) => self.spawn_new_tasks(&tx).await?,
Event::TaskSuspend(_) => {
self.shared.suspend.notify_waiters();
}
@@ -613,7 +645,7 @@

// We don't know what we missed so do everything.
Event::Lagged => {
self.spawn_new_tasks().await?;
self.spawn_new_tasks(&tx).await?;
self.load_leader_id().await?;
self.shared.suspend.notify_waiters();
}
@@ -647,7 +679,7 @@

/// Spawn all new tasks that are scheduled on this server and also those
/// that aren't scheduled on any server.
async fn spawn_new_tasks(&mut self) -> anyhow::Result<()> {
async fn spawn_new_tasks(&mut self, failure: &mpsc::Sender<i64>) -> anyhow::Result<()> {
let max_tasks = self.shared.config.max_tasks;
let allowed = max_tasks.saturating_sub(self.tasks.len());
if allowed == 0 {
@@ -709,6 +741,7 @@ impl Worker {
let shared = self.shared.clone();
let engine = self.engine.clone();
let worker_id = self.worker_id;
let failures = failure.clone();

tracing::trace!(
target: "durable_runtime::worker::spawn_new_tasks",
@@ -723,6 +756,10 @@ impl Worker {
.await
{
tracing::error!(task_id, "worker task exited with an error: {e}");

// An error here means we are already shutting down and normal shutdown recovery
// should take care of any remaining tasks.
let _ = failures.send(task_id).await;
}
};

@@ -750,13 +787,52 @@ impl Worker {
) -> anyhow::Result<()> {
let task_id = task.id;

let status =
match AssertUnwindSafe(Self::run_task_impl(shared.clone(), engine, task, worker_id))
.catch_unwind()
.await
{
// We are using the loop here to do some early breaks.
#[allow(clippy::never_loop)]
let status = loop {
let future = Self::run_task_impl(shared.clone(), engine, task, worker_id);
break match AssertUnwindSafe(future).catch_unwind().await {
Ok(Ok(status)) => status,
Ok(Err(error)) => {
match find_sqlx_error(&error) {
// These errors are external to the runtime and should usually be resolvable
// if the workflow is retried somewhere else at a later point in time.
//
// This should also help to reduce the number of workflow aborts that are
// "not the fault" of the workflow itself. These will (eventually) instead
// get turned into worker errors, which can be handled at a higher level.
Some(
sqlx::Error::PoolTimedOut
| sqlx::Error::WorkerCrashed
| sqlx::Error::Io(_),
) => {
// Attempt to reset the task state so it can be picked up again.
//
// If this fails then the task failure gets reported to the main event
// loop which can ensure it gets retried.
sqlx::query!(
"
UPDATE durable.task
SET state = 'ready',
running_on = NULL
WHERE id = $1
AND running_on = $2
",
task_id,
worker_id
)
.execute(&shared.pool)
.await?;

break TaskStatus::Suspend;
}
Some(sqlx::Error::PoolClosed) => {
// Nothing we can do, since we can't make database queries.
break TaskStatus::Suspend;
}
_ => (),
}

let message = format!("{error:?}\n");

let result = sqlx::query!(
@@ -803,6 +879,7 @@ impl Worker {
TaskStatus::ExitFailure
}
};
};

match status {
TaskStatus::NotScheduledOnWorker => {
@@ -1087,3 +1164,13 @@ impl EventSource for PgEventSource {
}
}
}

enum LoopEvent {
Event(Event),
TaskComplete,
TaskFailed(i64),
}

fn find_sqlx_error(error: &anyhow::Error) -> Option<&sqlx::Error> {
error.chain().filter_map(|e| e.downcast_ref()).next()
}
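
For context, `find_sqlx_error` walks the `anyhow` error chain and returns the first `sqlx::Error` it finds, which lets `run_task` put the task back to 'ready' and suspend instead of aborting the workflow when the error looks transient. A rough usage sketch (the `is_recoverable` helper is illustrative, not part of the PR):

// Sketch only: the same classification run_task performs inline above.
fn is_recoverable(error: &anyhow::Error) -> bool {
    matches!(
        find_sqlx_error(error),
        Some(sqlx::Error::PoolTimedOut | sqlx::Error::WorkerCrashed | sqlx::Error::Io(_))
    )
}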