Skip to content

Commit

Permalink
fix: Make Task::follow_logs immediately return existing log messages (#…
Browse files Browse the repository at this point in the history
…41)

Previously this would wait for an initial event before doing anything.
This is wrong, since if it is run then it won't print anything until an
event is emitted, which could take a long time to occur.

This commit changes it to immediately fetch log messages and only wait
for events after it has yielded those.
  • Loading branch information
swlynch99 authored Sep 6, 2024
1 parent 376b821 commit ac5e0aa
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions crates/durable-client/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,14 @@ impl Task {
let pool = client.pool.clone();

try_stream!({
let mut done = false;
let mut last_seen = -1;
let mut listener = PgListener::connect_with(&pool).await?;
listener
.listen_all(["durable:log", "durable:task-complete"])
.await?;

loop {
let event = listener.try_recv().await?;
let results = sqlx::query!(
"
SELECT message, index
Expand All @@ -253,10 +253,18 @@ impl Task {
last_seen = last_seen.max(record.index);
}

if done {
break;
}

let event = listener.try_recv().await?;
match event.as_ref() {
Some(event) if event.channel() != "durable:task-complete" => continue,
Some(event) => match serde_json::from_str::<TaskComplete>(event.payload()) {
Ok(payload) if payload.id == self.id => break,
Ok(payload) if payload.id == self.id => {
done = true;
continue;
}
Ok(_) => continue,
Err(_) => (),
},
Expand All @@ -272,7 +280,7 @@ impl Task {
.state;

if state == "complete" || state == "suspended" {
break;
done = true;
}
}
})
Expand Down

0 comments on commit ac5e0aa

Please sign in to comment.