Skip to content

Commit

Permalink
Rework job spawning
Browse files Browse the repository at this point in the history
  • Loading branch information
augustuswm committed Nov 1, 2024
1 parent a186c15 commit d8158be
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 45 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ mappers.toml
spec.toml
key.json
test-key.json
gcp.json

node_modules
96 changes: 51 additions & 45 deletions rfd-processor/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ pub enum JobError {

pub async fn processor(ctx: Arc<Context>) -> Result<(), JobError> {
let mut interval = interval(ctx.processor.interval);
interval.tick().await;

let pagination = ListPagination::default().limit(ctx.processor.batch_size);
tracing::info!(?interval, ?pagination, "Starting processor");

interval.tick().await;

loop {
if ctx.processor.enabled {
Expand All @@ -39,12 +40,30 @@ pub async fn processor(ctx: Arc<Context>) -> Result<(), JobError> {
)
.await?;

tracing::info!(jobs = ?jobs.iter().map(|job| job.id).collect::<Vec<_>>(), "Spawning jobs");

for job in jobs {
let job_id = job.id;
tokio::spawn(run_job(ctx.clone(), job).or_else(move |err| async move {
tracing::error!(id = ?job_id, ?err, "Spawned job failed");
Err(err)
}));
tracing::info!("Starting job processing");
match JobStore::start(&ctx.db.storage, job.id).await {
Ok(Some(job)) => {
tracing::info!(job = ?job_id, "Spawning job");
tokio::spawn(run_job(ctx.clone(), job).or_else(move |err| async move {
tracing::error!(id = ?job_id, ?err, "Spawned job failed");
Err(err)
}));
}
Ok(None) => {
tracing::error!(?job, "Job that was scheduled to run has gone missing! Was it started by a different task?");
}
Err(err) => {
tracing::warn!(
?job,
?err,
"Failed to start job. Was it previously started?"
);
}
}
}
}

Expand All @@ -54,48 +73,35 @@ pub async fn processor(ctx: Arc<Context>) -> Result<(), JobError> {

#[instrument(skip(ctx, job), fields(id = job.id, rfd = job.rfd, sha = ?job.sha, commited_at = ?job.committed_at))]
async fn run_job(ctx: Arc<Context>, job: Job) -> Result<(), JobError> {
tracing::info!("Starting job processing");
match JobStore::start(&ctx.db.storage, job.id).await {
Ok(Some(job)) => {
let location = GitHubRfdLocation {
client: ctx.github.client.clone(),
owner: job.owner.clone(),
repo: job.repository.clone(),
branch: job.branch.clone(),
commit: job.sha.clone(),
default_branch: ctx.github.repository.default_branch.clone(),
};

let update = GitHubRfdUpdate {
location,
number: job.rfd.into(),
committed_at: job.committed_at,
};

let updater = RfdUpdater::new(&ctx.actions, ctx.processor.update_mode);

match updater.handle(&ctx, &[update]).await {
Ok(_) => {
let _ = JobStore::complete(&ctx.db.storage, job.id)
.await
.tap_err(|err| tracing::error!(?err, "Failed to mark job as completed"));
}
Err(err) => {
tracing::error!(?err, "RFD update failed");
tracing::info!("Running job");

// TODO: Mark job as failed or retry?
}
}
}
Ok(None) => {
tracing::error!(?job, "Job that was scheduled to run has gone missing! Was it started by a different task?");
let location = GitHubRfdLocation {
client: ctx.github.client.clone(),
owner: job.owner.clone(),
repo: job.repository.clone(),
branch: job.branch.clone(),
commit: job.sha.clone(),
default_branch: ctx.github.repository.default_branch.clone(),
};

let update = GitHubRfdUpdate {
location,
number: job.rfd.into(),
committed_at: job.committed_at,
};

let updater = RfdUpdater::new(&ctx.actions, ctx.processor.update_mode);

match updater.handle(&ctx, &[update]).await {
Ok(_) => {
let _ = JobStore::complete(&ctx.db.storage, job.id)
.await
.tap_err(|err| tracing::error!(?err, "Failed to mark job as completed"));
}
Err(err) => {
tracing::warn!(
?job,
?err,
"Failed to start job. Was it previously started?"
);
tracing::error!(?err, "RFD update failed");

// TODO: Mark job as failed or retry?
}
}

Expand Down

0 comments on commit d8158be

Please sign in to comment.