diff --git a/.gitignore b/.gitignore index 3475174..55caad0 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ tarpaulin-report.html test1 test2 test_btree + +*.log diff --git a/Cargo.toml b/Cargo.toml index 560223e..c57b978 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,7 @@ tempfile = "3.10.1" [[bench]] name = "db_benchmark" harness = false + +[[test]] +name = "db_crash" +harness = false diff --git a/src/btree.rs b/src/btree.rs index b7f33ff..028ac46 100644 --- a/src/btree.rs +++ b/src/btree.rs @@ -94,9 +94,7 @@ impl<'a> BTree<'a, LogContext<'a>> { } let new_right_page = self.new_page()?; - let mut new_right_leaf = new_right_page - .init_leaf(self.ctx)? - .expect("new page should always be convertible to leaf page"); + let mut new_right_leaf = new_right_page.init_leaf(self.ctx)?; let new_right_pgid = new_right_leaf.id(); let mut pivot = self.leaf_split_and_insert( &mut result.leaf.node, @@ -108,10 +106,7 @@ impl<'a> BTree<'a, LogContext<'a>> { let is_root_leaf = result.leaf.node.id() == self.root; if is_root_leaf { - let new_left_leaf = self - .new_page()? - .init_leaf(self.ctx)? - .expect("new page should always be convertible to leaf page"); + let new_left_leaf = self.new_page()?.init_leaf(self.ctx)?; self.split_root_leaf(result.leaf.node, new_left_leaf, new_right_pgid, &mut pivot)?; } else { self.propagate_interior_splitting(result.interiors, new_right_pgid, pivot)?; @@ -150,11 +145,7 @@ impl<'a> BTree<'a, LogContext<'a>> { current = next; }; - let Some(node) = page.init_leaf(self.ctx)? else { - return Err(anyhow!( - "invalid state, btree contain non-interior and non-leaf page" - )); - }; + let node = page.init_leaf(self.ctx)?; let (i, found) = self.search_key_in_leaf(&node, key)?; Ok(LookupForUpdateResult { diff --git a/src/db.rs b/src/db.rs index 19f6e3b..db122d9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,7 +1,7 @@ use crate::btree::{BTreeRead, BTreeWrite, Cursor}; use crate::pager::{DbState, LogContext, PageId, PageIdExt, Pager}; use crate::recovery::{recover, undo_txn}; -use crate::wal::{TxId, TxIdExt, TxState, Wal, WalRecord}; +use crate::wal::{TxId, TxState, Wal, WalRecord}; use anyhow::anyhow; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::fs::OpenOptions; @@ -94,15 +94,10 @@ impl Db { .create(true) .truncate(false) .open(wal_path)?; - let wal = recover(wal_file, &pager, page_size)?; - let wal = Arc::new(wal); + let result = recover(wal_file, &pager, page_size)?; + let wal = Arc::new(result.wal); - let next_txid = if let Some(txid) = header.last_txid { - txid.next().get() - } else { - 1 - }; - let next_txid = AtomicU64::new(next_txid); + let next_txid = AtomicU64::new(result.next_txid.get()); // at this point, the recovery is already finished, so there is no active transaction let tx_state = Arc::new(RwLock::new(TxState::None)); @@ -176,7 +171,6 @@ impl Db { let header = Header { version: 0, page_size: DEFAULT_PAGE_SIZE as u32, - last_txid: None, }; let mut buff = vec![0; 2 * DB_HEADER_SIZE]; @@ -191,7 +185,7 @@ impl Db { let tx_guard = self.tx_lock.write(); let mut tx_state = self.tx_state.write(); - self.undo_dangling_tx(&mut tx_state)?; + self.finish_dangling_tx(&mut tx_state)?; let txid = self.next_txid.fetch_add(1, Ordering::SeqCst); let txid = TxId::new(txid).unwrap(); @@ -200,39 +194,58 @@ impl Db { Tx::new(txid, self, tx_guard) } - fn undo_dangling_tx(&self, tx_state: &mut TxState) -> anyhow::Result<()> { - let TxState::Active(txid) = *tx_state else { - return Ok(()); - }; - - log::debug!("previous transaction {txid:?} is not closed yet"); - - let lsn = self.wal.append(txid, None, WalRecord::Rollback)?; - *tx_state = TxState::Aborting { - txid, - rollback: lsn, - last_undone: lsn, - }; - let TxState::Aborting { - ref mut last_undone, - .. - } = &mut *tx_state - else { - unreachable!(); - }; - - undo_txn(&self.pager, &self.wal, txid, lsn, last_undone)?; - self.wal.append(txid, None, WalRecord::End)?; - *tx_state = TxState::None; + fn finish_dangling_tx(&self, tx_state: &mut TxState) -> anyhow::Result<()> { + match *tx_state { + TxState::None => Ok(()), + TxState::Active(txid) => { + log::debug!("previous transaction {txid:?} is not closed yet"); + + let lsn = self.wal.append(txid, None, WalRecord::Rollback)?; + *tx_state = TxState::Aborting { + txid, + rollback: lsn, + last_undone: lsn, + }; + let TxState::Aborting { + ref mut last_undone, + .. + } = &mut *tx_state + else { + unreachable!(); + }; - Ok(()) + undo_txn(&self.pager, &self.wal, txid, lsn, last_undone)?; + self.wal.append(txid, None, WalRecord::End)?; + *tx_state = TxState::None; + Ok(()) + } + TxState::Aborting { + txid, + rollback, + ref mut last_undone, + } => { + log::debug!("continue aborting previous transaction {txid:?}"); + + undo_txn(&self.pager, &self.wal, txid, rollback, last_undone)?; + self.wal.append(txid, None, WalRecord::End)?; + *tx_state = TxState::None; + Ok(()) + } + TxState::Committing(txid) => { + let commit_lsn = self.wal.append(txid, None, WalRecord::Commit)?; + self.wal.append(txid, None, WalRecord::End)?; + self.wal.sync(commit_lsn)?; + *tx_state = TxState::None; + Ok(()) + } + } } pub fn read(&self) -> anyhow::Result { let tx_guard = self.tx_lock.read(); let mut tx_state = self.tx_state.write(); - self.undo_dangling_tx(&mut tx_state)?; + self.finish_dangling_tx(&mut tx_state)?; let txid = self.next_txid.fetch_add(1, Ordering::SeqCst); let txid = TxId::new(txid).unwrap(); @@ -276,14 +289,13 @@ impl Db { } } -const DB_HEADER_SIZE: usize = 32; +const DB_HEADER_SIZE: usize = 24; const DEFAULT_PAGE_SIZE: usize = 0x1000; const MAGIC_HEADER: &[u8] = b"dbest000"; struct Header { version: u32, page_size: u32, - last_txid: Option, } impl Header { @@ -291,9 +303,8 @@ impl Header { buff[0..8].copy_from_slice(MAGIC_HEADER); buff[8..12].copy_from_slice(&self.version.to_be_bytes()); buff[12..16].copy_from_slice(&self.page_size.to_be_bytes()); - buff[16..24].copy_from_slice(&self.last_txid.to_be_bytes()); - let checksum = crc64::crc64(0, &buff[0..24]); - buff[24..32].copy_from_slice(&checksum.to_be_bytes()); + let checksum = crc64::crc64(0, &buff[0..16]); + buff[16..24].copy_from_slice(&checksum.to_be_bytes()); } fn decode(buff: &[u8]) -> Option { @@ -307,13 +318,8 @@ impl Header { let version = u32::from_be_bytes(buff[8..12].try_into().unwrap()); let page_size = u32::from_be_bytes(buff[12..16].try_into().unwrap()); - let last_txid = TxId::from_be_bytes(buff[16..24].try_into().unwrap()); - Some(Self { - version, - page_size, - last_txid, - }) + Some(Self { version, page_size }) } } diff --git a/src/pager.rs b/src/pager.rs index 2d16d34..2be9aae 100644 --- a/src/pager.rs +++ b/src/pager.rs @@ -100,12 +100,11 @@ struct PagerFlushInternal { const MINIMUM_PAGE_SIZE: usize = 256; -const PAGE_HEADER_SIZE: usize = 32; +const PAGE_HEADER_SIZE: usize = 24; const PAGE_HEADER_VERSION_RANGE: Range = 0..2; const PAGE_HEADER_KIND_INDEX: usize = 2; -const PAGE_HEADER_REC_LSN_RANGE: Range = 8..16; -const PAGE_HEADER_PAGE_LSN_RANGE: Range = 16..24; -const PAGE_HEADER_PAGE_ID_RANGE: Range = 24..32; +const PAGE_HEADER_PAGE_LSN_RANGE: Range = 8..16; +const PAGE_HEADER_PAGE_ID_RANGE: Range = 16..24; const PAGE_FOOTER_SIZE: usize = 8; const PAGE_FOOTER_CHECKSUM_RANGE: Range = 0..8; @@ -151,8 +150,6 @@ macro_rules! const_assert { const_assert!(PAGE_HEADER_VERSION_RANGE.end <= PAGE_HEADER_SIZE); const_assert!(range_size(PAGE_HEADER_VERSION_RANGE) == 2); const_assert!(PAGE_HEADER_KIND_INDEX < PAGE_HEADER_SIZE); -const_assert!(PAGE_HEADER_REC_LSN_RANGE.end <= PAGE_HEADER_SIZE); -const_assert!(range_size(PAGE_HEADER_REC_LSN_RANGE) == 8); const_assert!(PAGE_HEADER_PAGE_LSN_RANGE.end <= PAGE_HEADER_SIZE); const_assert!(range_size(PAGE_HEADER_PAGE_LSN_RANGE) == 8); const_assert!(PAGE_HEADER_PAGE_ID_RANGE.end <= PAGE_HEADER_SIZE); @@ -260,7 +257,8 @@ impl Pager { RwLock::new(PageMeta { id: dummy_pgid, kind: PageKind::None, - wal: None, + lsn: None, + is_dirty: false, }) }) .collect::>() @@ -343,7 +341,8 @@ impl Pager { let mut meta = PageMeta { id: dummy_pgid, kind: PageKind::None, - wal: None, + lsn: None, + is_dirty: false, }; let ok = Self::decode_internal(page_size, &mut meta, buff)?; @@ -521,7 +520,8 @@ impl Pager { *meta_locked = PageMeta { id: pgid, kind: PageKind::None, - wal: None, + lsn: None, + is_dirty: false, }; } drop(meta_locked); @@ -582,7 +582,8 @@ impl Pager { *meta_locked = PageMeta { id: pgid, kind: PageKind::None, - wal: None, + lsn: None, + is_dirty: false, }; } drop(meta_locked); @@ -730,12 +731,9 @@ impl Pager { // TODO: maybe acquire read lock first and skip it if it's already clean. // only when it's dirty, we acquire write lock and flush it. // TODO: maybe we can batch few pages together to reduce the number of syscall. - let mut frame = meta.write(); - if let Some(ref wal_info) = frame.wal { - wal.sync(wal_info.page)?; - frame.wal = None; - let frame = RwLockWriteGuard::downgrade(frame); - + let frame = meta.read(); + if let Some(lsn) = frame.lsn { + wal.sync(lsn)?; Self::encode(&frame, buffer)?; Self::flush_page(&self.f, &self.double_buff_f, frame.id, buffer)?; } @@ -781,6 +779,12 @@ impl Pager { )); } + log::debug!( + "decode page pgid={:?} kind={:?} lsn={:?}", + meta.id, + meta.kind, + meta.lsn + ); Ok(()) } @@ -798,7 +802,6 @@ impl Pager { let buff_checksum = &footer[PAGE_FOOTER_CHECKSUM_RANGE]; let buff_version = &header[PAGE_HEADER_VERSION_RANGE]; let buff_kind = &header[PAGE_HEADER_KIND_INDEX]; - let buff_rec_lsn = &header[PAGE_HEADER_REC_LSN_RANGE]; let buff_page_lsn = &header[PAGE_HEADER_PAGE_LSN_RANGE]; let buff_page_id = &header[PAGE_HEADER_PAGE_ID_RANGE]; let buff_checksum_content = &buff[..page_size - PAGE_FOOTER_SIZE]; @@ -813,7 +816,6 @@ impl Pager { return Err(anyhow!("page version {} is not supported", version)); } - let rec_lsn = Lsn::from_be_bytes(buff_rec_lsn.try_into().unwrap()); let page_lsn = Lsn::from_be_bytes(buff_page_lsn.try_into().unwrap()); let Some(page_id) = PageId::from_be_bytes(buff_page_id.try_into().unwrap()) else { return Err(anyhow!("found an empty page_id field when decoding page",)); @@ -822,34 +824,14 @@ impl Pager { let kind = match buff_kind { 0 => PageKind::None, 1 => Self::decode_interior_page(payload)?, - 2 => Self::decode_leaf_page(buff)?, + 2 => Self::decode_leaf_page(payload)?, 3 => Self::decode_overflow_page(buff)?, 4 => Self::decode_freelist_page(buff)?, _ => return Err(anyhow!("page kind {buff_kind} is not recognized")), }; meta.id = page_id; meta.kind = kind; - - if let Some(rec_lsn) = rec_lsn { - let Some(page_lsn) = page_lsn else { - return Err(anyhow!( - "page {} has rec_lsn but no page_lsn", - meta.id.get() - )); - }; - meta.wal = Some(PageWalInfo { - rec: rec_lsn, - page: page_lsn, - }); - } else { - if page_lsn.is_some() { - return Err(anyhow!( - "page {} has page_lsn but no rec_lsn", - meta.id.get() - )); - } - meta.wal = None; - } + meta.lsn = page_lsn; Ok(true) } @@ -887,10 +869,7 @@ impl Pager { }) } - fn decode_leaf_page(buff: &[u8]) -> anyhow::Result { - let page_size = buff.len(); - - let payload = &buff[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE]; + fn decode_leaf_page(payload: &[u8]) -> anyhow::Result { let header = &payload[..LEAF_PAGE_HEADER_SIZE]; let buff_next = &header[LEAF_HEADER_NEXT_RANGE]; let buff_count = &header[LEAF_HEADER_COUNT_RANGE]; @@ -902,7 +881,7 @@ impl Pager { let mut remaining = payload.len() - LEAF_PAGE_HEADER_SIZE; for i in 0..count { - let cell = get_leaf_cell(buff, i as usize); + let cell = get_leaf_cell(payload, i as usize); remaining -= LEAF_CELL_SIZE + cell.raw().len(); } @@ -949,6 +928,13 @@ impl Pager { } fn encode(meta: &PageMeta, buff: &mut [u8]) -> anyhow::Result<()> { + log::debug!( + "encode page pgid={:?} kind={:?} lsn={:?}", + meta.id, + meta.kind, + meta.lsn + ); + let page_size = buff.len(); let header = &mut buff[..PAGE_HEADER_SIZE]; @@ -963,14 +949,7 @@ impl Pager { }; header[PAGE_HEADER_KIND_INDEX] = kind; - let (rec_lsn, page_lsn) = if let Some(ref wal_info) = meta.wal { - (Some(wal_info.rec), Some(wal_info.page)) - } else { - (None, None) - }; - - header[PAGE_HEADER_REC_LSN_RANGE].copy_from_slice(&rec_lsn.to_be_bytes()); - header[PAGE_HEADER_PAGE_LSN_RANGE].copy_from_slice(&page_lsn.to_be_bytes()); + header[PAGE_HEADER_PAGE_LSN_RANGE].copy_from_slice(&meta.lsn.to_be_bytes()); header[PAGE_HEADER_PAGE_ID_RANGE].copy_from_slice(&meta.id.to_be_bytes()); let payload_buff = &mut buff[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE]; @@ -1050,7 +1029,7 @@ impl Pager { Ok(()) } - fn release(&self, frame_id: usize, is_mutated: bool) { + fn release(&self, frame_id: usize, is_dirty: bool) { let mut internal = self.internal.write(); internal.ref_count[frame_id] -= 1; @@ -1061,7 +1040,7 @@ impl Pager { false }; - let maybe_clean = if is_mutated { + let maybe_clean = if is_dirty { internal.dirty_frames.insert(frame_id); internal.free_and_clean.remove(&frame_id); false @@ -1078,17 +1057,15 @@ impl Pager { } } +#[derive(Debug)] struct PageMeta { id: PageId, kind: PageKind, - wal: Option, -} - -struct PageWalInfo { - rec: Lsn, - page: Lsn, + lsn: Option, + is_dirty: bool, } +#[derive(Debug)] enum PageKind { None, Interior { @@ -1170,7 +1147,7 @@ pub(crate) struct PageWrite<'a> { impl Drop for PageWrite<'_> { fn drop(&mut self) { - self.pager.release(self.frame_id, self.meta.wal.is_some()); + self.pager.release(self.frame_id, self.meta.is_dirty); } } @@ -1184,7 +1161,7 @@ impl<'a> PageWrite<'a> { } pub(crate) fn page_lsn(&self) -> Option { - self.meta.wal.as_ref().map(|wal| wal.page) + self.meta.lsn } pub(crate) fn init_interior( @@ -1197,7 +1174,7 @@ impl<'a> PageWrite<'a> { record_mutation( self.txid, ctx, - &mut self.meta.wal, + &mut self.meta.lsn, WalRecord::InteriorInit { pgid, last }, WalRecord::InteriorInit { pgid, last }, )?; @@ -1225,14 +1202,27 @@ impl<'a> PageWrite<'a> { matches!(self.meta.kind, PageKind::None), "page is not empty" ); - let LogContext::Redo(lsn) = ctx else { - panic!("set_interior only can be used for redo-ing wal"); - }; - record_redo_mutation(lsn, &mut self.meta); + let pgid = self.id(); + let page_size = self.buffer.len(); + record_mutation( + self.txid, + ctx, + &mut self.meta.lsn, + WalRecord::InteriorSet { + pgid, + page_version: 0, + payload, + }, + WalRecord::InteriorSet { + pgid, + page_version: 0, + payload, + }, + )?; self.meta.kind = Pager::decode_interior_page(payload)?; - self.buffer.copy_from_slice(payload); + self.buffer[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE].copy_from_slice(payload); Ok(self .into_interior() @@ -1247,16 +1237,13 @@ impl<'a> PageWrite<'a> { } } - pub(crate) fn init_leaf( - mut self, - ctx: LogContext<'_>, - ) -> anyhow::Result>> { + pub(crate) fn init_leaf(mut self, ctx: LogContext<'_>) -> anyhow::Result> { if let PageKind::None = self.meta.kind { let pgid = self.id(); record_mutation( self.txid, ctx, - &mut self.meta.wal, + &mut self.meta.lsn, WalRecord::LeafInit { pgid }, WalRecord::LeafInit { pgid }, )?; @@ -1272,7 +1259,13 @@ impl<'a> PageWrite<'a> { }; } - Ok(self.into_leaf()) + let Some(leaf) = self.into_leaf() else { + return Err(anyhow!( + "cannot init leaf page because page is not an empty page nor leaf page", + )); + }; + + Ok(leaf) } pub(crate) fn set_leaf( @@ -1284,14 +1277,27 @@ impl<'a> PageWrite<'a> { matches!(self.meta.kind, PageKind::None), "page is not empty" ); - let LogContext::Redo(lsn) = ctx else { - panic!("set_leaf only can be used for redo-ing wal"); - }; - record_redo_mutation(lsn, &mut self.meta); + let pgid = self.id(); + let page_size = self.buffer.len(); + record_mutation( + self.txid, + ctx, + &mut self.meta.lsn, + WalRecord::LeafSet { + pgid, + page_version: 0, + payload, + }, + WalRecord::LeafSet { + pgid, + page_version: 0, + payload, + }, + )?; self.meta.kind = Pager::decode_leaf_page(payload)?; - self.buffer.copy_from_slice(payload); + self.buffer[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE].copy_from_slice(payload); Ok(self.into_leaf().expect("the page should be a leaf now")) } @@ -1315,7 +1321,7 @@ impl<'a> PageWrite<'a> { record_mutation( self.txid, ctx, - &mut self.meta.wal, + &mut self.meta.lsn, WalRecord::OverflowInit { pgid }, WalRecord::OverflowInit { pgid }, )?; @@ -1364,7 +1370,7 @@ impl<'a> PageWrite<'a> { fn record_mutation( txid: TxId, ctx: LogContext<'_>, - meta_wal: &mut Option, + meta_wal: &mut Option, entry: WalRecord, compensation_entry: WalRecord, ) -> anyhow::Result<()> { @@ -1380,27 +1386,12 @@ fn record_mutation( LogContext::Redo(lsn) => lsn, }; - if let Some(ref mut wal_info) = meta_wal { - wal_info.page = lsn; - } else { - *meta_wal = Some(PageWalInfo { - rec: lsn, - page: lsn, - }); - } - + *meta_wal = Some(lsn); Ok(()) } fn record_redo_mutation(lsn: Lsn, meta: &mut PageMeta) { - if let Some(ref mut wal_info) = meta.wal { - wal_info.page = lsn; - } else { - meta.wal = Some(PageWalInfo { - rec: lsn, - page: lsn, - }); - } + meta.lsn = Some(lsn); } pub(crate) trait BTreePage<'a> { @@ -1538,11 +1529,13 @@ impl<'a> InteriorPageWrite<'a> { } pub(crate) fn reset(mut self, ctx: LogContext<'_>) -> anyhow::Result> { + Pager::encode(&self.0.meta, &mut self.0.buffer)?; + let pgid = self.id(); record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::InteriorReset { pgid, page_version: 0, @@ -1565,7 +1558,7 @@ impl<'a> InteriorPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::InteriorSetLast { pgid, last: new_last, @@ -1605,7 +1598,7 @@ impl<'a> InteriorPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::InteriorSetCellPtr { pgid, index, @@ -1644,7 +1637,7 @@ impl<'a> InteriorPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::InteriorSetCellOverflow { pgid, index, @@ -1694,7 +1687,7 @@ impl<'a> InteriorPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::InteriorInsert { pgid, index: i, @@ -1754,7 +1747,7 @@ impl<'a> InteriorPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::InteriorInsert { pgid, index: i, @@ -1883,12 +1876,12 @@ impl<'a> InteriorPageWrite<'a> { ); // TODO: we can reduce the wal entry size by merging all of these mutations together. - for i in n_cells_to_keep..count { + for i in (n_cells_to_keep..count).rev() { let cell = get_interior_cell(&self.0.buffer[PAGE_HEADER_SIZE..], i); record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::InteriorDelete { pgid, index: i, @@ -1936,7 +1929,7 @@ impl<'a> InteriorPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::InteriorDelete { pgid, index, @@ -2015,7 +2008,10 @@ impl<'a, 'b> BTreePage<'b> for LeafPageRead<'a> { } fn get(&'b self, index: usize) -> Self::Cell { - get_leaf_cell(self.0.buffer, index) + get_leaf_cell( + &self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE], + index, + ) } } @@ -2032,7 +2028,10 @@ impl<'a> LeafPageRead<'a> { } pub(crate) fn get(&self, index: usize) -> LeafCell { - get_leaf_cell(self.0.buffer, index) + get_leaf_cell( + &self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE], + index, + ) } pub(crate) fn next(&self) -> Option { @@ -2055,7 +2054,10 @@ impl<'a, 'b> BTreePage<'b> for LeafPageWrite<'a> { } fn get(&'b self, index: usize) -> Self::Cell { - get_leaf_cell(self.0.buffer, index) + get_leaf_cell( + &self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE], + index, + ) } } @@ -2072,7 +2074,10 @@ impl<'a> LeafPageWrite<'a> { } pub(crate) fn get(&self, index: usize) -> LeafCell { - get_leaf_cell(self.0.buffer, index) + get_leaf_cell( + &self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE], + index, + ) } pub(crate) fn next(&self) -> Option { @@ -2083,12 +2088,13 @@ impl<'a> LeafPageWrite<'a> { } pub(crate) fn reset(mut self, ctx: LogContext<'_>) -> anyhow::Result> { - let pgid = self.id(); + Pager::encode(&self.0.meta, &mut self.0.buffer)?; + let pgid = self.id(); record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::LeafReset { pgid, page_version: 0, @@ -2102,8 +2108,18 @@ impl<'a> LeafPageWrite<'a> { } pub(crate) fn delete(&mut self, ctx: LogContext<'_>, index: usize) -> anyhow::Result<()> { + log::debug!( + "leaf_delete pgid={:?} index={index} kind={:?} lsn={:?}", + self.id(), + self.0.meta.kind, + self.0.meta.lsn + ); + let pgid = self.0.meta.id; - let cell = get_leaf_cell(self.0.buffer, index); + let cell = get_leaf_cell( + &self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE], + index, + ); let content_offset = u16::from_be_bytes(cell.cell[LEAF_CELL_OFFSET_RANGE].try_into().unwrap()) as usize; let content_size = @@ -2112,7 +2128,7 @@ impl<'a> LeafPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::LeafDelete { pgid, index, @@ -2133,6 +2149,8 @@ impl<'a> LeafPageWrite<'a> { else { unreachable!(); }; + assert!(index < *count, "deleting {index} of {count}"); + assert!(*count > 0); if *offset == content_offset { *offset += content_size; @@ -2160,11 +2178,14 @@ impl<'a> LeafPageWrite<'a> { unreachable!(); }; let old_next = next; + if old_next == new_next { + return Ok(()); + } record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::LeafSetNext { pgid, next: new_next, @@ -2204,7 +2225,7 @@ impl<'a> LeafPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::LeafSetOverflow { pgid, index, @@ -2229,6 +2250,14 @@ impl<'a> LeafPageWrite<'a> { i: usize, cell: LeafCell, ) -> anyhow::Result<()> { + log::debug!( + "leaf_insert_cell pgid={:?} kind={:?} lsn={:?} cell_raw_len={:?}", + self.id(), + self.0.meta.kind, + self.0.meta.lsn, + cell.raw().len(), + ); + let PageKind::Leaf { remaining, .. } = self.0.meta.kind else { unreachable!(); }; @@ -2239,18 +2268,17 @@ impl<'a> LeafPageWrite<'a> { ); let reserved_offset = self.reserve_cell(raw.len()); - Bytes::new(cell.raw) - .put(&mut self.0.buffer[reserved_offset..reserved_offset + raw.len()])?; + self.0.buffer[reserved_offset..reserved_offset + raw.len()].copy_from_slice(raw); let pgid = self.id(); record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::LeafInsert { pgid, index: i, - raw: &self.0.buffer[reserved_offset..reserved_offset + raw.len()], + raw: cell.raw(), overflow: cell.overflow(), key_size: cell.key_size(), value_size: cell.val_size(), @@ -2258,7 +2286,7 @@ impl<'a> LeafPageWrite<'a> { WalRecord::LeafInsert { pgid, index: i, - raw: &self.0.buffer[reserved_offset..reserved_offset + raw.len()], + raw: cell.raw(), overflow: cell.overflow(), key_size: cell.key_size(), value_size: cell.val_size(), @@ -2285,6 +2313,13 @@ impl<'a> LeafPageWrite<'a> { value_size: usize, overflow: Option, ) -> anyhow::Result { + log::debug!( + "leaf_insert_content pgid={:?} kind={:?} lsn={:?}", + self.id(), + self.0.meta.kind, + self.0.meta.lsn + ); + let content_size = content.remaining(); let payload_size = self.0.buffer.len() - PAGE_HEADER_SIZE - LEAF_PAGE_HEADER_SIZE - PAGE_FOOTER_SIZE; @@ -2311,7 +2346,7 @@ impl<'a> LeafPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::LeafInsert { pgid, index: i, @@ -2336,20 +2371,26 @@ impl<'a> LeafPageWrite<'a> { fn reserve_cell(&mut self, raw_size: usize) -> usize { let added = LEAF_CELL_SIZE + raw_size; - let PageKind::Leaf { offset, count, .. } = self.0.meta.kind else { + let PageKind::Leaf { + offset, + count, + remaining, + .. + } = self.0.meta.kind + else { unreachable!(); }; + assert!(added <= remaining, "added cell should be fit in the page"); let current_cell_size = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * count; if current_cell_size + added > offset { self.rearrange(); } - let PageKind::Leaf { ref mut offset, .. } = self.0.meta.kind else { + let PageKind::Leaf { offset, .. } = self.0.meta.kind else { unreachable!(); }; - assert!(current_cell_size + added <= *offset); - - *offset - raw_size + assert!(current_cell_size + added <= offset, "added cell overflowed to the offset. current_cell_size={current_cell_size} added={added} offset={offset}"); + offset - raw_size } // insert_cell_meta assumes that the raw content is already inserted to the page @@ -2362,6 +2403,7 @@ impl<'a> LeafPageWrite<'a> { raw_size: usize, ) { let added = LEAF_CELL_SIZE + raw_size; + let pgid = self.id(); let PageKind::Leaf { ref mut offset, ref mut count, @@ -2371,6 +2413,10 @@ impl<'a> LeafPageWrite<'a> { else { unreachable!(); }; + assert!( + index <= *count, + "insert cell meta on pgid={pgid:?} index out of bound. index={index}, count={count}", + ); let current_cell_size = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * *count; assert!(current_cell_size + added <= *offset); @@ -2388,8 +2434,7 @@ impl<'a> LeafPageWrite<'a> { *offset -= raw_size; *remaining -= added; *count += 1; - cell[LEAF_CELL_OVERFLOW_RANGE] - .copy_from_slice(&overflow.map(|p| p.0.get()).unwrap_or(0).to_be_bytes()); + cell[LEAF_CELL_OVERFLOW_RANGE].copy_from_slice(&overflow.to_be_bytes()); cell[LEAF_CELL_KEY_SIZE_RANGE].copy_from_slice(&(key_size as u32).to_be_bytes()); cell[LEAF_CELL_VAL_SIZE_RANGE].copy_from_slice(&(val_size as u32).to_be_bytes()); cell[LEAF_CELL_OFFSET_RANGE].copy_from_slice(&(*offset as u16).to_be_bytes()); @@ -2398,11 +2443,12 @@ impl<'a> LeafPageWrite<'a> { fn rearrange(&mut self) { // TODO: try not to copy - let copied = self.0.buffer.to_vec(); + let copied_payload = + self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE].to_vec(); let mut new_offset = self.0.pager.page_size - PAGE_FOOTER_SIZE; for i in 0..self.count() { - let copied_cell = get_leaf_cell(&copied, i); + let copied_cell = get_leaf_cell(&copied_payload, i); let copied_content = copied_cell.raw(); new_offset -= copied_content.len(); self.0.buffer[new_offset..new_offset + copied_content.len()] @@ -2417,10 +2463,22 @@ impl<'a> LeafPageWrite<'a> { unreachable!(); }; *offset = new_offset; + + let PageKind::Leaf { + offset, remaining, .. + } = self.0.meta.kind + else { + unreachable!(); + }; + log::debug!( + "rearrange result pgid={:?} offset={offset} remaining={remaining}", + self.id(), + ); } pub(crate) fn split(&mut self, ctx: LogContext<'_>) -> anyhow::Result { let pgid = self.id(); + log::debug!("leaf split start pgid={pgid:?} kind={:?}", self.0.meta.kind); let payload_size = self.0.pager.page_size - PAGE_HEADER_SIZE - LEAF_PAGE_HEADER_SIZE - PAGE_FOOTER_SIZE; let half_payload = payload_size / 2; @@ -2432,12 +2490,16 @@ impl<'a> LeafPageWrite<'a> { let mut n_cells_to_keep = 0; for i in 0..count { - let cell = get_leaf_cell(self.0.buffer, i); - cummulative_size += cell.raw.len() + LEAF_CELL_SIZE; - if cummulative_size >= half_payload { + let cell = get_leaf_cell( + &self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE], + i, + ); + let new_cummulative_size = cummulative_size + cell.raw.len() + LEAF_CELL_SIZE; + if new_cummulative_size >= half_payload { n_cells_to_keep = i; break; } + cummulative_size = new_cummulative_size; } assert!( n_cells_to_keep < count, @@ -2445,12 +2507,15 @@ impl<'a> LeafPageWrite<'a> { ); // TODO: we can reduce the wal entry size by merging all of these mutations together. - for i in n_cells_to_keep..count { - let cell = get_leaf_cell(self.0.buffer, i); + for i in (n_cells_to_keep..count).rev() { + let cell = get_leaf_cell( + &self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE], + i, + ); record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::LeafDelete { pgid, index: i, @@ -2475,6 +2540,11 @@ impl<'a> LeafPageWrite<'a> { *count = n_cells_to_keep; *remaining = payload_size - cummulative_size; + log::debug!( + "leaf split finish pgid={pgid:?} kind={:?}", + self.0.meta.kind + ); + Ok(LeafPageSplit { n: n_cells_to_keep, @@ -2485,12 +2555,18 @@ impl<'a> LeafPageWrite<'a> { } } -fn get_leaf_cell(buff: &[u8], index: usize) -> LeafCell<'_> { - let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * index; - let cell = &buff[cell_offset..cell_offset + LEAF_CELL_SIZE]; +fn get_leaf_cell(payload: &[u8], index: usize) -> LeafCell<'_> { + let cell_offset = LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * index; + let cell = &payload[cell_offset..cell_offset + LEAF_CELL_SIZE]; let offset = u16::from_be_bytes(cell[LEAF_CELL_OFFSET_RANGE].try_into().unwrap()) as usize; + assert!(offset >= PAGE_HEADER_SIZE); + let offset = offset - PAGE_HEADER_SIZE; let size = u16::from_be_bytes(cell[LEAF_CELL_SIZE_RANGE].try_into().unwrap()) as usize; - let raw = &buff[offset..offset + size]; + assert!( + offset + size <= payload.len(), + "offset + size is overflow. offset={offset} size={size}" + ); + let raw = &payload[offset..offset + size]; LeafCell { cell, raw } } @@ -2535,7 +2611,10 @@ impl<'a> Iterator for LeafPageSplit<'a> { return None; } - let cell = get_leaf_cell(self.buff, self.i); + let cell = get_leaf_cell( + &self.buff[PAGE_HEADER_SIZE..self.buff.len() - PAGE_FOOTER_SIZE], + self.i, + ); self.i += 1; Some(cell) } @@ -2584,7 +2663,7 @@ impl<'a> OverflowPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::OverflowSetNext { pgid, next: new_next, @@ -2634,7 +2713,7 @@ impl<'a> OverflowPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::OverflowSetContent { pgid, raw, next }, WalRecord::OverflowSetContent { pgid, raw, next }, )?; @@ -2665,7 +2744,7 @@ impl<'a> OverflowPageWrite<'a> { record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::OverflowUndoSetContent { pgid }, WalRecord::OverflowUndoSetContent { pgid }, )?; @@ -2679,11 +2758,13 @@ impl<'a> OverflowPageWrite<'a> { } pub(crate) fn reset(mut self, ctx: LogContext<'_>) -> anyhow::Result> { + Pager::encode(&self.0.meta, &mut self.0.buffer)?; + let pgid = self.id(); record_mutation( self.0.txid, ctx, - &mut self.0.meta.wal, + &mut self.0.meta.lsn, WalRecord::OverflowReset { pgid, page_version: 0, diff --git a/src/recovery.rs b/src/recovery.rs index 2b4689c..1f66fe5 100644 --- a/src/recovery.rs +++ b/src/recovery.rs @@ -5,14 +5,24 @@ use crate::wal::{ WalHeader, WalRecord, WAL_HEADER_SIZE, }; use anyhow::anyhow; -use std::collections::HashMap; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::os::unix::fs::MetadataExt; -pub(crate) fn recover(mut f: File, pager: &Pager, page_size: usize) -> anyhow::Result { +pub(crate) struct RecoveryResult { + pub(crate) wal: Wal, + pub(crate) next_txid: TxId, +} + +pub(crate) fn recover( + mut f: File, + pager: &Pager, + page_size: usize, +) -> anyhow::Result { let wal_header = get_wal_header(&mut f)?; + // TODO: due to our way of checkpointing, we don't need to separate analyze and redo phase. We + // can just combine them into a single phase. let analyze_result = analyze(&mut f, &wal_header, page_size)?; redo(&mut f, pager, &wal_header, &analyze_result, page_size)?; @@ -25,7 +35,13 @@ pub(crate) fn recover(mut f: File, pager: &Pager, page_size: usize) -> anyhow::R undo(&analyze_result, pager, &wal)?; - Ok(wal) + let next_txid = if let Some(txid) = analyze_result.last_txid { + TxId::new(txid.get() + 1).unwrap() + } else { + TxId::new(1).unwrap() + }; + + Ok(RecoveryResult { wal, next_txid }) } fn get_wal_header(f: &mut File) -> anyhow::Result { @@ -106,9 +122,9 @@ impl WalIterator<'_> { struct AriesAnalyzeResult { lsn_to_redo: Lsn, - dirty_pages: HashMap, active_tx: TxState, next_lsn: Lsn, + last_txid: Option, } fn analyze( @@ -116,7 +132,6 @@ fn analyze( wal_header: &WalHeader, page_size: usize, ) -> anyhow::Result { - // TODO: perform aries recovery here, and get the `next_lsn`. let analyze_start = wal_header .checkpoint .unwrap_or(Lsn::new(WAL_HEADER_SIZE as u64 * 2).unwrap()); @@ -131,7 +146,6 @@ fn analyze( let mut next_lsn = wal_header.checkpoint; let mut tx_state = TxState::None; let mut last_txn: Option = None; - let mut dirty_pages = HashMap::default(); log::debug!("aries analysis started wal_header={wal_header:?}"); @@ -144,9 +158,14 @@ fn analyze( assert_eq!( TxState::None, tx_state, - "when a transaction begin, there should be no active tx previously" + "when a transaction begin, there should be no active tx previously, but got {tx_state:?}" + ); + let last_txid = last_txn.map(TxId::get).unwrap_or(0); + assert!( + last_txid < entry.txid.get(), + "last txn is {last_txid}, but newer txid is {}", + entry.txid.get() ); - assert!(last_txn.map(TxId::get).unwrap_or(0) < entry.txid.get()); tx_state = TxState::Active(entry.txid); last_txn = Some(entry.txid); } @@ -167,13 +186,6 @@ fn analyze( }; } WalRecord::End => { - assert!( - TxState::Committing(entry.txid) == tx_state || - if let TxState::Aborting{txid, ..} = tx_state { - txid == entry.txid - } else { false }, - "when a transaction ended, there should be exactly one committing or aborting tx previously", - ); tx_state = TxState::None; } @@ -181,38 +193,49 @@ fn analyze( tx_state = active_tx; } - WalRecord::InteriorInit { pgid, .. } - | WalRecord::InteriorInsert { pgid, .. } - | WalRecord::InteriorDelete { pgid, .. } - | WalRecord::LeafInit { pgid, .. } - | WalRecord::LeafInsert { pgid, .. } => { - dirty_pages.entry(pgid).or_insert(lsn); - } - - _ => (), + WalRecord::InteriorReset { .. } + | WalRecord::InteriorUndoReset { .. } + | WalRecord::InteriorSet { .. } + | WalRecord::InteriorInit { .. } + | WalRecord::InteriorInsert { .. } + | WalRecord::InteriorDelete { .. } + | WalRecord::InteriorUndoDelete { .. } + | WalRecord::InteriorSetCellOverflow { .. } + | WalRecord::InteriorSetCellPtr { .. } + | WalRecord::InteriorSetLast { .. } + | WalRecord::LeafReset { .. } + | WalRecord::LeafUndoReset { .. } + | WalRecord::LeafSet { .. } + | WalRecord::LeafInit { .. } + | WalRecord::LeafInsert { .. } + | WalRecord::LeafDelete { .. } + | WalRecord::LeafUndoDelete { .. } + | WalRecord::LeafSetOverflow { .. } + | WalRecord::LeafSetNext { .. } + | WalRecord::OverflowReset { .. } + | WalRecord::OverflowUndoReset { .. } + | WalRecord::OverflowInit { .. } + | WalRecord::OverflowSetContent { .. } + | WalRecord::OverflowUndoSetContent { .. } + | WalRecord::OverflowSetNext { .. } + | WalRecord::HeaderSet { .. } + | WalRecord::HeaderUndoSet { .. } => (), } next_lsn = Some(lsn); } - log::debug!("aries analysis finished next_lsn={next_lsn:?} dirty_pages={dirty_pages:?} tx_state={tx_state:?}"); - - let mut min_rec_lsn = next_lsn; - for rec_lsn in dirty_pages.values() { - if let Some(min_lsn) = min_rec_lsn { - min_rec_lsn = Some(std::cmp::min(min_lsn, *rec_lsn)); - } else { - min_rec_lsn = Some(*rec_lsn); - } - } + log::debug!("aries analysis finished next_lsn={next_lsn:?} tx_state={tx_state:?}"); let next_lsn = next_lsn.unwrap_or(Lsn::new(WAL_HEADER_SIZE as u64 * 2).unwrap()); Ok(AriesAnalyzeResult { - lsn_to_redo: Lsn::new(wal_header.relative_lsn_offset + 2 * WAL_HEADER_SIZE as u64).unwrap(), - dirty_pages, + lsn_to_redo: wal_header + .checkpoint + .unwrap_or(Lsn::new(WAL_HEADER_SIZE as u64 * 2).unwrap()), active_tx: tx_state, next_lsn, + last_txid: last_txn, }) } @@ -249,6 +272,7 @@ fn redo( WalRecord::InteriorReset { pgid, .. } | WalRecord::InteriorUndoReset { pgid } + | WalRecord::InteriorSet { pgid, .. } | WalRecord::InteriorInit { pgid, .. } | WalRecord::InteriorInsert { pgid, .. } | WalRecord::InteriorDelete { pgid, .. } @@ -258,6 +282,7 @@ fn redo( | WalRecord::InteriorSetLast { pgid, .. } | WalRecord::LeafReset { pgid, .. } | WalRecord::LeafUndoReset { pgid } + | WalRecord::LeafSet { pgid, .. } | WalRecord::LeafInit { pgid, .. } | WalRecord::LeafInsert { pgid, .. } | WalRecord::LeafDelete { pgid, .. } @@ -270,7 +295,7 @@ fn redo( | WalRecord::OverflowSetContent { pgid, .. } | WalRecord::OverflowUndoSetContent { pgid, .. } | WalRecord::OverflowSetNext { pgid, .. } => { - redo_page(pager, analyze_result, lsn, &entry, pgid)?; + redo_page(pager, lsn, &entry, pgid)?; } }; } @@ -280,20 +305,7 @@ fn redo( Ok(()) } -fn redo_page( - pager: &Pager, - analyze_result: &AriesAnalyzeResult, - lsn: Lsn, - entry: &WalEntry, - pgid: PageId, -) -> anyhow::Result<()> { - let Some(rec_lsn) = analyze_result.dirty_pages.get(&pgid) else { - return Ok(()); - }; - if &lsn < rec_lsn { - return Ok(()); - } - +fn redo_page(pager: &Pager, lsn: Lsn, entry: &WalEntry, pgid: PageId) -> anyhow::Result<()> { let page = pager.write(entry.txid, pgid)?; if let Some(page_lsn) = page.page_lsn() { if page_lsn >= lsn { @@ -317,14 +329,17 @@ fn redo_page( WalRecord::InteriorReset { .. } | WalRecord::InteriorUndoReset { .. } => { let Some(page) = page.into_interior() else { return Err(anyhow!( - "redo failed on interior reset because page is not an interior" + "redo failed on interior reset because page {pgid:?} is not an interior" )); }; page.reset(ctx)?; } + WalRecord::InteriorSet { payload, .. } => { + page.set_interior(ctx, payload)?; + } WalRecord::InteriorInit { last, .. } => { if page.init_interior(ctx, last)?.is_none() { - return Err(anyhow!("redo failed on interior init")); + return Err(anyhow!("redo failed on interior init on page {pgid:?}")); } } WalRecord::InteriorInsert { @@ -337,21 +352,21 @@ fn redo_page( } => { let Some(mut page) = page.into_interior() else { return Err(anyhow!( - "redo failed on interior insert because page is not an interior" + "redo failed on interior insert because page {pgid:?} is not an interior" )); }; let ok = page.insert_content(ctx, index, &mut Bytes::new(raw), key_size, ptr, overflow)?; if !ok { return Err(anyhow!( - "redo failed on interior insert because the content can't be inserted" + "redo failed on interior insert because the content can't be inserted into page {pgid:?}" )); } } WalRecord::InteriorDelete { index, .. } | WalRecord::InteriorUndoDelete { index, .. } => { let Some(mut page) = page.into_interior() else { return Err(anyhow!( - "redo failed on interior delete because page is not an interior" + "redo failed on interior delete because page {pgid:?} is not an interior" )); }; page.delete(ctx, index)?; @@ -361,7 +376,7 @@ fn redo_page( } => { let Some(mut page) = page.into_interior() else { return Err(anyhow!( - "redo failed on interior set overflow because page is not an interior" + "redo failed on interior set overflow because page {pgid:?} is not an interior" )); }; page.set_cell_overflow(ctx, index, overflow)?; @@ -369,7 +384,7 @@ fn redo_page( WalRecord::InteriorSetCellPtr { index, ptr, .. } => { let Some(mut page) = page.into_interior() else { return Err(anyhow!( - "redo failed on interior set ptr because page is not an interior" + "redo failed on interior set ptr because page {pgid:?} is not an interior" )); }; page.set_cell_ptr(ctx, index, ptr)?; @@ -377,7 +392,7 @@ fn redo_page( WalRecord::InteriorSetLast { last, .. } => { let Some(mut page) = page.into_interior() else { return Err(anyhow!( - "redo failed on interior set ptr because page is not an interior" + "redo failed on interior set ptr because page {pgid:?} is not an interior" )); }; page.set_last(ctx, last)?; @@ -386,15 +401,16 @@ fn redo_page( WalRecord::LeafReset { .. } | WalRecord::LeafUndoReset { .. } => { let Some(page) = page.into_leaf() else { return Err(anyhow!( - "redo failed on leaf reset because page is not a leaf" + "redo failed on leaf reset because page {pgid:?} is not a leaf" )); }; page.reset(ctx)?; } + WalRecord::LeafSet { payload, .. } => { + page.set_leaf(ctx, payload)?; + } WalRecord::LeafInit { .. } => { - if page.init_leaf(ctx)?.is_none() { - return Err(anyhow!("redo failed on leaf init")); - }; + page.init_leaf(ctx)?; } WalRecord::LeafInsert { index, @@ -406,7 +422,7 @@ fn redo_page( } => { let Some(mut page) = page.into_leaf() else { return Err(anyhow!( - "redo failed on leaf insert because page is not a leaf" + "redo failed on leaf insert because page {pgid:?} is not a leaf" )); }; let ok = page.insert_content( @@ -419,14 +435,14 @@ fn redo_page( )?; if !ok { return Err(anyhow!( - "redo failed on leaf insert because the content can't be inserted" + "redo failed on leaf insert because the content can't be inserted into page {pgid:?}" )); } } WalRecord::LeafDelete { index, .. } | WalRecord::LeafUndoDelete { index, .. } => { let Some(mut page) = page.into_leaf() else { return Err(anyhow!( - "redo failed on leaf delete because page is not a leaf" + "redo failed on leaf delete because page {pgid:?} is not a leaf" )); }; page.delete(ctx, index)?; @@ -436,7 +452,7 @@ fn redo_page( } => { let Some(mut page) = page.into_leaf() else { return Err(anyhow!( - "redo failed on leaf set overflow because page is not a leaf" + "redo failed on leaf set overflow because page {pgid:?} is not a leaf", )); }; page.set_cell_overflow(ctx, index, overflow)?; @@ -444,7 +460,7 @@ fn redo_page( WalRecord::LeafSetNext { next, .. } => { let Some(mut page) = page.into_leaf() else { return Err(anyhow!( - "redo failed on leaf set overflow because page is not a leaf" + "redo failed on leaf set overflow because page {pgid:?} is not a leaf", )); }; page.set_next(ctx, next)?; @@ -453,7 +469,7 @@ fn redo_page( WalRecord::OverflowReset { .. } | WalRecord::OverflowUndoReset { .. } => { let Some(page) = page.into_overflow() else { return Err(anyhow!( - "redo failed on overflow reset because page is not an overflow" + "redo failed on overflow reset because page {pgid:?} is not an overflow" )); }; page.reset(ctx)?; @@ -466,7 +482,7 @@ fn redo_page( WalRecord::OverflowSetContent { next, raw, .. } => { let Some(mut page) = page.into_overflow() else { return Err(anyhow!( - "redo failed on overflow reset because page is not an overflow" + "redo failed on overflow reset because page {pgid:?} is not an overflow" )); }; page.set_content(ctx, &mut Bytes::new(raw), next)?; @@ -474,7 +490,7 @@ fn redo_page( WalRecord::OverflowUndoSetContent { .. } => { let Some(mut page) = page.into_overflow() else { return Err(anyhow!( - "redo failed on overflow reset because page is not an overflow" + "redo failed on overflow reset because page {pgid:?} is not an overflow" )); }; page.unset_content(ctx)?; @@ -482,7 +498,7 @@ fn redo_page( WalRecord::OverflowSetNext { next, .. } => { let Some(mut page) = page.into_overflow() else { return Err(anyhow!( - "redo failed on overflow reset because page is not an overflow" + "redo failed on overflow reset because page {pgid:?} is not an overflow" )); }; page.set_next(ctx, next)?; @@ -533,14 +549,13 @@ pub(crate) fn undo_txn( lsn: Lsn, last_undone_clr: &mut Lsn, ) -> anyhow::Result<()> { - log::debug!("undo txn started from={lsn:?} last_undone_clr={last_undone_clr:?}"); + log::debug!("undo txn started txid={txid:?} from={lsn:?} last_undone_clr={last_undone_clr:?}"); let mut iterator = wal.iterate_back(lsn); - let mut is_ended = false; - - let ctx = LogContext::Undo(wal, lsn); while let Some((lsn, entry)) = iterator.next()? { + let ctx = LogContext::Undo(wal, lsn); + log::debug!("undo txn item lsn={lsn:?} entry={entry:?}"); if entry.txid != txid { continue; @@ -560,7 +575,7 @@ pub(crate) fn undo_txn( return Err(anyhow!("found a commit log during transaction rollback")) } WalRecord::Rollback => (), - WalRecord::End => is_ended = true, + WalRecord::End => return Err(anyhow!("found a transaction-end log during rollback")), WalRecord::HeaderSet { old_root, @@ -594,6 +609,18 @@ pub(crate) fn undo_txn( WalRecord::InteriorUndoReset { .. } => { unreachable!("InteriorUndoReset only used for CLR which shouldn't be undone"); } + WalRecord::InteriorSet { + pgid, page_version, .. + } => { + if page_version != 0 { + return Err(anyhow!("page version {page_version} is not supported")); + } + let page = pager.write(txid, pgid)?; + let Some(page) = page.into_interior() else { + return Err(anyhow!("expected an interior page for undo")); + }; + page.reset(ctx)?; + } WalRecord::InteriorInit { pgid, .. } => { let page = pager.write(txid, pgid)?; let Some(page) = page.into_interior() else { @@ -620,7 +647,7 @@ pub(crate) fn undo_txn( let Some(mut page) = page.into_interior() else { return Err(anyhow!("expected an interior page for undo")); }; - page.insert_content( + let ok = page.insert_content( ctx, index, &mut Bytes::new(old_raw), @@ -628,6 +655,7 @@ pub(crate) fn undo_txn( old_ptr, old_overflow, )?; + assert!(ok, "if it can be deleted, then it must be ok to insert"); } WalRecord::InteriorUndoDelete { .. } => { unreachable!("InteriorUndoDelete only used for CLR which shouldn't be undone"); @@ -678,6 +706,18 @@ pub(crate) fn undo_txn( WalRecord::LeafUndoReset { .. } => { unreachable!("LeafUndoReset only used for CLR which shouldn't be undone"); } + WalRecord::LeafSet { + pgid, page_version, .. + } => { + if page_version != 0 { + return Err(anyhow!("page version {page_version} is not supported")); + } + let page = pager.write(txid, pgid)?; + let Some(page) = page.into_leaf() else { + return Err(anyhow!("expected an interior page for undo")); + }; + page.reset(ctx)?; + } WalRecord::LeafInit { pgid } => { let page = pager.write(txid, pgid)?; let Some(page) = page.into_leaf() else { @@ -704,7 +744,7 @@ pub(crate) fn undo_txn( let Some(mut page) = page.into_leaf() else { return Err(anyhow!("expected a leaf page for undo")); }; - page.insert_content( + let ok = page.insert_content( ctx, index, &mut Bytes::new(old_raw), @@ -712,6 +752,7 @@ pub(crate) fn undo_txn( old_val_size, old_overflow, )?; + assert!(ok, "if it can be deleted, then it must be ok to insert"); } WalRecord::LeafUndoDelete { .. } => { unreachable!("LeafUndoDelete only used for CLR which shouldn't be undone"); @@ -779,11 +820,7 @@ pub(crate) fn undo_txn( } } - if !is_ended { - log::debug!("appending txn-end txid={txid:?}"); - wal.append(txid, None, WalRecord::End)?; - } - + wal.append(txid, None, WalRecord::End)?; log::debug!("undo txn finished"); Ok(()) } diff --git a/src/wal.rs b/src/wal.rs index a28f6c6..be48d3c 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -20,10 +20,6 @@ impl TxId { pub(crate) fn from_be_bytes(lsn: [u8; 8]) -> Option { Self::new(u64::from_be_bytes(lsn)) } - - pub(crate) fn next(&self) -> TxId { - Self(self.0.checked_add(1).unwrap()) - } } pub(crate) trait TxIdExt { @@ -352,6 +348,11 @@ pub(crate) enum WalRecord<'a> { InteriorUndoReset { pgid: PageId, }, + InteriorSet { + pgid: PageId, + page_version: u16, + payload: &'a [u8], + }, InteriorInit { pgid: PageId, last: PageId, @@ -402,6 +403,11 @@ pub(crate) enum WalRecord<'a> { LeafUndoReset { pgid: PageId, }, + LeafSet { + pgid: PageId, + page_version: u16, + payload: &'a [u8], + }, LeafInit { pgid: PageId, }, @@ -492,22 +498,24 @@ const WAL_RECORD_HEADER_UNDO_SET_KIND: u8 = 11; const WAL_RECORD_INTERIOR_RESET_KIND: u8 = 20; const WAL_RECORD_INTERIOR_UNDO_RESET_KIND: u8 = 21; -const WAL_RECORD_INTERIOR_INIT_KIND: u8 = 22; -const WAL_RECORD_INTERIOR_INSERT_KIND: u8 = 23; -const WAL_RECORD_INTERIOR_DELETE_KIND: u8 = 24; -const WAL_RECORD_INTERIOR_UNDO_DELETE_KIND: u8 = 25; -const WAL_RECORD_INTERIOR_SET_CELL_OVERFLOW_KIND: u8 = 26; -const WAL_RECORD_INTERIOR_SET_CELL_PTR_KIND: u8 = 27; -const WAL_RECORD_INTERIOR_SET_LAST_KIND: u8 = 28; +const WAL_RECORD_INTERIOR_SET_KIND: u8 = 22; +const WAL_RECORD_INTERIOR_INIT_KIND: u8 = 23; +const WAL_RECORD_INTERIOR_INSERT_KIND: u8 = 24; +const WAL_RECORD_INTERIOR_DELETE_KIND: u8 = 25; +const WAL_RECORD_INTERIOR_UNDO_DELETE_KIND: u8 = 26; +const WAL_RECORD_INTERIOR_SET_CELL_OVERFLOW_KIND: u8 = 27; +const WAL_RECORD_INTERIOR_SET_CELL_PTR_KIND: u8 = 28; +const WAL_RECORD_INTERIOR_SET_LAST_KIND: u8 = 29; const WAL_RECORD_LEAF_RESET_KIND: u8 = 30; const WAL_RECORD_LEAF_UNDO_RESET_KIND: u8 = 31; -const WAL_RECORD_LEAF_INIT_KIND: u8 = 32; -const WAL_RECORD_LEAF_INSERT_KIND: u8 = 33; -const WAL_RECORD_LEAF_DELETE_KIND: u8 = 34; -const WAL_RECORD_LEAF_UNDO_DELETE_KIND: u8 = 35; -const WAL_RECORD_LEAF_SET_CELL_OVERFLOW_KIND: u8 = 36; -const WAL_RECORD_LEAF_SET_NEXT_KIND: u8 = 37; +const WAL_RECORD_LEAF_SET_KIND: u8 = 32; +const WAL_RECORD_LEAF_INIT_KIND: u8 = 33; +const WAL_RECORD_LEAF_INSERT_KIND: u8 = 34; +const WAL_RECORD_LEAF_DELETE_KIND: u8 = 35; +const WAL_RECORD_LEAF_UNDO_DELETE_KIND: u8 = 36; +const WAL_RECORD_LEAF_SET_CELL_OVERFLOW_KIND: u8 = 37; +const WAL_RECORD_LEAF_SET_NEXT_KIND: u8 = 38; const WAL_RECORD_OVERFLOW_RESET_KIND: u8 = 40; const WAL_RECORD_OVERFLOW_UNDO_RESET_KIND: u8 = 41; @@ -531,6 +539,7 @@ impl<'a> WalRecord<'a> { WalRecord::InteriorReset { .. } => WAL_RECORD_INTERIOR_RESET_KIND, WalRecord::InteriorUndoReset { .. } => WAL_RECORD_INTERIOR_UNDO_RESET_KIND, + WalRecord::InteriorSet { .. } => WAL_RECORD_INTERIOR_SET_KIND, WalRecord::InteriorInit { .. } => WAL_RECORD_INTERIOR_INIT_KIND, WalRecord::InteriorInsert { .. } => WAL_RECORD_INTERIOR_INSERT_KIND, WalRecord::InteriorDelete { .. } => WAL_RECORD_INTERIOR_DELETE_KIND, @@ -541,6 +550,7 @@ impl<'a> WalRecord<'a> { WalRecord::LeafReset { .. } => WAL_RECORD_LEAF_RESET_KIND, WalRecord::LeafUndoReset { .. } => WAL_RECORD_LEAF_UNDO_RESET_KIND, + WalRecord::LeafSet { .. } => WAL_RECORD_LEAF_SET_KIND, WalRecord::LeafInit { .. } => WAL_RECORD_LEAF_INIT_KIND, WalRecord::LeafInsert { .. } => WAL_RECORD_LEAF_INSERT_KIND, WalRecord::LeafDelete { .. } => WAL_RECORD_LEAF_DELETE_KIND, @@ -568,6 +578,7 @@ impl<'a> WalRecord<'a> { WalRecord::InteriorReset { payload, .. } => 8 + 2 + 2 + payload.len(), WalRecord::InteriorUndoReset { .. } => 8, + WalRecord::InteriorSet { payload, .. } => 8 + 2 + 2 + payload.len(), WalRecord::InteriorInit { .. } => 16, WalRecord::InteriorInsert { raw, .. } => 32 + raw.len(), WalRecord::InteriorDelete { old_raw, .. } => 32 + old_raw.len(), @@ -578,6 +589,7 @@ impl<'a> WalRecord<'a> { WalRecord::LeafReset { payload, .. } => 8 + 2 + 2 + payload.len(), WalRecord::LeafUndoReset { .. } => 8, + WalRecord::LeafSet { payload, .. } => 8 + 2 + 2 + payload.len(), WalRecord::LeafInit { .. } => 8, WalRecord::LeafInsert { raw, .. } => 28 + raw.len(), WalRecord::LeafDelete { old_raw, .. } => 28 + old_raw.len(), @@ -630,6 +642,17 @@ impl<'a> WalRecord<'a> { WalRecord::InteriorUndoReset { pgid } => { buff[0..8].copy_from_slice(&pgid.get().to_be_bytes()); } + WalRecord::InteriorSet { + pgid, + page_version, + payload, + } => { + assert!(payload.len() <= u16::MAX as usize); + buff[0..8].copy_from_slice(&pgid.get().to_be_bytes()); + buff[8..10].copy_from_slice(&page_version.to_be_bytes()); + buff[10..12].copy_from_slice(&(payload.len() as u16).to_be_bytes()); + buff[12..12 + payload.len()].copy_from_slice(payload); + } WalRecord::InteriorInit { pgid, last } => { buff[0..8].copy_from_slice(&pgid.get().to_be_bytes()); buff[8..16].copy_from_slice(&last.get().to_be_bytes()); @@ -727,6 +750,17 @@ impl<'a> WalRecord<'a> { WalRecord::LeafUndoReset { pgid } => { buff[0..8].copy_from_slice(&pgid.get().to_be_bytes()); } + WalRecord::LeafSet { + pgid, + page_version, + payload, + } => { + assert!(payload.len() <= u16::MAX as usize); + buff[0..8].copy_from_slice(&pgid.get().to_be_bytes()); + buff[8..10].copy_from_slice(&page_version.to_be_bytes()); + buff[10..12].copy_from_slice(&(payload.len() as u16).to_be_bytes()); + buff[12..12 + payload.len()].copy_from_slice(payload); + } WalRecord::LeafInit { pgid } => { buff[0..8].copy_from_slice(&pgid.get().to_be_bytes()); } @@ -914,6 +948,18 @@ impl<'a> WalRecord<'a> { }; Ok(Self::InteriorUndoReset { pgid }) } + WAL_RECORD_INTERIOR_SET_KIND => { + let Some(pgid) = PageId::from_be_bytes(buff[0..8].try_into().unwrap()) else { + return Err(anyhow!("zero page id")); + }; + let page_version = u16::from_be_bytes(buff[8..10].try_into().unwrap()); + let size = u16::from_be_bytes(buff[10..12].try_into().unwrap()); + Ok(Self::InteriorSet { + pgid, + page_version, + payload: &buff[12..12 + size as usize], + }) + } WAL_RECORD_INTERIOR_INIT_KIND => { let Some(pgid) = PageId::from_be_bytes(buff[0..8].try_into().unwrap()) else { return Err(anyhow!("zero page id")); @@ -987,6 +1033,40 @@ impl<'a> WalRecord<'a> { old_overflow, }) } + WAL_RECORD_INTERIOR_SET_CELL_PTR_KIND => { + let Some(pgid) = PageId::from_be_bytes(buff[0..8].try_into().unwrap()) else { + return Err(anyhow!("zero page id")); + }; + let Some(ptr) = PageId::from_be_bytes(buff[8..16].try_into().unwrap()) else { + return Err(anyhow!("zero page id")); + }; + let Some(old_ptr) = PageId::from_be_bytes(buff[16..24].try_into().unwrap()) else { + return Err(anyhow!("zero page id")); + }; + let index = u16::from_be_bytes(buff[24..26].try_into().unwrap()); + Ok(Self::InteriorSetCellPtr { + pgid, + index: index as usize, + ptr, + old_ptr, + }) + } + WAL_RECORD_INTERIOR_SET_LAST_KIND => { + let Some(pgid) = PageId::from_be_bytes(buff[0..8].try_into().unwrap()) else { + return Err(anyhow!("zero page id")); + }; + let Some(last) = PageId::from_be_bytes(buff[8..16].try_into().unwrap()) else { + return Err(anyhow!("zero page id")); + }; + let Some(old_last) = PageId::from_be_bytes(buff[16..24].try_into().unwrap()) else { + return Err(anyhow!("zero page id")); + }; + Ok(Self::InteriorSetLast { + pgid, + last, + old_last, + }) + } WAL_RECORD_LEAF_RESET_KIND => { let Some(pgid) = PageId::from_be_bytes(buff[0..8].try_into().unwrap()) else { @@ -1006,6 +1086,18 @@ impl<'a> WalRecord<'a> { }; Ok(Self::LeafUndoReset { pgid }) } + WAL_RECORD_LEAF_SET_KIND => { + let Some(pgid) = PageId::from_be_bytes(buff[0..8].try_into().unwrap()) else { + return Err(anyhow!("zero page id")); + }; + let page_version = u16::from_be_bytes(buff[8..10].try_into().unwrap()); + let size = u16::from_be_bytes(buff[10..12].try_into().unwrap()); + Ok(Self::LeafSet { + pgid, + page_version, + payload: &buff[12..12 + size as usize], + }) + } WAL_RECORD_LEAF_INIT_KIND => { let Some(pgid) = PageId::from_be_bytes(buff[0..8].try_into().unwrap()) else { return Err(anyhow!("zero page id")); diff --git a/tests/db_crash.rs b/tests/db_crash.rs new file mode 100644 index 0000000..d6b3af3 --- /dev/null +++ b/tests/db_crash.rs @@ -0,0 +1,115 @@ +use dbshark::{Db, Setting}; +use rand::rngs::StdRng; +use rand::{thread_rng, Rng, SeedableRng}; +use std::path::PathBuf; +use std::process::Command; +use std::sync::{Arc, Mutex}; +use std::thread::sleep; +use std::time::Duration; + +fn main() { + let is_worker = std::env::var("DBSHARK_TEST_WORKER").unwrap_or_default(); + if is_worker == "1" { + return worker(); + } + + let dir = tempfile::tempdir().unwrap(); + println!("test started on {:?}", dir.path()); + + let myself = std::env::args().next().expect("missing first arg"); + + let t = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let rng = Arc::new(Mutex::new(StdRng::seed_from_u64(t))); + + loop { + println!("restarting db test"); + + let child = Arc::new(Mutex::new( + Command::new(&myself) + .arg(dir.path()) + .env("DBSHARK_TEST_WORKER", "1") + .spawn() + .expect("command failed to start"), + )); + + std::thread::scope(|s| { + let (send, recv) = std::sync::mpsc::channel::<()>(); + + let handle = { + let child = child.clone(); + let rng = rng.clone(); + s.spawn(move || { + let ms = rng.lock().unwrap().gen_range(5..10 * 1000); + let res = recv.recv_timeout(Duration::from_millis(ms)); + if matches!(res, Err(std::sync::mpsc::RecvTimeoutError::Timeout)) { + child.lock().unwrap().kill().expect("cannot kill child"); + } + }) + }; + + let status = loop { + let Some(status) = child.lock().unwrap().try_wait().unwrap() else { + sleep(Duration::from_millis(100)); + continue; + }; + break status; + }; + drop(send); + assert_eq!(0, status.code().unwrap_or_default()); + + handle.join().unwrap(); + }); + } +} + +fn worker() { + env_logger::init(); + + let path = std::env::args().skip(1).next().unwrap(); + let db = Db::open( + &PathBuf::from(path), + Setting { + checkpoint_period: Duration::from_secs(5), + }, + ) + .unwrap(); + let db = Arc::new(db); + + let mut handles = vec![]; + + let n = 100_000_000usize; + let p = 100usize; + + for _ in 0..20 { + let db = db.clone(); + handles.push(std::thread::spawn(move || { + let mut rng = thread_rng(); + loop { + let mut tx = db.update().expect("cannot create write tx"); + let mut bucket = tx.bucket("table1").unwrap(); + + let x = rng.gen_range(0..n); + for i in 0..p { + let x = x + i * n; + let key = format!("key{x:05}"); + let val = format!("val{x:05}"); + bucket.put(key.as_bytes(), val.as_bytes()).unwrap(); + } + + if rng.gen_bool(0.5) { + tx.commit().unwrap() + } else { + tx.rollback().unwrap() + } + } + })); + } + + for handle in handles { + handle.join().unwrap(); + } +}