Skip to content

Commit

Permalink
fix stale is_dirty
Browse files Browse the repository at this point in the history
  • Loading branch information
jauhararifin committed Aug 13, 2024
1 parent 49a0557 commit 98d2ee6
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 48 deletions.
95 changes: 53 additions & 42 deletions src/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,35 +521,15 @@ impl Pager {
txid: TxId,
pgid: PageId,
) -> anyhow::Result<()> {
let mut internal = self.internal.write();
assert!(
pgid.get() >= internal.db_state.page_count - 1,
"cannot dealloc {pgid:?} which >= page_count={} - 1",
internal.db_state.page_count
);
record_mutation(
let mut page = self.write(txid, pgid)?;
page.meta.lsn = record_mutation(
txid,
ctx,
WalRecord::DeallocPage { pgid },
WalRecord::DeallocPage { pgid },
)?;
if pgid.get() >= internal.db_state.page_count {
return Ok(());
}
assert!(pgid.get() == internal.db_state.page_count - 1);
internal.db_state.page_count -= 1;

if let Some(frame_id) = internal.page_to_frame.get(&pgid).copied() {
assert!(internal.ref_count[frame_id] == 0);
internal.free_frames.remove(&frame_id);
internal.free_and_clean.remove(&frame_id);
internal.page_to_frame.remove(&pgid);
};

let f = self.f.lock();
f.set_len(internal.db_state.page_count * self.page_size as u64)?;
drop(f);

page.meta.kind = PageKind::None;
page.meta.is_dirty = true;
Ok(())
}

Expand Down Expand Up @@ -617,7 +597,7 @@ impl Pager {

Ok((frame_id, meta, buffer_offset))
} else {
let (frame_id, evicted) = Self::evict_one(&mut internal)?;
let (frame_id, evicted) = Self::get_eviction_candidate(&mut internal)?;

// SAFETY:
// - it's guaranteed that the address pointed by metas + frame_id is valid
Expand Down Expand Up @@ -667,6 +647,9 @@ impl Pager {
self.decode(pgid, &mut meta_locked, buffer)?;
}

internal.free_and_clean.remove(&frame_id);
internal.free_frames.remove(&frame_id);

internal.page_to_frame.remove(&old_pgid);
internal.page_to_frame.insert(pgid, frame_id);
assert!(internal.ref_count[frame_id] == 0);
Expand All @@ -679,12 +662,10 @@ impl Pager {
}
}

fn evict_one(internal: &mut PagerInternal) -> anyhow::Result<(usize, bool)> {
fn get_eviction_candidate(internal: &mut PagerInternal) -> anyhow::Result<(usize, bool)> {
let (pgid, ok) = if let Some(frame_id) = internal.free_and_clean.iter().next().copied() {
internal.free_and_clean.remove(&frame_id);
(frame_id, false)
} else if let Some(frame_id) = internal.free_frames.iter().next().copied() {
internal.free_frames.remove(&frame_id);
(frame_id, true)
} else {
// TODO: consider sleep and retry this process
Expand Down Expand Up @@ -842,25 +823,30 @@ impl Pager {
assert!(buff.len() == self.page_size);
meta.id = pgid;

{
let ok = {
let flush_internal = self.flush_internal.read();
if let Some((i, _)) = flush_internal.flushing_pgids.get_full(&pgid) {
buff.copy_from_slice(
&flush_internal.flushing_pages[i * self.page_size..(i + 1) * self.page_size],
);
true
} else {
drop(flush_internal);
let mut f = self.f.lock();
read_page(&mut *f, pgid, self.page_size, buff)?;
read_page(&mut *f, pgid, self.page_size, buff)?
}
}

let ok = Self::decode_internal(self.page_size, meta, buff)?;
if !ok {
return Err(anyhow!(
"page {} is corrupted, checksum mismatch",
pgid.get(),
));
};
if ok {
let ok = Self::decode_internal(self.page_size, meta, buff)?;
if !ok {
meta.kind = PageKind::None;
meta.lsn = Lsn::new(1).unwrap();
meta.is_dirty = false;
}
} else {
meta.kind = PageKind::None;
meta.lsn = Lsn::new(1).unwrap();
meta.is_dirty = false;
}

if meta.id != pgid {
Expand Down Expand Up @@ -1271,6 +1257,7 @@ impl<'a> PageWrite<'a> {
WalRecord::InteriorInit { pgid, last },
WalRecord::InteriorInit { pgid, last },
)?;
self.meta.is_dirty = true;

self.meta.kind = PageKind::Interior {
count: 0,
Expand Down Expand Up @@ -1312,6 +1299,7 @@ impl<'a> PageWrite<'a> {
payload,
},
)?;
self.meta.is_dirty = true;

self.meta.kind = Pager::decode_interior_page(payload)?;
self.buffer[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE].copy_from_slice(payload);
Expand All @@ -1338,6 +1326,7 @@ impl<'a> PageWrite<'a> {
WalRecord::LeafInit { pgid },
WalRecord::LeafInit { pgid },
)?;
self.meta.is_dirty = true;

self.meta.kind = PageKind::Leaf {
count: 0,
Expand Down Expand Up @@ -1385,6 +1374,7 @@ impl<'a> PageWrite<'a> {
payload,
},
)?;
self.meta.is_dirty = true;

self.meta.kind = Pager::decode_leaf_page(payload)?;
self.buffer[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE].copy_from_slice(payload);
Expand Down Expand Up @@ -1414,6 +1404,7 @@ impl<'a> PageWrite<'a> {
WalRecord::OverflowInit { pgid },
WalRecord::OverflowInit { pgid },
)?;
self.meta.is_dirty = true;
self.meta.kind = PageKind::Overflow {
next: None,
size: 0,
Expand All @@ -1437,6 +1428,7 @@ impl<'a> PageWrite<'a> {
};

self.meta.lsn = lsn;
self.meta.is_dirty = true;
self.meta.kind = Pager::decode_overflow_page(payload)?;
self.buffer.copy_from_slice(payload);

Expand Down Expand Up @@ -1623,6 +1615,7 @@ impl<'a> InteriorPageWrite<'a> {
},
WalRecord::InteriorUndoReset { pgid },
)?;
self.0.meta.is_dirty = true;

self.0.meta.kind = PageKind::None;
Ok(self.0)
Expand All @@ -1649,6 +1642,7 @@ impl<'a> InteriorPageWrite<'a> {
old_last,
},
)?;
self.0.meta.is_dirty = true;

let PageKind::Interior { ref mut last, .. } = self.0.meta.kind else {
unreachable!();
Expand Down Expand Up @@ -1690,6 +1684,7 @@ impl<'a> InteriorPageWrite<'a> {
old_ptr,
},
)?;
self.0.meta.is_dirty = true;
cell[0..8].copy_from_slice(&ptr.to_be_bytes());

Ok(())
Expand Down Expand Up @@ -1728,6 +1723,7 @@ impl<'a> InteriorPageWrite<'a> {
old_overflow,
},
)?;
self.0.meta.is_dirty = true;
cell[8..16].copy_from_slice(&overflow_pgid.to_be_bytes());

Ok(())
Expand Down Expand Up @@ -1788,6 +1784,7 @@ impl<'a> InteriorPageWrite<'a> {
key_size: cell.key_size(),
},
)?;
self.0.meta.is_dirty = true;

log::debug!(
"insert_interior_cell_finish {pgid:?} kind={:?} page_lsn={:?} i={i}",
Expand Down Expand Up @@ -1860,6 +1857,7 @@ impl<'a> InteriorPageWrite<'a> {
overflow,
},
)?;
self.0.meta.is_dirty = true;

log::debug!(
"insert_interior_content_finish {pgid:?} kind={:?} page_lsn={:?} i={i} raw_size={}",
Expand Down Expand Up @@ -2000,6 +1998,7 @@ impl<'a> InteriorPageWrite<'a> {
WalRecord::InteriorUndoDelete { pgid, index: i },
)?;
}
self.0.meta.is_dirty = true;

let original_count = count;
let PageKind::Interior {
Expand Down Expand Up @@ -2058,6 +2057,7 @@ impl<'a> InteriorPageWrite<'a> {
},
WalRecord::InteriorUndoDelete { pgid, index },
)?;
self.0.meta.is_dirty = true;

let PageKind::Interior {
ref mut offset,
Expand Down Expand Up @@ -2224,6 +2224,7 @@ impl<'a> LeafPageWrite<'a> {
},
WalRecord::LeafUndoReset { pgid },
)?;
self.0.meta.is_dirty = true;

self.0.meta.kind = PageKind::None;
Ok(self.0)
Expand Down Expand Up @@ -2253,6 +2254,7 @@ impl<'a> LeafPageWrite<'a> {
},
WalRecord::LeafUndoDelete { pgid, index },
)?;
self.0.meta.is_dirty = true;

let PageKind::Leaf {
ref mut offset,
Expand Down Expand Up @@ -2310,6 +2312,7 @@ impl<'a> LeafPageWrite<'a> {
old_next,
},
)?;
self.0.meta.is_dirty = true;

let PageKind::Leaf { ref mut next, .. } = self.0.meta.kind else {
unreachable!();
Expand Down Expand Up @@ -2351,6 +2354,7 @@ impl<'a> LeafPageWrite<'a> {
old_overflow,
},
)?;
self.0.meta.is_dirty = true;
cell[0..8].copy_from_slice(&overflow_pgid.to_be_bytes());

Ok(())
Expand Down Expand Up @@ -2395,6 +2399,7 @@ impl<'a> LeafPageWrite<'a> {
value_size: cell.val_size(),
},
)?;
self.0.meta.is_dirty = true;

self.insert_cell_meta(
i,
Expand Down Expand Up @@ -2459,6 +2464,7 @@ impl<'a> LeafPageWrite<'a> {
value_size,
},
)?;
self.0.meta.is_dirty = true;

self.insert_cell_meta(i, overflow, key_size, value_size, raw_size);
Ok(true)
Expand Down Expand Up @@ -2609,6 +2615,7 @@ impl<'a> LeafPageWrite<'a> {
WalRecord::LeafUndoDelete { pgid, index: i },
)?;
}
self.0.meta.is_dirty = true;

let original_count = count;
let PageKind::Leaf {
Expand Down Expand Up @@ -2638,7 +2645,7 @@ fn get_leaf_cell(payload: &[u8], index: usize) -> LeafCell<'_> {
let offset = u16::from_be_bytes(cell[LEAF_CELL_OFFSET_RANGE].try_into().unwrap()) as usize;
assert!(
offset >= PAGE_HEADER_SIZE,
"cannot get leaf cell, offset={offset} page_header_size={PAGE_HEADER_SIZE}"
"cannot get leaf cell {index}, offset={offset} page_header_size={PAGE_HEADER_SIZE} cell={cell:x?} payload={payload:x?}",
);
let offset = offset - PAGE_HEADER_SIZE;
let size = u16::from_be_bytes(cell[LEAF_CELL_SIZE_RANGE].try_into().unwrap()) as usize;
Expand Down Expand Up @@ -2754,6 +2761,7 @@ impl<'a> OverflowPageWrite<'a> {
old_next,
},
)?;
self.0.meta.is_dirty = true;
let PageKind::Overflow { ref mut next, .. } = self.0.meta.kind else {
unreachable!();
};
Expand Down Expand Up @@ -2795,6 +2803,7 @@ impl<'a> OverflowPageWrite<'a> {
WalRecord::OverflowSetContent { pgid, raw, next },
WalRecord::OverflowSetContent { pgid, raw, next },
)?;
self.0.meta.is_dirty = true;

let PageKind::Overflow {
size: ref mut p_size,
Expand Down Expand Up @@ -2825,6 +2834,7 @@ impl<'a> OverflowPageWrite<'a> {
WalRecord::OverflowUndoSetContent { pgid },
WalRecord::OverflowUndoSetContent { pgid },
)?;
self.0.meta.is_dirty = true;

let PageKind::Overflow { ref mut size, .. } = self.0.meta.kind else {
unreachable!();
Expand All @@ -2848,23 +2858,24 @@ impl<'a> OverflowPageWrite<'a> {
},
WalRecord::OverflowUndoReset { pgid },
)?;
self.0.meta.is_dirty = true;

self.0.meta.kind = PageKind::None;
Ok(self.0)
}
}

