From d599e9a9ee69a626330bd52d0bba9d7dab39ea30 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 13 Mar 2024 18:04:01 +0100 Subject: [PATCH] add crc value, blob cache & index writer trait --- Cargo.toml | 11 ++++++ src/blob_cache.rs | 69 +++++++++++++++++++++++++++++++++++++ src/config.rs | 15 ++++++++ src/error.rs | 3 ++ src/handle.rs | 4 ++- src/index.rs | 24 +++++++++++++ src/lib.rs | 3 +- src/main.rs | 58 ++++++++++++++++--------------- src/segment/mod.rs | 4 +-- src/segment/multi_writer.rs | 15 ++++++-- src/segment/reader.rs | 12 ++++++- src/segment/writer.rs | 16 +++++++++ src/value_log.rs | 36 +++++++++++++++---- tests/basic_gc.rs | 17 +++++---- tests/basic_kv.rs | 8 +---- 15 files changed, 238 insertions(+), 57 deletions(-) create mode 100644 src/blob_cache.rs diff --git a/Cargo.toml b/Cargo.toml index a27aa57..3bc0de7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,12 +18,23 @@ categories = ["data-structures", "database-implementations", "algorithms"] name = "value_log" path = "src/lib.rs" +[features] +default = [] +serde = ["dep:serde"] + [dependencies] byteorder = "1.5.0" chrono = "0.4.34" +crc32fast = "1.4.0" log = "0.4.20" min-max-heap = "1.3.0" +quick_cache = "0.4.1" rand = "0.8.5" +serde = { version = "1.0.197", default-features = false, features = [ + "alloc", + "rc", + "derive", +], optional = true } [dev-dependencies] env_logger = "0.11.2" diff --git a/src/blob_cache.rs b/src/blob_cache.rs new file mode 100644 index 0000000..02974e6 --- /dev/null +++ b/src/blob_cache.rs @@ -0,0 +1,69 @@ +use crate::ValueHandle; +use quick_cache::{sync::Cache, Weighter}; +use std::sync::Arc; + +type CacheKey = ValueHandle; +type Item = Arc<[u8]>; + +#[derive(Clone)] +struct BlobWeighter; + +impl Weighter for BlobWeighter { + fn weight(&self, _: &CacheKey, blob: &Item) -> u32 { + // TODO: quick_cache only supports u32 as weight...? + blob.len() as u32 + } +} + +/// Blob cache, in which blobs are cached in-memory +/// after being retrieved from disk +/// +/// This speeds up consecutive accesses to the same blobs, improving +/// read performance for hot data. +pub struct BlobCache { + data: Cache, + capacity: u64, +} + +impl std::fmt::Debug for BlobCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BlobCache", self.capacity) + } +} + +impl BlobCache { + /// Creates a new block cache with roughly `n` bytes of capacity + #[must_use] + pub fn with_capacity_bytes(bytes: u64) -> Self { + Self { + data: Cache::with_weighter(10_000, bytes, BlobWeighter), + capacity: bytes, + } + } + + pub fn insert(&self, handle: CacheKey, value: Arc<[u8]>) { + self.data.insert(handle, value); + } + + pub fn get(&self, handle: &CacheKey) -> Option { + self.data.get(handle) + } + + /// Returns the cache capacity in bytes + #[must_use] + pub fn capacity(&self) -> u64 { + self.capacity + } + + /// Returns the number of cached blocks + #[must_use] + pub fn len(&self) -> usize { + self.data.len() + } + + /// Returns `true` if there are no cached blocks + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} diff --git a/src/config.rs b/src/config.rs index 4fc742c..41d811e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,18 +1,33 @@ +use crate::blob_cache::BlobCache; +use std::sync::Arc; + /// Value log configuration #[derive(Debug)] pub struct Config { pub(crate) segment_size_bytes: u64, + pub(crate) blob_cache: Arc, } impl Default for Config { fn default() -> Self { Self { segment_size_bytes: 256 * 1_024 * 1_024, + blob_cache: Arc::new(BlobCache::with_capacity_bytes(16 * 1_024 * 1_024)), } } } impl Config { + /// Sets the blob cache. + /// + /// Defaults to a blob cache 16 MiB of capacity shared + /// between all partitions inside this keyspace. + #[must_use] + pub fn blob_cache(mut self, blob_cache: Arc) -> Self { + self.blob_cache = blob_cache; + self + } + /// Sets the maximum size of value log segments. /// /// This heavily influences space amplification, as diff --git a/src/error.rs b/src/error.rs index e331a99..a2664b4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,6 +8,9 @@ pub enum Error { /// Invalid data format version InvalidVersion(Option), + + /// CRC check failed + CrcMismatch, } impl std::fmt::Display for Error { diff --git a/src/handle.rs b/src/handle.rs index fe4d7b5..b9f7411 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -1,8 +1,10 @@ +use std::hash::Hash; use std::sync::Arc; /// A value handle points into the value log. #[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub struct ValueHandle { /// Segment ID pub segment_id: Arc, diff --git a/src/index.rs b/src/index.rs index d9d3f0b..791150f 100644 --- a/src/index.rs +++ b/src/index.rs @@ -14,6 +14,9 @@ pub trait Index { /// Will return `Err` if an IO error occurs. fn get(&self, key: &[u8]) -> std::io::Result>; + // TODO: shouldn'be part of Index... remove + // TODO: flushing to value log should use `Writer` (atomic) + /// Inserts an value handle into the index. /// /// This method is called during value log garbage collection. @@ -23,3 +26,24 @@ pub trait Index { /// Will return `Err` if an IO error occurs. fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()>; } + +/// Trait that allows writing into an index +pub trait Writer { + /// Inserts an value handle into the index. + /// + /// This method is called during value log garbage collection. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()>; + + /// Finishes the write batch. + /// + /// This operation should be atomic. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + fn finish(&self) -> std::io::Result<()>; +} diff --git a/src/lib.rs b/src/lib.rs index e2713a3..93ffa91 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ #![warn(clippy::expect_used)] #![allow(clippy::missing_const_for_fn)] +mod blob_cache; mod config; mod error; mod handle; @@ -51,7 +52,7 @@ pub use { config::Config, error::{Error, Result}, handle::ValueHandle, - index::Index, + index::{Index, Writer as IndexWriter}, segment::multi_writer::MultiWriter as SegmentWriter, segment::reader::Reader as SegmentReader, segment::Segment, diff --git a/src/main.rs b/src/main.rs index b8f183a..6671fca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ use std::{ path::Path, sync::{Arc, RwLock}, }; -use value_log::{Config, Index, SegmentReader, ValueHandle, ValueLog}; +use value_log::{Config, Index, IndexWriter, SegmentReader, ValueHandle, ValueLog}; #[derive(Default)] pub struct DebugIndex(RwLock, ValueHandle>>); @@ -30,6 +30,18 @@ impl DebugIndex { } } +struct DebugIndexWriter(Arc); + +impl IndexWriter for DebugIndexWriter { + fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { + self.0.insert_indirection(key, value) + } + + fn finish(&self) -> std::io::Result<()> { + Ok(()) + } +} + fn main() -> value_log::Result<()> { let index = DebugIndex(RwLock::new(BTreeMap::, ValueHandle>::default())); let index = Arc::new(index); @@ -41,11 +53,7 @@ fn main() -> value_log::Result<()> { } std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::new( - vl_path, - Config::default().segment_size_bytes(100), - index.clone(), - )?; + let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?; { let mut writer = value_log.get_writer()?; @@ -68,7 +76,7 @@ fn main() -> value_log::Result<()> { value_log.register(writer)?; } - { + /* { let mut writer = value_log.get_writer()?; let segment_id = writer.segment_id(); @@ -131,7 +139,7 @@ fn main() -> value_log::Result<()> { value_log.register(writer)?; } - eprintln!("{:#?}", value_log.segments); + eprintln!("{:#?}", value_log.segments.read().unwrap()); for (key, handle) in index.0.read().expect("lock is poisoned").iter() { eprintln!( @@ -148,29 +156,12 @@ fn main() -> value_log::Result<()> { ); } - /* eprintln!( - "multi: {:#?}", - value_log - .get_multiple( - &index - .0 - .read() - .expect("lock is poisoned") - .values() - .cloned() - .collect::>() - )? - .into_iter() - .filter(Option::is_some) - .count() - ); */ - index.remove("d".as_bytes())?; for segment_id in value_log.list_segments() { // Scan segment let reader = SegmentReader::new( - vl_path.join("segments").join(&*segment_id), + vl_path.join("segments").join(&*segment_id).join("data"), segment_id.clone(), )?; @@ -211,9 +202,20 @@ fn main() -> value_log::Result<()> { } eprintln!("=== rollover ==="); - value_log.rollover(&value_log.list_segments())?; + value_log.rollover(&value_log.list_segments(), DebugIndexWriter(index.clone()))?; */ - eprintln!("{:#?}", value_log.segments); + eprintln!("{:#?}", value_log.segments.read().unwrap()); + + for _ in 0..10 { + let value_handle = ValueHandle { + segment_id: value_log.list_segments().first().unwrap().clone(), + offset: 3, + }; + + let before = std::time::Instant::now(); + value_log.get(&value_handle)?; + eprintln!("blob loaded in {:?}ns", before.elapsed().as_nanos()); + } Ok(()) } diff --git a/src/segment/mod.rs b/src/segment/mod.rs index f6886a2..3381e3c 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -8,14 +8,12 @@ pub mod multi_writer; pub mod reader; pub mod writer; -/* TODO: per blob CRC value */ - /// A disk segment is an immutable, sorted, contiguous file /// that contains key-value pairs. /// /// ### File format /// -/// KV: \ \ \ \ +/// KV: \ \ \ \ \ /// /// Segment: { KV } + #[derive(Debug)] diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index 1d35147..3ecdd8f 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -22,7 +22,9 @@ impl MultiWriter { pub fn new>(target_size: u64, folder: P) -> std::io::Result { let folder = folder.as_ref(); let segment_id = generate_segment_id(); - let path = folder.join("segments").join(&*segment_id); + + let segment_folder = folder.join("segments").join(&*segment_id); + let path = segment_folder.join("data"); Ok(Self { folder: folder.into(), @@ -61,7 +63,11 @@ impl MultiWriter { log::debug!("Rotating segment writer"); let new_segment_id = generate_segment_id(); - let path = self.folder.join("segments").join(&*new_segment_id); + let path = self + .folder + .join("segments") + .join(&*new_segment_id) + .join("data"); self.writers.push(Writer::new(new_segment_id, path)?); Ok(()) @@ -73,10 +79,13 @@ impl MultiWriter { /// /// Will return `Err` if an IO error occurs. pub fn write(&mut self, key: &[u8], value: &[u8]) -> crate::Result<()> { + let target_size = self.target_size; + let writer = self.get_active_writer_mut(); writer.write(key, value)?; - if writer.offset() >= self.target_size { + if writer.offset() >= target_size { + writer.flush()?; self.rotate()?; } diff --git a/src/segment/reader.rs b/src/segment/reader.rs index 3b218c1..a102285 100644 --- a/src/segment/reader.rs +++ b/src/segment/reader.rs @@ -9,7 +9,6 @@ use std::{ /// Reads through a segment in order. pub struct Reader { pub(crate) segment_id: Arc, - inner: BufReader, } @@ -49,6 +48,17 @@ impl Iterator for Reader { return Some(Err(e)); }; + // TODO: handle crc + let _crc = match self.inner.read_u32::() { + Ok(v) => v, + Err(e) => { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + return None; + } + return Some(Err(e)); + } + }; + let val_len = match self.inner.read_u32::() { Ok(v) => v, Err(e) => { diff --git a/src/segment/writer.rs b/src/segment/writer.rs index d4af150..985a15f 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -26,6 +26,9 @@ impl Writer { #[doc(hidden)] pub fn new>(segment_id: Arc, path: P) -> std::io::Result { let path = path.as_ref(); + let folder = path.parent().expect("should have parent directory"); + + std::fs::create_dir_all(folder)?; let file = File::create(path)?; Ok(Self { @@ -78,13 +81,26 @@ impl Writer { /// /// Will return `Err` if an IO error occurs. pub fn write(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> { + let mut hasher = crc32fast::Hasher::new(); + hasher.update(value); + let crc = hasher.finalize(); + self.inner.write_u16::(key.len() as u16)?; self.inner.write_all(key)?; + self.inner.write_u32::(crc)?; self.inner.write_u32::(value.len() as u32)?; self.inner.write_all(value)?; + // Key self.offset += std::mem::size_of::() as u64; self.offset += key.len() as u64; + + // CRC + self.offset += std::mem::size_of::() as u64; + + // TODO: compress + + // Value self.offset += std::mem::size_of::() as u64; self.offset += value.len() as u64; diff --git a/src/value_log.rs b/src/value_log.rs index 5763c39..4b6e048 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -1,4 +1,6 @@ use crate::{ + blob_cache::BlobCache, + index::Writer as IndexWriter, segment::{merge::MergeReader, multi_writer::MultiWriter}, version::Version, Config, Index, Segment, SegmentReader, SegmentWriter, ValueHandle, @@ -12,8 +14,6 @@ use std::{ sync::{atomic::AtomicU64, Arc, Mutex, RwLock}, }; -/// TODO: blob cache - /// A disk-resident value log. #[derive(Clone)] pub struct ValueLog(Arc); @@ -35,6 +35,9 @@ pub struct ValueLogInner { /// External index pub index: Arc, + /// In-memory blob cache + blob_cache: Arc, + /// Segment manifest pub segments: RwLock, Arc>>, @@ -86,9 +89,12 @@ impl ValueLog { folder.sync_all()?; } + let blob_cache = config.blob_cache.clone(); + Ok(Self(Arc::new(ValueLogInner { config, path, // TODO: absolute path + blob_cache, index, segments: RwLock::new(BTreeMap::default()), semaphore: Mutex::new(()), @@ -144,15 +150,27 @@ impl ValueLog { return Ok(None); }; + if let Some(value) = self.blob_cache.get(handle) { + return Ok(Some(value)); + } + let mut reader = BufReader::new(File::open(&segment.path)?); reader.seek(std::io::SeekFrom::Start(handle.offset))?; + // TODO: handle CRC + let _crc = reader.read_u32::()?; + let val_len = reader.read_u32::()?; let mut val = vec![0; val_len as usize]; reader.read_exact(&mut val)?; + let val: Arc<[u8]> = val.into(); + + // TODO: decompress + + self.blob_cache.insert(handle.clone(), val.clone()); - Ok(Some(val.into())) + Ok(Some(val)) } /* pub fn get_multiple( @@ -217,7 +235,11 @@ impl ValueLog { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn rollover(&self, ids: &[Arc]) -> crate::Result<()> { + pub fn rollover( + &self, + ids: &[Arc], + index_writer: W, + ) -> crate::Result<()> { // IMPORTANT: Only allow 1 rollover at any given time let _guard = self.semaphore.lock().expect("lock is poisoned"); @@ -252,16 +274,16 @@ impl ValueLog { String::from_utf8_lossy(&k) ); - self.index - .insert_indirection(&k, ValueHandle { segment_id, offset })?; + index_writer.insert_indirection(&k, ValueHandle { segment_id, offset })?; writer.write(&k, &v)?; } self.register(writer)?; + index_writer.finish()?; let mut lock = self.segments.write().expect("lock is poisoned"); for id in ids { - std::fs::remove_file(self.path.join("segments").join(&**id))?; + std::fs::remove_dir_all(self.path.join("segments").join(&**id))?; lock.remove(id); } } diff --git a/tests/basic_gc.rs b/tests/basic_gc.rs index 006d3be..f4c9e9b 100644 --- a/tests/basic_gc.rs +++ b/tests/basic_gc.rs @@ -2,7 +2,7 @@ use std::{ collections::BTreeMap, sync::{Arc, RwLock}, }; -use value_log::{Config, Index, ValueHandle, ValueLog}; +use value_log::{Config, Index, IndexWriter, ValueHandle, ValueLog}; type Inner = RwLock, ValueHandle>>; @@ -30,11 +30,16 @@ impl Index for DebugIndex { } } -impl DebugIndex { - /* fn remove(&self, key: &[u8]) -> std::io::Result<()> { - self.write().expect("lock is poisoned").remove(key); +struct DebugIndexWriter(Arc); + +impl IndexWriter for DebugIndexWriter { + fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { + self.0.insert_indirection(key, value) + } + + fn finish(&self) -> std::io::Result<()> { Ok(()) - } */ + } } #[test] @@ -134,7 +139,7 @@ fn basic_kv() -> value_log::Result<()> { assert_eq!(item, key.repeat(1_000).into()); } - value_log.rollover(&value_log.list_segments())?; + value_log.rollover(&value_log.list_segments(), DebugIndexWriter(index.clone()))?; { let lock = value_log.segments.read().unwrap(); diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index da7c197..0266d02 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -26,15 +26,9 @@ impl Index for DebugIndex { self.write() .expect("lock is poisoned") .insert(key.into(), value); - Ok(()) - } -} -impl DebugIndex { - /* fn remove(&self, key: &[u8]) -> std::io::Result<()> { - self.write().expect("lock is poisoned").remove(key); Ok(()) - } */ + } } #[test]