Skip to content

Commit

Permalink
add simple proptest for recovery logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jauhararifin committed Aug 7, 2024
1 parent 38249ef commit b720c2b
Show file tree
Hide file tree
Showing 8 changed files with 637 additions and 309 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ tarpaulin-report.html
test1
test2
test_btree

*.log
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ tempfile = "3.10.1"
[[bench]]
name = "db_benchmark"
harness = false

[[test]]
name = "db_crash"
harness = false
15 changes: 3 additions & 12 deletions src/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ impl<'a> BTree<'a, LogContext<'a>> {
}

let new_right_page = self.new_page()?;
let mut new_right_leaf = new_right_page
.init_leaf(self.ctx)?
.expect("new page should always be convertible to leaf page");
let mut new_right_leaf = new_right_page.init_leaf(self.ctx)?;
let new_right_pgid = new_right_leaf.id();
let mut pivot = self.leaf_split_and_insert(
&mut result.leaf.node,
Expand All @@ -108,10 +106,7 @@ impl<'a> BTree<'a, LogContext<'a>> {

let is_root_leaf = result.leaf.node.id() == self.root;
if is_root_leaf {
let new_left_leaf = self
.new_page()?
.init_leaf(self.ctx)?
.expect("new page should always be convertible to leaf page");
let new_left_leaf = self.new_page()?.init_leaf(self.ctx)?;
self.split_root_leaf(result.leaf.node, new_left_leaf, new_right_pgid, &mut pivot)?;
} else {
self.propagate_interior_splitting(result.interiors, new_right_pgid, pivot)?;
Expand Down Expand Up @@ -150,11 +145,7 @@ impl<'a> BTree<'a, LogContext<'a>> {
current = next;
};

let Some(node) = page.init_leaf(self.ctx)? else {
return Err(anyhow!(
"invalid state, btree contain non-interior and non-leaf page"
));
};
let node = page.init_leaf(self.ctx)?;

let (i, found) = self.search_key_in_leaf(&node, key)?;
Ok(LookupForUpdateResult {
Expand Down
102 changes: 54 additions & 48 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::btree::{BTreeRead, BTreeWrite, Cursor};
use crate::pager::{DbState, LogContext, PageId, PageIdExt, Pager};
use crate::recovery::{recover, undo_txn};
use crate::wal::{TxId, TxIdExt, TxState, Wal, WalRecord};
use crate::wal::{TxId, TxState, Wal, WalRecord};
use anyhow::anyhow;
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::fs::OpenOptions;
Expand Down Expand Up @@ -94,15 +94,10 @@ impl Db {
.create(true)
.truncate(false)
.open(wal_path)?;
let wal = recover(wal_file, &pager, page_size)?;
let wal = Arc::new(wal);
let result = recover(wal_file, &pager, page_size)?;
let wal = Arc::new(result.wal);

let next_txid = if let Some(txid) = header.last_txid {
txid.next().get()
} else {
1
};
let next_txid = AtomicU64::new(next_txid);
let next_txid = AtomicU64::new(result.next_txid.get());

// at this point, the recovery is already finished, so there is no active transaction
let tx_state = Arc::new(RwLock::new(TxState::None));
Expand Down Expand Up @@ -176,7 +171,6 @@ impl Db {
let header = Header {
version: 0,
page_size: DEFAULT_PAGE_SIZE as u32,
last_txid: None,
};

let mut buff = vec![0; 2 * DB_HEADER_SIZE];
Expand All @@ -191,7 +185,7 @@ impl Db {
let tx_guard = self.tx_lock.write();

let mut tx_state = self.tx_state.write();
self.undo_dangling_tx(&mut tx_state)?;
self.finish_dangling_tx(&mut tx_state)?;

let txid = self.next_txid.fetch_add(1, Ordering::SeqCst);
let txid = TxId::new(txid).unwrap();
Expand All @@ -200,39 +194,58 @@ impl Db {
Tx::new(txid, self, tx_guard)
}

fn undo_dangling_tx(&self, tx_state: &mut TxState) -> anyhow::Result<()> {
let TxState::Active(txid) = *tx_state else {
return Ok(());
};

log::debug!("previous transaction {txid:?} is not closed yet");

let lsn = self.wal.append(txid, None, WalRecord::Rollback)?;
*tx_state = TxState::Aborting {
txid,
rollback: lsn,
last_undone: lsn,
};
let TxState::Aborting {
ref mut last_undone,
..
} = &mut *tx_state
else {
unreachable!();
};

undo_txn(&self.pager, &self.wal, txid, lsn, last_undone)?;
self.wal.append(txid, None, WalRecord::End)?;
*tx_state = TxState::None;
fn finish_dangling_tx(&self, tx_state: &mut TxState) -> anyhow::Result<()> {
match *tx_state {
TxState::None => Ok(()),
TxState::Active(txid) => {
log::debug!("previous transaction {txid:?} is not closed yet");

let lsn = self.wal.append(txid, None, WalRecord::Rollback)?;
*tx_state = TxState::Aborting {
txid,
rollback: lsn,
last_undone: lsn,
};
let TxState::Aborting {
ref mut last_undone,
..
} = &mut *tx_state
else {
unreachable!();
};

Ok(())
undo_txn(&self.pager, &self.wal, txid, lsn, last_undone)?;
self.wal.append(txid, None, WalRecord::End)?;
*tx_state = TxState::None;
Ok(())
}
TxState::Aborting {
txid,
rollback,
ref mut last_undone,
} => {
log::debug!("continue aborting previous transaction {txid:?}");

undo_txn(&self.pager, &self.wal, txid, rollback, last_undone)?;
self.wal.append(txid, None, WalRecord::End)?;
*tx_state = TxState::None;
Ok(())
}
TxState::Committing(txid) => {
let commit_lsn = self.wal.append(txid, None, WalRecord::Commit)?;
self.wal.append(txid, None, WalRecord::End)?;
self.wal.sync(commit_lsn)?;
*tx_state = TxState::None;
Ok(())
}
}
}

pub fn read(&self) -> anyhow::Result<ReadTx> {
let tx_guard = self.tx_lock.read();

let mut tx_state = self.tx_state.write();
self.undo_dangling_tx(&mut tx_state)?;
self.finish_dangling_tx(&mut tx_state)?;

let txid = self.next_txid.fetch_add(1, Ordering::SeqCst);
let txid = TxId::new(txid).unwrap();
Expand Down Expand Up @@ -276,24 +289,22 @@ impl Db {
}
}

const DB_HEADER_SIZE: usize = 32;
const DB_HEADER_SIZE: usize = 24;
const DEFAULT_PAGE_SIZE: usize = 0x1000;
const MAGIC_HEADER: &[u8] = b"dbest000";

struct Header {
version: u32,
page_size: u32,
last_txid: Option<TxId>,
}

impl Header {
fn encode(&self, buff: &mut [u8]) {
buff[0..8].copy_from_slice(MAGIC_HEADER);
buff[8..12].copy_from_slice(&self.version.to_be_bytes());
buff[12..16].copy_from_slice(&self.page_size.to_be_bytes());
buff[16..24].copy_from_slice(&self.last_txid.to_be_bytes());
let checksum = crc64::crc64(0, &buff[0..24]);
buff[24..32].copy_from_slice(&checksum.to_be_bytes());
let checksum = crc64::crc64(0, &buff[0..16]);
buff[16..24].copy_from_slice(&checksum.to_be_bytes());
}

fn decode(buff: &[u8]) -> Option<Self> {
Expand All @@ -307,13 +318,8 @@ impl Header {

let version = u32::from_be_bytes(buff[8..12].try_into().unwrap());
let page_size = u32::from_be_bytes(buff[12..16].try_into().unwrap());
let last_txid = TxId::from_be_bytes(buff[16..24].try_into().unwrap());

Some(Self {
version,
page_size,
last_txid,
})
Some(Self { version, page_size })
}
}

Expand Down
Loading

0 comments on commit b720c2b

Please sign in to comment.