fn read_page(f: &mut File, id: PageId, page_size: usize, buff: &mut [u8]) -> anyhow::Result<()> {
fn read_page(f: &mut File, id: PageId, page_size: usize, buff: &mut [u8]) -> anyhow::Result<bool> {
// TODO: try to seek and read using a single syscall
let page_size = page_size as u64;
let file_size = f.metadata()?.len();
let min_size = id.get() * page_size + page_size;
if min_size > file_size {
f.set_len(min_size)?;
return Ok(false);
}
f.seek(SeekFrom::Start(id.get() * page_size))?;
f.read_exact(buff)?;
Ok(())
Ok(true)
}

fn write_page(f: &mut File, id: PageId, page_size: usize, buff: &[u8]) -> anyhow::Result<()> {
Expand Down
12 changes: 6 additions & 6 deletions src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ fn redo_page(pager: &Pager, lsn: Lsn, entry: &WalEntry, pgid: PageId) -> anyhow:
let page = pager.write(entry.txid, pgid)?;
if page.page_lsn() >= lsn {
log::debug!(
"redo skipped becauase page_lsn={:?} >= {lsn:?}",
"redo skipped because page_lsn={:?} >= {lsn:?}",
page.page_lsn()
);
return Ok(());
Expand Down Expand Up @@ -762,14 +762,14 @@ pub(crate) fn undo_txn(
WalRecord::LeafInit { pgid } => {
let page = pager.write(txid, pgid)?;
let Some(page) = page.into_leaf() else {
return Err(anyhow!("expected a leaf page for undo"));
return Err(anyhow!("expected a leaf page for undo {pgid:?}"));
};
page.reset(ctx)?;
}
WalRecord::LeafInsert { pgid, index, .. } => {
let page = pager.write(txid, pgid)?;
let Some(mut page) = page.into_leaf() else {
return Err(anyhow!("expected a leaf page for undo"));
return Err(anyhow!("expected a leaf page for undo {pgid:?}"));
};
page.delete(ctx, index)?;
}
Expand All @@ -783,7 +783,7 @@ pub(crate) fn undo_txn(
} => {
let page = pager.write(txid, pgid)?;
let Some(mut page) = page.into_leaf() else {
return Err(anyhow!("expected a leaf page for undo"));
return Err(anyhow!("expected a leaf page for undo {pgid:?}"));
};
let ok = page.insert_content(
ctx,
Expand All @@ -806,14 +806,14 @@ pub(crate) fn undo_txn(
} => {
let page = pager.write(txid, pgid)?;
let Some(mut page) = page.into_leaf() else {
return Err(anyhow!("expected a leaf page for undo"));
return Err(anyhow!("expected a leaf page for undo {pgid:?}"));
};
page.set_cell_overflow(ctx, index, old_overflow)?;
}
WalRecord::LeafSetNext { pgid, old_next, .. } => {
let page = pager.write(txid, pgid)?;
let Some(mut page) = page.into_leaf() else {
return Err(anyhow!("expected a leaf page for undo"));
return Err(anyhow!("expected a leaf page for undo {pgid:?}"));
};
page.set_next(ctx, old_next)?;
}
Expand Down

0 comments on commit 98d2ee6

Please sign in to comment.