diff --git a/.gitignore b/.gitignore index d132886..c88223d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,5 +7,6 @@ mappers.toml spec.toml key.json test-key.json +gcp.json node_modules diff --git a/rfd-processor/src/processor.rs b/rfd-processor/src/processor.rs index 8c9fafe..a3a2cc7 100644 --- a/rfd-processor/src/processor.rs +++ b/rfd-processor/src/processor.rs @@ -24,9 +24,10 @@ pub enum JobError { pub async fn processor(ctx: Arc) -> Result<(), JobError> { let mut interval = interval(ctx.processor.interval); - interval.tick().await; - let pagination = ListPagination::default().limit(ctx.processor.batch_size); + tracing::info!(?interval, ?pagination, "Starting processor"); + + interval.tick().await; loop { if ctx.processor.enabled { @@ -39,12 +40,30 @@ pub async fn processor(ctx: Arc) -> Result<(), JobError> { ) .await?; + tracing::info!(jobs = ?jobs.iter().map(|job| job.id).collect::>(), "Spawning jobs"); + for job in jobs { let job_id = job.id; - tokio::spawn(run_job(ctx.clone(), job).or_else(move |err| async move { - tracing::error!(id = ?job_id, ?err, "Spawned job failed"); - Err(err) - })); + tracing::info!("Starting job processing"); + match JobStore::start(&ctx.db.storage, job.id).await { + Ok(Some(job)) => { + tracing::info!(job = ?job_id, "Spawning job"); + tokio::spawn(run_job(ctx.clone(), job).or_else(move |err| async move { + tracing::error!(id = ?job_id, ?err, "Spawned job failed"); + Err(err) + })); + } + Ok(None) => { + tracing::error!(?job, "Job that was scheduled to run has gone missing! Was it started by a different task?"); + } + Err(err) => { + tracing::warn!( + ?job, + ?err, + "Failed to start job. Was it previously started?" + ); + } + } } } @@ -54,48 +73,35 @@ pub async fn processor(ctx: Arc) -> Result<(), JobError> { #[instrument(skip(ctx, job), fields(id = job.id, rfd = job.rfd, sha = ?job.sha, commited_at = ?job.committed_at))] async fn run_job(ctx: Arc, job: Job) -> Result<(), JobError> { - tracing::info!("Starting job processing"); - match JobStore::start(&ctx.db.storage, job.id).await { - Ok(Some(job)) => { - let location = GitHubRfdLocation { - client: ctx.github.client.clone(), - owner: job.owner.clone(), - repo: job.repository.clone(), - branch: job.branch.clone(), - commit: job.sha.clone(), - default_branch: ctx.github.repository.default_branch.clone(), - }; - - let update = GitHubRfdUpdate { - location, - number: job.rfd.into(), - committed_at: job.committed_at, - }; - - let updater = RfdUpdater::new(&ctx.actions, ctx.processor.update_mode); - - match updater.handle(&ctx, &[update]).await { - Ok(_) => { - let _ = JobStore::complete(&ctx.db.storage, job.id) - .await - .tap_err(|err| tracing::error!(?err, "Failed to mark job as completed")); - } - Err(err) => { - tracing::error!(?err, "RFD update failed"); + tracing::info!("Running job"); - // TODO: Mark job as failed or retry? - } - } - } - Ok(None) => { - tracing::error!(?job, "Job that was scheduled to run has gone missing! Was it started by a different task?"); + let location = GitHubRfdLocation { + client: ctx.github.client.clone(), + owner: job.owner.clone(), + repo: job.repository.clone(), + branch: job.branch.clone(), + commit: job.sha.clone(), + default_branch: ctx.github.repository.default_branch.clone(), + }; + + let update = GitHubRfdUpdate { + location, + number: job.rfd.into(), + committed_at: job.committed_at, + }; + + let updater = RfdUpdater::new(&ctx.actions, ctx.processor.update_mode); + + match updater.handle(&ctx, &[update]).await { + Ok(_) => { + let _ = JobStore::complete(&ctx.db.storage, job.id) + .await + .tap_err(|err| tracing::error!(?err, "Failed to mark job as completed")); } Err(err) => { - tracing::warn!( - ?job, - ?err, - "Failed to start job. Was it previously started?" - ); + tracing::error!(?err, "RFD update failed"); + + // TODO: Mark job as failed or retry? } }