diff --git a/src/db.rs b/src/db.rs index d3d6c8a..7504111 100644 --- a/src/db.rs +++ b/src/db.rs @@ -27,11 +27,32 @@ pub struct Db { background_thread: JoinHandle<()>, } -#[derive(Default)] -pub struct Setting {} +pub struct Setting { + pub checkpoint_period: Duration, +} + +impl std::default::Default for Setting { + fn default() -> Self { + Self { + checkpoint_period: Duration::from_secs(60 * 60), + } + } +} + +impl Setting { + fn validate(&self) -> anyhow::Result<()> { + if self.checkpoint_period.as_secs() < 5 { + return Err(anyhow!("checkpoint period can't be less than 5 seconds")); + } + + Ok(()) + } +} impl Db { - pub fn open(path: &Path, _setting: Setting) -> anyhow::Result { + pub fn open(path: &Path, setting: Setting) -> anyhow::Result { + setting.validate()?; + if !path.exists() { std::fs::create_dir_all(path)?; } @@ -90,7 +111,7 @@ impl Db { let pager = pager.clone(); let tx_state = tx_state.clone(); std::thread::spawn(move || loop { - let Err(err) = receiver.recv_timeout(Duration::from_secs(60 * 60)) else { + let Err(err) = receiver.recv_timeout(setting.checkpoint_period) else { break; }; if err != std::sync::mpsc::RecvTimeoutError::Timeout { @@ -229,6 +250,12 @@ impl Db { return Err(anyhow!("cannot join background thread")); } + // Since we own self, it means there are no active transaction since active transaction + // borrows the db. And there are no ongoing flush and checkpoint since they also borrow + // the db. The background thread to periodically flush and perform checkpoint is also + // finished due to the join above. + Self::checkpoint(&self.pager, &self.wal, &self.tx_state)?; + // Since the background thread is finished, it means it doesn't hold the WAL anymore and // we can take the wal from Arc let wal = Arc::into_inner(self.wal).unwrap();