Add leader task to delete old workflows
As old tasks and event entries pile up, they slow down the execution of
new tasks and use up database disk space.

This commit adds a task that automatically deletes old completed tasks
once a configurable amount of time has passed.
swlynch99 committed Oct 18, 2024
1 parent 3b03e21 commit ae96c0f
Showing 2 changed files with 133 additions and 2 deletions.
55 changes: 55 additions & 0 deletions crates/durable-runtime/src/config.rs
@@ -105,6 +105,33 @@ pub struct Config {
#[serde(with = "duration_seconds")]
pub suspend_margin: Duration,

/// The duration that completed tasks can be kept around before they are
/// automatically cleaned up. Setting this to `None` disables task cleanup
/// entirely.
///
/// Without this, old database entries for tasks, including their logs and
/// events, will stick around and fill up the database. This will eventually
/// slow down event inserts.
///
/// The default is to schedule tasks for cleanup after 7 days.
#[serde(default = "default_option_seconds::<{ 3600 * 24 * 7 }>")]
#[serde(with = "option_duration_seconds")]
pub cleanup_age: Option<Duration>,

/// The maximum number of tasks that will be deleted in a single transaction
/// by task cleanup.
///
/// This prevents the cleanup task from blocking other work on the server
/// while it is deleting old tasks, since each delete query's runtime is
/// bounded by the number of tasks it is allowed to delete.
///
/// Note that the cleanup task will still delete all old tasks. It will just
/// use multiple queries to do so.
///
/// The default limit is 10000 tasks.
#[serde(default = "default_u32::<10000>")]
pub cleanup_batch_limit: u32,

/// The maximum number of tasks that are allowed to be running on this node
/// at once.
///
@@ -174,6 +201,10 @@ const fn default_seconds<const SECONDS: u64>() -> Duration {
Duration::from_secs(SECONDS)
}

const fn default_option_seconds<const SECONDS: u64>() -> Option<Duration> {
Some(default_seconds::<{ SECONDS }>())
}

const fn default_u32<const N: u32>() -> u32 {
N
}
@@ -252,6 +283,30 @@ mod duration_seconds {
}
}

mod option_duration_seconds {
use std::time::Duration;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(transparent)]
struct Wrapper(#[serde(with = "super::duration_seconds")] Duration);

pub(crate) fn serialize<S>(duration: &Option<Duration>, ser: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
duration.as_ref().copied().map(Wrapper).serialize(ser)
}

pub(crate) fn deserialize<'de, D>(de: D) -> Result<Option<Duration>, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(<Option<Wrapper>>::deserialize(de)?.map(|v| v.0))
}
}

#[cfg(test)]
mod tests {
use super::*;
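The `option_duration_seconds` module reuses the existing seconds codec inside an `Option` by routing it through a transparent newtype, and the new fields combine it with serde defaults. Below is a minimal, self-contained sketch of the resulting behaviour; it is an illustration rather than part of the diff, it assumes serde and serde_json are available, and `secs`, `Secs`, `CleanupConfig`, and the default helpers are hypothetical stand-ins for the crate's private items. It exercises the three cases described in the `cleanup_age` documentation: a missing field falls back to the 7-day default, an explicit null disables cleanup, and a plain number is read as seconds.

use std::time::Duration;

use serde::Deserialize;

// Hypothetical stand-in for the crate's private `duration_seconds` module:
// durations appear in the config as an integer number of seconds.
mod secs {
    use std::time::Duration;

    use serde::{Deserialize, Deserializer};

    pub fn deserialize<'de, D>(de: D) -> Result<Duration, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Duration::from_secs(u64::deserialize(de)?))
    }
}

// The same transparent-wrapper trick as `option_duration_seconds`, kept to the
// deserialize half to stay short.
#[derive(Deserialize)]
#[serde(transparent)]
struct Secs(#[serde(with = "secs")] Duration);

#[derive(Deserialize)]
struct CleanupConfig {
    // Missing field -> 7-day default; explicit null -> cleanup disabled.
    #[serde(default = "default_cleanup_age")]
    cleanup_age: Option<Secs>,
    #[serde(default = "default_batch_limit")]
    cleanup_batch_limit: u32,
}

fn default_cleanup_age() -> Option<Secs> {
    Some(Secs(Duration::from_secs(3600 * 24 * 7)))
}

fn default_batch_limit() -> u32 {
    10_000
}

fn main() {
    let defaults: CleanupConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(defaults.cleanup_age.map(|s| s.0), Some(Duration::from_secs(604_800)));
    assert_eq!(defaults.cleanup_batch_limit, 10_000);

    let disabled: CleanupConfig = serde_json::from_str(r#"{"cleanup_age": null}"#).unwrap();
    assert!(disabled.cleanup_age.is_none());

    let hourly: CleanupConfig = serde_json::from_str(r#"{"cleanup_age": 3600}"#).unwrap();
    assert_eq!(hourly.cleanup_age.map(|s| s.0), Some(Duration::from_secs(3600)));
}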
80 changes: 78 additions & 2 deletions crates/durable-runtime/src/worker.rs
@@ -15,7 +15,7 @@ use sqlx::postgres::PgNotification;
use sqlx::types::Json;
use tokio::sync::{broadcast, mpsc, Mutex, Notify, Semaphore};
use tokio::task::JoinSet;
-use tokio::time::Instant;
+use tokio::time::{Instant, MissedTickBehavior};
use tracing::Instrument;
use wasmtime::component::Component;

@@ -265,14 +265,15 @@ impl Worker {
let heartbeat = Self::heartbeat(self.shared.clone(), self.worker_id);
let validate = Self::validate_workers(self.shared.clone(), self.worker_id);
let leader = Self::leader(self.shared.clone(), self.worker_id);
let cleanup = Self::task_cleanup(self.shared.clone(), worker_id);
let process = self.process_events();

// We want to run these all in the same tokio task so that if it has problems
// then the heartbeat will fail.
//
// Spawned tasks are put into their own joinset because running everything in a
// single task is not reasonable.
-let (heartbeat, validate, leader, process) = (heartbeat, validate, leader, process)
+let (heartbeat, validate, leader, process, cleanup) = (heartbeat, validate, leader, process, cleanup)
.join()
.instrument(tracing::info_span!("worker", worker_id))
.await;
@@ -289,6 +290,7 @@
validate?;
heartbeat?;
leader?;
cleanup?;
result?;

Ok(())
@@ -562,6 +564,80 @@
Ok(())
}

/// This task is responsible for deleting entries for old tasks.
async fn task_cleanup(shared: Arc<SharedState>, worker_id: i64) -> anyhow::Result<()> {
let cleanup_age = match shared.config.cleanup_age {
Some(cleanup_age) => cleanup_age,
None => {
shared.shutdown.wait().await;
return Ok(());
}
};

let _guard = ShutdownGuard::new(&shared.shutdown);
let mut shutdown = std::pin::pin!(shared.shutdown.wait());

let mut leader_id = shared.leader.get();
let mut leader_stream = std::pin::pin!(shared.leader.stream());

let mut interval = tokio::time::interval(Duration::from_secs(3600));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

'outer: loop {
tokio::select! {
biased;

_ = shutdown.as_mut() => break 'outer,
_ = interval.tick(), if leader_id == worker_id => (),
new_leader = leader_stream.as_mut().next() => {
leader_id = new_leader;
continue 'outer;
}
}

let limit = shared.config.cleanup_batch_limit as i64;
let interval = cleanup_age.into_pg_interval();
let mut conn = match shared.pool.acquire().await {
Ok(conn) => conn,
Err(e) => {
tracing::warn!("failed to acquire a connection to perform task cleanup: {e}");
continue;
}
};

// Delete in batches of at most `cleanup_batch_limit` tasks so that no
// single query runs (and holds locks) for too long.
loop {
let result = sqlx::query!(
r#"
DELETE FROM durable.task
WHERE task.ctid = ANY(ARRAY(
SELECT ctid
FROM durable.task
WHERE completed_at < NOW() - $1::interval
LIMIT $2
FOR UPDATE
))
"#,
interval,
limit
)
.execute(&mut *conn)
.await;

match result {
Ok(result) if result.rows_affected() < limit as u64 => break,
Ok(_) => (),
Err(e) => {
tracing::error!("failed to clean up old durable tasks: {e}");
break;
}
}
}
}

Ok(())
}

async fn process_events(&mut self) -> anyhow::Result<()> {
let shutdown = self.shared.shutdown.clone();
let _guard = ShutdownGuard::new(&shutdown);
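For context on the batched DELETE in `task_cleanup`: Postgres does not support a LIMIT clause on DELETE directly, so the query selects at most `cleanup_batch_limit` expired rows by `ctid` (locking them with `FOR UPDATE`), deletes exactly those, and the surrounding loop repeats until a batch comes back short. The sketch below illustrates only that loop shape; `delete_until_done` and `delete_batch` are hypothetical stand-ins for the sqlx query in this commit. It shows why the loop terminates and why every expired task is still removed even though each individual query is bounded.

/// Illustrative only: call `delete_batch` (which removes at most `limit` rows
/// and reports how many it removed) until a short batch signals that nothing
/// old enough remains.
fn delete_until_done<E>(
    limit: u64,
    mut delete_batch: impl FnMut(u64) -> Result<u64, E>,
) -> Result<u64, E> {
    let mut total = 0;
    loop {
        let deleted = delete_batch(limit)?;
        total += deleted;

        // A batch smaller than the limit means the last matching rows are
        // gone, so the loop terminates once the backlog is cleared.
        if deleted < limit {
            return Ok(total);
        }
    }
}

fn main() {
    // Pretend 25_000 tasks have expired and the batch limit is 10_000: the
    // loop issues three queries (10_000 + 10_000 + 5_000 rows) and then stops.
    let mut remaining = 25_000u64;
    let total = delete_until_done(10_000, |limit| -> Result<u64, ()> {
        let deleted = remaining.min(limit);
        remaining -= deleted;
        Ok(deleted)
    })
    .unwrap();
    assert_eq!(total, 25_000);
}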
