diff --git a/src/lib.rs b/src/lib.rs index 14bc83d..1f87e23 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,22 @@ +// TODO: implement deletion +// TODO: implement WAL rolling +// TODO: use pointer swizzling for better buffer pool +// TODO: add fast path for adding entry to a btree +// TODO: batch commit logs together +// TODO: remove txid in some wal entries +// TODO: use binary search to search in nodes +// TODO: just remove the btree lock altogether. There is no point using it if we only have one +// write txn at a time. +// TODO: use better eviction policy +// TODO: check WAL entry's integrity when iterating +// TODO: add fast path for the pager +// TODO: reduce syscall for writing files. We can use vectorized syscall +// TODO: improve checkpoint: don't exclusively lock a page if it's not dirty and need to be +// flushed. +// TODO: merge operations on the files to reduce the WAL size +// TODO: remove byte copy during rearrangement +// TODO: remove allocation during btree split +// TODO: handle checkpoint error in the background thread // TODO: don't use anyhow for the error handling // TODO: setup CI diff --git a/src/pager.rs b/src/pager.rs index e997132..2d16d34 100644 --- a/src/pager.rs +++ b/src/pager.rs @@ -7,18 +7,9 @@ use std::collections::{HashMap, HashSet}; use std::fs::File; use std::io::{Read, Seek, SeekFrom, Write}; use std::num::NonZeroU64; +use std::ops::Range; use std::os::unix::fs::MetadataExt; -pub(crate) const MINIMUM_PAGE_SIZE: usize = 256; - -// TODO: idea for double buffering -// Manage a separate buffer of N pages (maybe we can set N to be 10 and 20). -// This buffer lives on memory. Before flushing a page, put it to this buffer first. -// When full, or timeout, or forced, flush the buffer to a double-write buffer file somewhere -// If success, then flush those N pages to the main db file. -// When loading pages from master db file, first load it from master file first. It the checksum mismatch -// load it from double-write file. - // TODO: use better eviction policy. De-priorize pages with `page_lsn` < `flushed_lsn`. // Also use better algorithm such as tiny-lfu or secondchance. @@ -107,14 +98,119 @@ struct PagerFlushInternal { flushing_pgids: IndexSet, } +const MINIMUM_PAGE_SIZE: usize = 256; + const PAGE_HEADER_SIZE: usize = 32; +const PAGE_HEADER_VERSION_RANGE: Range = 0..2; +const PAGE_HEADER_KIND_INDEX: usize = 2; +const PAGE_HEADER_REC_LSN_RANGE: Range = 8..16; +const PAGE_HEADER_PAGE_LSN_RANGE: Range = 16..24; +const PAGE_HEADER_PAGE_ID_RANGE: Range = 24..32; + const PAGE_FOOTER_SIZE: usize = 8; +const PAGE_FOOTER_CHECKSUM_RANGE: Range = 0..8; + const INTERIOR_PAGE_HEADER_SIZE: usize = 16; -const INTERIOR_PAGE_CELL_SIZE: usize = 24; +const INTERIOR_HEADER_LAST_RANGE: Range = 0..8; +const INTERIOR_HEADER_COUNT_RANGE: Range = 8..10; +const INTERIOR_HEADER_OFFSET_RANGE: Range = 10..12; + +const INTERIOR_CELL_SIZE: usize = 24; +const INTERIOR_CELL_PTR_RANGE: Range = 0..8; +const INTERIOR_CELL_OVERFLOW_RANGE: Range = 8..16; +const INTERIOR_CELL_KEY_SIZE_RANGE: Range = 16..20; +const INTERIOR_CELL_OFFSET_RANGE: Range = 20..22; +const INTERIOR_CELL_SIZE_RANGE: Range = 22..24; + const LEAF_PAGE_HEADER_SIZE: usize = 16; -const LEAF_PAGE_CELL_SIZE: usize = 24; +const LEAF_HEADER_NEXT_RANGE: Range = 0..8; +const LEAF_HEADER_COUNT_RANGE: Range = 8..10; +const LEAF_HEADER_OFFSET_RANGE: Range = 10..12; + +const LEAF_CELL_SIZE: usize = 24; +const LEAF_CELL_OVERFLOW_RANGE: Range = 0..8; +const LEAF_CELL_KEY_SIZE_RANGE: Range = 8..12; +const LEAF_CELL_VAL_SIZE_RANGE: Range = 12..16; +const LEAF_CELL_OFFSET_RANGE: Range = 16..18; +const LEAF_CELL_SIZE_RANGE: Range = 18..20; + const OVERFLOW_PAGE_HEADER_SIZE: usize = 16; +const OVERFLOW_HEADER_NEXT_RANGE: Range = 0..8; +const OVERFLOW_HEADER_SIZE_RANGE: Range = 8..10; + const FREELIST_PAGE_HEADER_SIZE: usize = 16; +const FREELIST_HEADER_NEXT_RANGE: Range = 0..8; +const FREELIST_HEADER_COUNT_RANGE: Range = 8..10; + +macro_rules! const_assert { + ($($tt:tt)*) => { + const _: () = assert!($($tt)*); + } +} + +const_assert!(PAGE_HEADER_VERSION_RANGE.end <= PAGE_HEADER_SIZE); +const_assert!(range_size(PAGE_HEADER_VERSION_RANGE) == 2); +const_assert!(PAGE_HEADER_KIND_INDEX < PAGE_HEADER_SIZE); +const_assert!(PAGE_HEADER_REC_LSN_RANGE.end <= PAGE_HEADER_SIZE); +const_assert!(range_size(PAGE_HEADER_REC_LSN_RANGE) == 8); +const_assert!(PAGE_HEADER_PAGE_LSN_RANGE.end <= PAGE_HEADER_SIZE); +const_assert!(range_size(PAGE_HEADER_PAGE_LSN_RANGE) == 8); +const_assert!(PAGE_HEADER_PAGE_ID_RANGE.end <= PAGE_HEADER_SIZE); +const_assert!(range_size(PAGE_HEADER_PAGE_ID_RANGE) == 8); + +const_assert!(PAGE_FOOTER_CHECKSUM_RANGE.end <= PAGE_FOOTER_SIZE); +const_assert!(range_size(PAGE_FOOTER_CHECKSUM_RANGE) == 8); + +const_assert!(INTERIOR_HEADER_LAST_RANGE.end <= INTERIOR_PAGE_HEADER_SIZE); +const_assert!(range_size(INTERIOR_HEADER_LAST_RANGE) == 8); +const_assert!(INTERIOR_HEADER_COUNT_RANGE.end <= INTERIOR_PAGE_HEADER_SIZE); +const_assert!(range_size(INTERIOR_HEADER_COUNT_RANGE) == 2); +const_assert!(INTERIOR_HEADER_OFFSET_RANGE.end <= INTERIOR_PAGE_HEADER_SIZE); +const_assert!(range_size(INTERIOR_HEADER_OFFSET_RANGE) == 2); + +const_assert!(INTERIOR_CELL_OVERFLOW_RANGE.end <= INTERIOR_CELL_SIZE); +const_assert!(range_size(INTERIOR_CELL_OVERFLOW_RANGE) == 8); +const_assert!(INTERIOR_CELL_PTR_RANGE.end <= INTERIOR_CELL_SIZE); +const_assert!(range_size(INTERIOR_CELL_PTR_RANGE) == 8); +const_assert!(INTERIOR_CELL_KEY_SIZE_RANGE.end <= INTERIOR_CELL_SIZE); +const_assert!(range_size(INTERIOR_CELL_KEY_SIZE_RANGE) == 4); +const_assert!(INTERIOR_CELL_OFFSET_RANGE.end <= INTERIOR_CELL_SIZE); +const_assert!(range_size(INTERIOR_CELL_OFFSET_RANGE) == 2); +const_assert!(INTERIOR_CELL_SIZE_RANGE.end <= INTERIOR_CELL_SIZE); +const_assert!(range_size(INTERIOR_CELL_SIZE_RANGE) == 2); + +const_assert!(LEAF_CELL_OVERFLOW_RANGE.end <= LEAF_PAGE_HEADER_SIZE); +const_assert!(range_size(LEAF_CELL_OVERFLOW_RANGE) == 8); +const_assert!(LEAF_CELL_KEY_SIZE_RANGE.end <= LEAF_PAGE_HEADER_SIZE); +const_assert!(range_size(LEAF_CELL_KEY_SIZE_RANGE) == 4); +const_assert!(LEAF_CELL_VAL_SIZE_RANGE.end <= LEAF_PAGE_HEADER_SIZE); +const_assert!(range_size(LEAF_CELL_VAL_SIZE_RANGE) == 4); +const_assert!(LEAF_HEADER_NEXT_RANGE.end <= LEAF_PAGE_HEADER_SIZE); +const_assert!(range_size(LEAF_HEADER_NEXT_RANGE) == 8); +const_assert!(LEAF_HEADER_COUNT_RANGE.end <= LEAF_PAGE_HEADER_SIZE); +const_assert!(range_size(LEAF_HEADER_COUNT_RANGE) == 2); +const_assert!(LEAF_HEADER_OFFSET_RANGE.end <= LEAF_PAGE_HEADER_SIZE); +const_assert!(range_size(LEAF_HEADER_OFFSET_RANGE) == 2); + +const_assert!(LEAF_CELL_OFFSET_RANGE.end <= LEAF_CELL_SIZE); +const_assert!(range_size(LEAF_CELL_OFFSET_RANGE) == 2); +const_assert!(LEAF_CELL_SIZE_RANGE.end <= LEAF_CELL_SIZE); +const_assert!(range_size(LEAF_CELL_SIZE_RANGE) == 2); + +const_assert!(OVERFLOW_HEADER_NEXT_RANGE.end <= OVERFLOW_PAGE_HEADER_SIZE); +const_assert!(range_size(OVERFLOW_HEADER_NEXT_RANGE) == 8); +const_assert!(OVERFLOW_HEADER_SIZE_RANGE.end <= OVERFLOW_PAGE_HEADER_SIZE); +const_assert!(range_size(OVERFLOW_HEADER_SIZE_RANGE) == 2); + +const_assert!(FREELIST_HEADER_NEXT_RANGE.end <= FREELIST_PAGE_HEADER_SIZE); +const_assert!(range_size(FREELIST_HEADER_NEXT_RANGE) == 8); +const_assert!(FREELIST_HEADER_COUNT_RANGE.end <= FREELIST_PAGE_HEADER_SIZE); +const_assert!(range_size(FREELIST_HEADER_COUNT_RANGE) == 2); + +#[allow(dead_code)] +const fn range_size(range: Range) -> usize { + range.end - range.start +} unsafe impl Send for PagerInternal {} unsafe impl Sync for PagerInternal {} @@ -693,16 +789,18 @@ impl Pager { meta: &mut PageMeta, buff: &mut [u8], ) -> anyhow::Result { + assert!(page_size.count_ones() == 1 && page_size >= MINIMUM_PAGE_SIZE); + let header = &buff[..PAGE_HEADER_SIZE]; + let footer = &buff[page_size - PAGE_FOOTER_SIZE..]; let payload = &buff[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE]; - let buff_checksum = &buff[page_size - PAGE_FOOTER_SIZE..]; - let buff_version = &header[..2]; - let buff_kind = &header[2]; - let _ = &header[3..8]; - let buff_rec_lsn = &header[8..16]; - let buff_page_lsn = &header[16..24]; - let buff_page_id = &header[24..32]; + let buff_checksum = &footer[PAGE_FOOTER_CHECKSUM_RANGE]; + let buff_version = &header[PAGE_HEADER_VERSION_RANGE]; + let buff_kind = &header[PAGE_HEADER_KIND_INDEX]; + let buff_rec_lsn = &header[PAGE_HEADER_REC_LSN_RANGE]; + let buff_page_lsn = &header[PAGE_HEADER_PAGE_LSN_RANGE]; + let buff_page_id = &header[PAGE_HEADER_PAGE_ID_RANGE]; let buff_checksum_content = &buff[..page_size - PAGE_FOOTER_SIZE]; let checksum = crc64::crc64(0, buff_checksum_content); @@ -758,10 +856,9 @@ impl Pager { fn decode_interior_page(payload: &[u8]) -> anyhow::Result { let header = &payload[..INTERIOR_PAGE_HEADER_SIZE]; - let buff_last = &header[..8]; - let buff_count = &header[8..10]; - let buff_offset = &header[10..12]; - _ = &header[12..16]; + let buff_last = &header[INTERIOR_HEADER_LAST_RANGE]; + let buff_count = &header[INTERIOR_HEADER_COUNT_RANGE]; + let buff_offset = &header[INTERIOR_HEADER_OFFSET_RANGE]; let Some(last) = PageId::from_be_bytes(buff_last.try_into().unwrap()) else { return Err(anyhow!("got zero last ptr on interior page")); @@ -771,7 +868,7 @@ impl Pager { let mut remaining = payload.len() - INTERIOR_PAGE_HEADER_SIZE; for i in 0..count { - let cell_offset = INTERIOR_PAGE_HEADER_SIZE + INTERIOR_PAGE_CELL_SIZE * i as usize; + let cell_offset = INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * i as usize; let buf = PageId::from_be_bytes(payload[cell_offset..cell_offset + 8].try_into().unwrap()); if buf.is_none() { @@ -779,7 +876,7 @@ impl Pager { } let cell = get_interior_cell(payload, i as usize); - remaining -= INTERIOR_PAGE_CELL_SIZE + cell.raw().len(); + remaining -= INTERIOR_CELL_SIZE + cell.raw().len(); } Ok(PageKind::Interior { @@ -795,10 +892,9 @@ impl Pager { let payload = &buff[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE]; let header = &payload[..LEAF_PAGE_HEADER_SIZE]; - let buff_next = &header[..8]; - let buff_count = &header[8..10]; - let buff_offset = &header[10..12]; - _ = &header[12..16]; + let buff_next = &header[LEAF_HEADER_NEXT_RANGE]; + let buff_count = &header[LEAF_HEADER_COUNT_RANGE]; + let buff_offset = &header[LEAF_HEADER_OFFSET_RANGE]; let next = PageId::from_be_bytes(buff_next.try_into().unwrap()); let count = u16::from_be_bytes(buff_count.try_into().unwrap()); @@ -807,7 +903,7 @@ impl Pager { let mut remaining = payload.len() - LEAF_PAGE_HEADER_SIZE; for i in 0..count { let cell = get_leaf_cell(buff, i as usize); - remaining -= LEAF_PAGE_CELL_SIZE + cell.raw().len(); + remaining -= LEAF_CELL_SIZE + cell.raw().len(); } Ok(PageKind::Leaf { @@ -823,9 +919,8 @@ impl Pager { let payload = &buff[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE]; let header = &payload[..OVERFLOW_PAGE_HEADER_SIZE]; - let buff_next = &header[..8]; - let buff_size = &header[8..10]; - _ = &header[10..16]; + let buff_next = &header[OVERFLOW_HEADER_NEXT_RANGE]; + let buff_size = &header[OVERFLOW_HEADER_SIZE_RANGE]; let next = PageId::from_be_bytes(buff_next.try_into().unwrap()); let size = u16::from_be_bytes(buff_size.try_into().unwrap()); @@ -841,9 +936,8 @@ impl Pager { let payload = &buff[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE]; let header = &payload[..FREELIST_PAGE_HEADER_SIZE]; - let buff_next = &header[..8]; - let buff_count = &header[8..10]; - _ = &header[10..16]; + let buff_next = &header[FREELIST_HEADER_NEXT_RANGE]; + let buff_count = &header[FREELIST_HEADER_COUNT_RANGE]; let next = PageId::from_be_bytes(buff_next.try_into().unwrap()); let count = u16::from_be_bytes(buff_count.try_into().unwrap()); @@ -858,7 +952,7 @@ impl Pager { let page_size = buff.len(); let header = &mut buff[..PAGE_HEADER_SIZE]; - header[..2].copy_from_slice(&0u16.to_be_bytes()); + header[PAGE_HEADER_VERSION_RANGE].fill(0); let kind = match meta.kind { PageKind::None => 0, @@ -867,8 +961,7 @@ impl Pager { PageKind::Overflow { .. } => 3, PageKind::Freelist { .. } => 4, }; - header[2] = kind; - header[3..8].copy_from_slice(&[0, 0, 0, 0, 0]); + header[PAGE_HEADER_KIND_INDEX] = kind; let (rec_lsn, page_lsn) = if let Some(ref wal_info) = meta.wal { (Some(wal_info.rec), Some(wal_info.page)) @@ -876,9 +969,9 @@ impl Pager { (None, None) }; - header[8..16].copy_from_slice(&rec_lsn.to_be_bytes()); - header[16..24].copy_from_slice(&page_lsn.to_be_bytes()); - header[24..32].copy_from_slice(&meta.id.to_be_bytes()); + header[PAGE_HEADER_REC_LSN_RANGE].copy_from_slice(&rec_lsn.to_be_bytes()); + header[PAGE_HEADER_PAGE_LSN_RANGE].copy_from_slice(&page_lsn.to_be_bytes()); + header[PAGE_HEADER_PAGE_ID_RANGE].copy_from_slice(&meta.id.to_be_bytes()); let payload_buff = &mut buff[PAGE_HEADER_SIZE..page_size - PAGE_FOOTER_SIZE]; match &meta.kind { @@ -890,9 +983,10 @@ impl Pager { remaining: _, } => { let header = &mut payload_buff[..INTERIOR_PAGE_HEADER_SIZE]; - header[..8].copy_from_slice(&last.0.get().to_be_bytes()); - header[8..10].copy_from_slice(&(*count as u16).to_be_bytes()); - header[10..12].copy_from_slice(&(*offset as u16).to_be_bytes()); + header[INTERIOR_HEADER_LAST_RANGE].copy_from_slice(&last.0.get().to_be_bytes()); + header[INTERIOR_HEADER_COUNT_RANGE].copy_from_slice(&(*count as u16).to_be_bytes()); + header[INTERIOR_HEADER_OFFSET_RANGE] + .copy_from_slice(&(*offset as u16).to_be_bytes()); } PageKind::Leaf { count, @@ -902,26 +996,27 @@ impl Pager { } => { let header = &mut payload_buff[..LEAF_PAGE_HEADER_SIZE]; let next = next.map(|p| p.0.get()).unwrap_or(0); - header[..8].copy_from_slice(&next.to_be_bytes()); - header[8..10].copy_from_slice(&(*count as u16).to_be_bytes()); - header[10..12].copy_from_slice(&(*offset as u16).to_be_bytes()); + header[LEAF_HEADER_NEXT_RANGE].copy_from_slice(&next.to_be_bytes()); + header[LEAF_HEADER_COUNT_RANGE].copy_from_slice(&(*count as u16).to_be_bytes()); + header[LEAF_HEADER_OFFSET_RANGE].copy_from_slice(&(*offset as u16).to_be_bytes()); } PageKind::Overflow { next, size } => { let header = &mut payload_buff[..OVERFLOW_PAGE_HEADER_SIZE]; let next = next.map(|p| p.0.get()).unwrap_or(0); - header[..8].copy_from_slice(&next.to_be_bytes()); - header[8..10].copy_from_slice(&(*size as u16).to_be_bytes()); + header[OVERFLOW_HEADER_NEXT_RANGE].copy_from_slice(&next.to_be_bytes()); + header[OVERFLOW_HEADER_SIZE_RANGE].copy_from_slice(&(*size as u16).to_be_bytes()); } PageKind::Freelist { next, count } => { let header = &mut payload_buff[..FREELIST_PAGE_HEADER_SIZE]; let next = next.map(|p| p.0.get()).unwrap_or(0); - header[..8].copy_from_slice(&next.to_be_bytes()); - header[8..10].copy_from_slice(&(*count as u16).to_be_bytes()); + header[FREELIST_HEADER_NEXT_RANGE].copy_from_slice(&next.to_be_bytes()); + header[FREELIST_HEADER_COUNT_RANGE].copy_from_slice(&(*count as u16).to_be_bytes()); } } let checksum = crc64::crc64(0, &buff[..page_size - PAGE_FOOTER_SIZE]); - buff[page_size - 8..].copy_from_slice(&checksum.to_be_bytes()); + let footer = &mut buff[page_size - PAGE_FOOTER_SIZE..]; + footer[PAGE_FOOTER_CHECKSUM_RANGE].copy_from_slice(&checksum.to_be_bytes()); Ok(()) } @@ -1344,10 +1439,10 @@ impl<'a> InteriorPageRead<'a> { } fn get_interior_cell(buff: &[u8], index: usize) -> InteriorCell<'_> { - let cell_offset = INTERIOR_PAGE_HEADER_SIZE + INTERIOR_PAGE_CELL_SIZE * index; - let cell = &buff[cell_offset..cell_offset + INTERIOR_PAGE_CELL_SIZE]; - let offset = u16::from_be_bytes(cell[20..22].try_into().unwrap()) as usize; - let size = u16::from_be_bytes(cell[22..24].try_into().unwrap()) as usize; + let cell_offset = INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * index; + let cell = &buff[cell_offset..cell_offset + INTERIOR_CELL_SIZE]; + let offset = u16::from_be_bytes(cell[INTERIOR_CELL_OFFSET_RANGE].try_into().unwrap()) as usize; + let size = u16::from_be_bytes(cell[INTERIOR_CELL_SIZE_RANGE].try_into().unwrap()) as usize; let offset = offset - PAGE_HEADER_SIZE; let raw = &buff[offset..offset + size]; InteriorCell { cell, raw } @@ -1355,12 +1450,12 @@ fn get_interior_cell(buff: &[u8], index: usize) -> InteriorCell<'_> { fn interior_might_split(page_size: usize, remaining: usize) -> bool { let payload_size = page_size - PAGE_HEADER_SIZE - INTERIOR_PAGE_HEADER_SIZE - PAGE_FOOTER_SIZE; - let max_before_overflow = payload_size / 4 - INTERIOR_PAGE_CELL_SIZE; + let max_before_overflow = payload_size / 4 - INTERIOR_CELL_SIZE; let min_content_not_overflow = max_before_overflow / 2; - let remaining = if remaining < INTERIOR_PAGE_CELL_SIZE { + let remaining = if remaining < INTERIOR_CELL_SIZE { 0 } else { - remaining - INTERIOR_PAGE_CELL_SIZE + remaining - INTERIOR_CELL_SIZE }; remaining < min_content_not_overflow } @@ -1382,17 +1477,17 @@ impl<'a> BTreeCell<'a> for InteriorCell<'a> { } fn key_size(&self) -> usize { - u32::from_be_bytes(self.cell[16..20].try_into().unwrap()) as usize + u32::from_be_bytes(self.cell[INTERIOR_CELL_KEY_SIZE_RANGE].try_into().unwrap()) as usize } fn overflow(&self) -> Option { - PageId::from_be_bytes(self.cell[8..16].try_into().unwrap()) + PageId::from_be_bytes(self.cell[INTERIOR_CELL_OVERFLOW_RANGE].try_into().unwrap()) } } impl InteriorCell<'_> { pub(crate) fn ptr(&self) -> PageId { - PageId::from_be_bytes(self.cell[0..8].try_into().unwrap()).unwrap() + PageId::from_be_bytes(self.cell[INTERIOR_CELL_PTR_RANGE].try_into().unwrap()).unwrap() } } @@ -1500,9 +1595,8 @@ impl<'a> InteriorPageWrite<'a> { // TODO: refactor this. There are a multiple places where this logic is written. // check the `get_interior_cell` implementation. let pgid = self.id(); - let cell_offset = - PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_PAGE_CELL_SIZE * index; - let cell = &mut self.0.buffer[cell_offset..cell_offset + INTERIOR_PAGE_CELL_SIZE]; + let cell_offset = PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * index; + let cell = &mut self.0.buffer[cell_offset..cell_offset + INTERIOR_CELL_SIZE]; let old_ptr = PageId::from_be_bytes(cell[0..8].try_into().unwrap()).unwrap(); if old_ptr == ptr { return Ok(()); @@ -1539,10 +1633,10 @@ impl<'a> InteriorPageWrite<'a> { // TODO: refactor this. There are a multiple places where this logic is written. // check the `get_interior_cell` implementation. let pgid = self.id(); - let cell_offset = - PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_PAGE_CELL_SIZE * index; - let cell = &mut self.0.buffer[cell_offset..cell_offset + INTERIOR_PAGE_CELL_SIZE]; - let old_overflow = PageId::from_be_bytes(cell[8..16].try_into().unwrap()); + let cell_offset = PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * index; + let cell = &mut self.0.buffer[cell_offset..cell_offset + INTERIOR_CELL_SIZE]; + let old_overflow = + PageId::from_be_bytes(cell[INTERIOR_CELL_OVERFLOW_RANGE].try_into().unwrap()); if old_overflow == overflow_pgid { return Ok(()); } @@ -1580,7 +1674,7 @@ impl<'a> InteriorPageWrite<'a> { }; let raw = cell.raw(); assert!( - raw.len() + INTERIOR_PAGE_CELL_SIZE <= remaining, + raw.len() + INTERIOR_CELL_SIZE <= remaining, "insert cell only called in the context of moving a splitted page to a new page, so it should always fit", ); @@ -1634,15 +1728,15 @@ impl<'a> InteriorPageWrite<'a> { let total_size = content.remaining(); let payload_size = self.0.buffer.len() - PAGE_HEADER_SIZE - INTERIOR_PAGE_HEADER_SIZE - PAGE_FOOTER_SIZE; - let max_before_overflow = payload_size / 4 - INTERIOR_PAGE_CELL_SIZE; + let max_before_overflow = payload_size / 4 - INTERIOR_CELL_SIZE; let min_content_not_overflow = max_before_overflow / 2; let PageKind::Interior { remaining, .. } = self.0.meta.kind else { unreachable!(); }; - if remaining < INTERIOR_PAGE_CELL_SIZE { + if remaining < INTERIOR_CELL_SIZE { return Ok(false); } - let remaining = remaining - INTERIOR_PAGE_CELL_SIZE; + let remaining = remaining - INTERIOR_CELL_SIZE; if remaining < min_content_not_overflow && remaining < total_size { return Ok(false); } @@ -1693,9 +1787,9 @@ impl<'a> InteriorPageWrite<'a> { let PageKind::Interior { offset, count, .. } = self.0.meta.kind else { unreachable!(); }; - let added = INTERIOR_PAGE_CELL_SIZE + raw_size; + let added = INTERIOR_CELL_SIZE + raw_size; let current_cell_size = - PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_PAGE_CELL_SIZE * count; + PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * count; if current_cell_size + added > offset { self.rearrange(); } @@ -1712,26 +1806,25 @@ impl<'a> InteriorPageWrite<'a> { let shifted = *count - index; for i in 0..shifted { - let x = PAGE_HEADER_SIZE - + INTERIOR_PAGE_HEADER_SIZE - + INTERIOR_PAGE_CELL_SIZE * (*count - i); + let x = + PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * (*count - i); let (a, b) = self.0.buffer.split_at_mut(x); - b[..INTERIOR_PAGE_CELL_SIZE].copy_from_slice(&a[a.len() - INTERIOR_PAGE_CELL_SIZE..]); + b[..INTERIOR_CELL_SIZE].copy_from_slice(&a[a.len() - INTERIOR_CELL_SIZE..]); } - let cell_offset = - PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_PAGE_CELL_SIZE * index; - let cell = &mut self.0.buffer[cell_offset..cell_offset + INTERIOR_PAGE_CELL_SIZE]; + let cell_offset = PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * index; + let cell = &mut self.0.buffer[cell_offset..cell_offset + INTERIOR_CELL_SIZE]; *offset -= raw_size; *remaining -= added; *count += 1; - cell[0..8].copy_from_slice(&ptr.0.get().to_be_bytes()); - cell[8..16].copy_from_slice(&overflow.map(|p| p.0.get()).unwrap_or(0).to_be_bytes()); - cell[16..20].copy_from_slice(&(key_size as u32).to_be_bytes()); - cell[20..22].copy_from_slice(&(*offset as u16).to_be_bytes()); - cell[22..24].copy_from_slice(&(raw_size as u16).to_be_bytes()); + cell[INTERIOR_CELL_PTR_RANGE].copy_from_slice(&ptr.0.get().to_be_bytes()); + cell[INTERIOR_CELL_OVERFLOW_RANGE] + .copy_from_slice(&overflow.map(|p| p.0.get()).unwrap_or(0).to_be_bytes()); + cell[INTERIOR_CELL_KEY_SIZE_RANGE].copy_from_slice(&(key_size as u32).to_be_bytes()); + cell[INTERIOR_CELL_OFFSET_RANGE].copy_from_slice(&(*offset as u16).to_be_bytes()); + cell[INTERIOR_CELL_SIZE_RANGE].copy_from_slice(&(raw_size as u16).to_be_bytes()); *offset } @@ -1751,10 +1844,9 @@ impl<'a> InteriorPageWrite<'a> { self.0.buffer[new_offset..new_offset + copied_content.len()] .copy_from_slice(copied_content); - let cell_offset = - PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_PAGE_CELL_SIZE * i; - let cell = &mut self.0.buffer[cell_offset..cell_offset + INTERIOR_PAGE_CELL_SIZE]; - cell[20..22].copy_from_slice(&(new_offset as u16).to_be_bytes()); + let cell_offset = PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * i; + let cell = &mut self.0.buffer[cell_offset..cell_offset + INTERIOR_CELL_SIZE]; + cell[INTERIOR_CELL_OFFSET_RANGE].copy_from_slice(&(new_offset as u16).to_be_bytes()); } let PageKind::Interior { ref mut offset, .. } = self.0.meta.kind else { @@ -1779,7 +1871,7 @@ impl<'a> InteriorPageWrite<'a> { for i in 0..count { let cell = get_interior_cell(&self.0.buffer[PAGE_HEADER_SIZE..], i); - cummulative_size += cell.raw.len() + INTERIOR_PAGE_CELL_SIZE; + cummulative_size += cell.raw.len() + INTERIOR_CELL_SIZE; if cummulative_size >= half_payload { n_cells_to_keep = i; break; @@ -1835,8 +1927,10 @@ impl<'a> InteriorPageWrite<'a> { &self.0.buffer[PAGE_HEADER_SIZE..self.0.buffer.len() - PAGE_FOOTER_SIZE], index, ); - let content_offset = u16::from_be_bytes(cell.cell[20..22].try_into().unwrap()) as usize; - let content_size = u16::from_be_bytes(cell.cell[22..24].try_into().unwrap()) as usize; + let content_offset = + u16::from_be_bytes(cell.cell[INTERIOR_CELL_OFFSET_RANGE].try_into().unwrap()) as usize; + let content_size = + u16::from_be_bytes(cell.cell[INTERIOR_CELL_SIZE_RANGE].try_into().unwrap()) as usize; let pgid = self.id(); record_mutation( @@ -1866,14 +1960,14 @@ impl<'a> InteriorPageWrite<'a> { if *offset == content_offset { *offset += content_size; } - *remaining += INTERIOR_PAGE_CELL_SIZE + content_size; + *remaining += INTERIOR_CELL_SIZE + content_size; *count -= 1; for i in index..*count { - let x = PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_PAGE_CELL_SIZE * i; - let (a, b) = self.0.buffer.split_at_mut(x + INTERIOR_PAGE_CELL_SIZE); + let x = PAGE_HEADER_SIZE + INTERIOR_PAGE_HEADER_SIZE + INTERIOR_CELL_SIZE * i; + let (a, b) = self.0.buffer.split_at_mut(x + INTERIOR_CELL_SIZE); let a_len = a.len(); - a[a_len - INTERIOR_PAGE_CELL_SIZE..].copy_from_slice(&b[..INTERIOR_PAGE_CELL_SIZE]); + a[a_len - INTERIOR_CELL_SIZE..].copy_from_slice(&b[..INTERIOR_CELL_SIZE]); } Ok(()) @@ -2010,8 +2104,10 @@ impl<'a> LeafPageWrite<'a> { pub(crate) fn delete(&mut self, ctx: LogContext<'_>, index: usize) -> anyhow::Result<()> { let pgid = self.0.meta.id; let cell = get_leaf_cell(self.0.buffer, index); - let content_offset = u16::from_be_bytes(cell.cell[16..18].try_into().unwrap()) as usize; - let content_size = u16::from_be_bytes(cell.cell[18..20].try_into().unwrap()) as usize; + let content_offset = + u16::from_be_bytes(cell.cell[LEAF_CELL_OFFSET_RANGE].try_into().unwrap()) as usize; + let content_size = + u16::from_be_bytes(cell.cell[LEAF_CELL_SIZE_RANGE].try_into().unwrap()) as usize; record_mutation( self.0.txid, @@ -2041,14 +2137,14 @@ impl<'a> LeafPageWrite<'a> { if *offset == content_offset { *offset += content_size; } - *remaining += LEAF_PAGE_CELL_SIZE + content_size; + *remaining += LEAF_CELL_SIZE + content_size; *count -= 1; for i in index..*count { - let x = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_PAGE_CELL_SIZE * i; - let (a, b) = self.0.buffer.split_at_mut(x + LEAF_PAGE_CELL_SIZE); + let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * i; + let (a, b) = self.0.buffer.split_at_mut(cell_offset + LEAF_CELL_SIZE); let a_len = a.len(); - a[a_len - LEAF_PAGE_CELL_SIZE..].copy_from_slice(&b[..LEAF_PAGE_CELL_SIZE]); + a[a_len - LEAF_CELL_SIZE..].copy_from_slice(&b[..LEAF_CELL_SIZE]); } Ok(()) @@ -2098,8 +2194,8 @@ impl<'a> LeafPageWrite<'a> { // TODO: refactor this. There are a multiple places where this logic is written. // check the `get_leaf_cell` implementation. let pgid = self.id(); - let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_PAGE_CELL_SIZE * index; - let cell = &mut self.0.buffer[cell_offset..cell_offset + LEAF_PAGE_CELL_SIZE]; + let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * index; + let cell = &mut self.0.buffer[cell_offset..cell_offset + LEAF_CELL_SIZE]; let old_overflow = PageId::from_be_bytes(cell[0..8].try_into().unwrap()); if old_overflow == overflow_pgid { return Ok(()); @@ -2138,7 +2234,7 @@ impl<'a> LeafPageWrite<'a> { }; let raw = cell.raw(); assert!( - raw.len() + LEAF_PAGE_CELL_SIZE <= remaining, + raw.len() + LEAF_CELL_SIZE <= remaining, "insert cell only called in the context of moving a splitted page to a new page, so it should always fit", ); @@ -2192,15 +2288,15 @@ impl<'a> LeafPageWrite<'a> { let content_size = content.remaining(); let payload_size = self.0.buffer.len() - PAGE_HEADER_SIZE - LEAF_PAGE_HEADER_SIZE - PAGE_FOOTER_SIZE; - let max_before_overflow = payload_size / 4 - LEAF_PAGE_CELL_SIZE; + let max_before_overflow = payload_size / 4 - LEAF_CELL_SIZE; let min_content_not_overflow = max_before_overflow / 2; let PageKind::Leaf { remaining, .. } = self.0.meta.kind else { unreachable!(); }; - if remaining < LEAF_PAGE_CELL_SIZE { + if remaining < LEAF_CELL_SIZE { return Ok(false); } - let remaining = remaining - LEAF_PAGE_CELL_SIZE; + let remaining = remaining - LEAF_CELL_SIZE; if remaining < min_content_not_overflow && remaining < content_size { return Ok(false); } @@ -2239,12 +2335,11 @@ impl<'a> LeafPageWrite<'a> { } fn reserve_cell(&mut self, raw_size: usize) -> usize { - let added = LEAF_PAGE_CELL_SIZE + raw_size; + let added = LEAF_CELL_SIZE + raw_size; let PageKind::Leaf { offset, count, .. } = self.0.meta.kind else { unreachable!(); }; - let current_cell_size = - PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_PAGE_CELL_SIZE * count; + let current_cell_size = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * count; if current_cell_size + added > offset { self.rearrange(); } @@ -2266,7 +2361,7 @@ impl<'a> LeafPageWrite<'a> { val_size: usize, raw_size: usize, ) { - let added = LEAF_PAGE_CELL_SIZE + raw_size; + let added = LEAF_CELL_SIZE + raw_size; let PageKind::Leaf { ref mut offset, ref mut count, @@ -2276,28 +2371,29 @@ impl<'a> LeafPageWrite<'a> { else { unreachable!(); }; - let current_cell_size = - PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_PAGE_CELL_SIZE * *count; + let current_cell_size = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * *count; assert!(current_cell_size + added <= *offset); let shifted = *count - index; for i in 0..shifted { - let x = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_PAGE_CELL_SIZE * (*count - i); - let (a, b) = self.0.buffer.split_at_mut(x); - b[..LEAF_PAGE_CELL_SIZE].copy_from_slice(&a[a.len() - LEAF_PAGE_CELL_SIZE..]); + let cell_offset = + PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * (*count - i); + let (a, b) = self.0.buffer.split_at_mut(cell_offset); + b[..LEAF_CELL_SIZE].copy_from_slice(&a[a.len() - LEAF_CELL_SIZE..]); } - let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_PAGE_CELL_SIZE * index; - let cell = &mut self.0.buffer[cell_offset..cell_offset + LEAF_PAGE_CELL_SIZE]; + let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * index; + let cell = &mut self.0.buffer[cell_offset..cell_offset + LEAF_CELL_SIZE]; *offset -= raw_size; *remaining -= added; *count += 1; - cell[0..8].copy_from_slice(&overflow.map(|p| p.0.get()).unwrap_or(0).to_be_bytes()); - cell[8..12].copy_from_slice(&(key_size as u32).to_be_bytes()); - cell[12..16].copy_from_slice(&(val_size as u32).to_be_bytes()); - cell[16..18].copy_from_slice(&(*offset as u16).to_be_bytes()); - cell[18..20].copy_from_slice(&(raw_size as u16).to_be_bytes()); + cell[LEAF_CELL_OVERFLOW_RANGE] + .copy_from_slice(&overflow.map(|p| p.0.get()).unwrap_or(0).to_be_bytes()); + cell[LEAF_CELL_KEY_SIZE_RANGE].copy_from_slice(&(key_size as u32).to_be_bytes()); + cell[LEAF_CELL_VAL_SIZE_RANGE].copy_from_slice(&(val_size as u32).to_be_bytes()); + cell[LEAF_CELL_OFFSET_RANGE].copy_from_slice(&(*offset as u16).to_be_bytes()); + cell[LEAF_CELL_SIZE_RANGE].copy_from_slice(&(raw_size as u16).to_be_bytes()); } fn rearrange(&mut self) { @@ -2312,9 +2408,9 @@ impl<'a> LeafPageWrite<'a> { self.0.buffer[new_offset..new_offset + copied_content.len()] .copy_from_slice(copied_content); - let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_PAGE_CELL_SIZE * i; - let cell = &mut self.0.buffer[cell_offset..cell_offset + LEAF_PAGE_CELL_SIZE]; - cell[16..18].copy_from_slice(&(new_offset as u16).to_be_bytes()); + let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * i; + let cell = &mut self.0.buffer[cell_offset..cell_offset + LEAF_CELL_SIZE]; + cell[LEAF_CELL_OFFSET_RANGE].copy_from_slice(&(new_offset as u16).to_be_bytes()); } let PageKind::Leaf { ref mut offset, .. } = self.0.meta.kind else { @@ -2337,7 +2433,7 @@ impl<'a> LeafPageWrite<'a> { for i in 0..count { let cell = get_leaf_cell(self.0.buffer, i); - cummulative_size += cell.raw.len() + LEAF_PAGE_CELL_SIZE; + cummulative_size += cell.raw.len() + LEAF_CELL_SIZE; if cummulative_size >= half_payload { n_cells_to_keep = i; break; @@ -2390,10 +2486,10 @@ impl<'a> LeafPageWrite<'a> { } fn get_leaf_cell(buff: &[u8], index: usize) -> LeafCell<'_> { - let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_PAGE_CELL_SIZE * index; - let cell = &buff[cell_offset..cell_offset + LEAF_PAGE_CELL_SIZE]; - let offset = u16::from_be_bytes(cell[16..18].try_into().unwrap()) as usize; - let size = u16::from_be_bytes(cell[18..20].try_into().unwrap()) as usize; + let cell_offset = PAGE_HEADER_SIZE + LEAF_PAGE_HEADER_SIZE + LEAF_CELL_SIZE * index; + let cell = &buff[cell_offset..cell_offset + LEAF_CELL_SIZE]; + let offset = u16::from_be_bytes(cell[LEAF_CELL_OFFSET_RANGE].try_into().unwrap()) as usize; + let size = u16::from_be_bytes(cell[LEAF_CELL_SIZE_RANGE].try_into().unwrap()) as usize; let raw = &buff[offset..offset + size]; LeafCell { cell, raw } } @@ -2411,17 +2507,17 @@ impl<'a> BTreeCell<'a> for LeafCell<'a> { } fn key_size(&self) -> usize { - u32::from_be_bytes(self.cell[8..12].try_into().unwrap()) as usize + u32::from_be_bytes(self.cell[LEAF_CELL_KEY_SIZE_RANGE].try_into().unwrap()) as usize } fn overflow(&self) -> Option { - PageId::from_be_bytes(self.cell[0..8].try_into().unwrap()) + PageId::from_be_bytes(self.cell[LEAF_CELL_OVERFLOW_RANGE].try_into().unwrap()) } } impl<'a> LeafCell<'a> { pub(crate) fn val_size(&self) -> usize { - u32::from_be_bytes(self.cell[12..16].try_into().unwrap()) as usize + u32::from_be_bytes(self.cell[LEAF_CELL_VAL_SIZE_RANGE].try_into().unwrap()) as usize } }