add crc value, blob cache & index writer trait
marvin-j97 committed Mar 13, 2024
1 parent 246d941 commit d599e9a
Showing 15 changed files with 238 additions and 57 deletions.
11 changes: 11 additions & 0 deletions Cargo.toml
@@ -18,12 +18,23 @@ categories = ["data-structures", "database-implementations", "algorithms"]
name = "value_log"
path = "src/lib.rs"

[features]
default = []
serde = ["dep:serde"]

[dependencies]
byteorder = "1.5.0"
chrono = "0.4.34"
crc32fast = "1.4.0"
log = "0.4.20"
min-max-heap = "1.3.0"
quick_cache = "0.4.1"
rand = "0.8.5"
serde = { version = "1.0.197", default-features = false, features = [
    "alloc",
    "rc",
    "derive",
], optional = true }

[dev-dependencies]
env_logger = "0.11.2"
69 changes: 69 additions & 0 deletions src/blob_cache.rs
@@ -0,0 +1,69 @@
use crate::ValueHandle;
use quick_cache::{sync::Cache, Weighter};
use std::sync::Arc;

type CacheKey = ValueHandle;
type Item = Arc<[u8]>;

#[derive(Clone)]
struct BlobWeighter;

impl Weighter<CacheKey, Item> for BlobWeighter {
    fn weight(&self, _: &CacheKey, blob: &Item) -> u32 {
        // TODO: quick_cache only supports u32 as weight...?
        blob.len() as u32
    }
}

/// Blob cache, in which blobs are cached in-memory
/// after being retrieved from disk.
///
/// This speeds up consecutive accesses to the same blobs, improving
/// read performance for hot data.
pub struct BlobCache {
    data: Cache<CacheKey, Item, BlobWeighter>,
    capacity: u64,
}

impl std::fmt::Debug for BlobCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BlobCache<cap: {} bytes>", self.capacity)
    }
}

impl BlobCache {
    /// Creates a new blob cache with roughly `bytes` bytes of capacity.
    #[must_use]
    pub fn with_capacity_bytes(bytes: u64) -> Self {
        Self {
            data: Cache::with_weighter(10_000, bytes, BlobWeighter),
            capacity: bytes,
        }
    }

    pub fn insert(&self, handle: CacheKey, value: Arc<[u8]>) {
        self.data.insert(handle, value);
    }

    pub fn get(&self, handle: &CacheKey) -> Option<Item> {
        self.data.get(handle)
    }

    /// Returns the cache capacity in bytes.
    #[must_use]
    pub fn capacity(&self) -> u64 {
        self.capacity
    }

    /// Returns the number of cached blobs.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` if there are no cached blobs.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
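To illustrate the intended read path, here is a minimal cache-aside sketch using the API above. The `load_blob_from_disk` closure is hypothetical; it stands in for the segment read that the value log performs on a cache miss.

```rust
use std::sync::Arc;

// Hypothetical read path: check the cache first, fall back to disk,
// then populate the cache so the next read for this handle hits memory.
fn get_cached(
    cache: &BlobCache,
    handle: &ValueHandle,
    load_blob_from_disk: impl Fn(&ValueHandle) -> std::io::Result<Vec<u8>>,
) -> std::io::Result<Arc<[u8]>> {
    if let Some(blob) = cache.get(handle) {
        return Ok(blob); // cache hit, no I/O
    }
    let blob: Arc<[u8]> = load_blob_from_disk(handle)?.into();
    cache.insert(handle.clone(), blob.clone());
    Ok(blob)
}
```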
15 changes: 15 additions & 0 deletions src/config.rs
@@ -1,18 +1,33 @@
use crate::blob_cache::BlobCache;
use std::sync::Arc;

/// Value log configuration
#[derive(Debug)]
pub struct Config {
    pub(crate) segment_size_bytes: u64,
    pub(crate) blob_cache: Arc<BlobCache>,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            segment_size_bytes: 256 * 1_024 * 1_024,
            blob_cache: Arc::new(BlobCache::with_capacity_bytes(16 * 1_024 * 1_024)),
        }
    }
}

impl Config {
    /// Sets the blob cache.
    ///
    /// Defaults to a blob cache with 16 MiB of capacity,
    /// shared between all partitions inside this keyspace.
    #[must_use]
    pub fn blob_cache(mut self, blob_cache: Arc<BlobCache>) -> Self {
        self.blob_cache = blob_cache;
        self
    }

    /// Sets the maximum size of value log segments.
    ///
    /// This heavily influences space amplification, as
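For illustration, a sketch of the builder in use. This assumes crate-internal code (at this commit `blob_cache` is a private module, so `BlobCache` is not reachable from outside the crate); `segment_size_bytes` is the setter whose doc comment is truncated above and which `src/main.rs` also uses.

```rust
use std::sync::Arc;

// Hypothetical: share one 64 MiB blob cache and cap segments at 128 MiB.
// A single cache can back several value logs by cloning the Arc.
fn example_config() -> Config {
    let cache = Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024));

    Config::default()
        .segment_size_bytes(128 * 1_024 * 1_024)
        .blob_cache(cache)
}
```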
3 changes: 3 additions & 0 deletions src/error.rs
@@ -8,6 +8,9 @@ pub enum Error {

    /// Invalid data format version
    InvalidVersion(Option<Version>),

    /// CRC check failed
    CrcMismatch,
}

impl std::fmt::Display for Error {
4 changes: 3 additions & 1 deletion src/handle.rs
@@ -1,8 +1,10 @@
use std::hash::Hash;
use std::sync::Arc;

/// A value handle points into the value log.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct ValueHandle {
    /// Segment ID
    pub segment_id: Arc<str>,
24 changes: 24 additions & 0 deletions src/index.rs
@@ -14,6 +14,9 @@ pub trait Index {
    /// Will return `Err` if an IO error occurs.
    fn get(&self, key: &[u8]) -> std::io::Result<Option<ValueHandle>>;

    // TODO: shouldn't be part of Index... remove
    // TODO: flushing to value log should use `Writer` (atomic)

    /// Inserts a value handle into the index.
    ///
    /// This method is called during value log garbage collection.
@@ -23,3 +26,24 @@
    /// Will return `Err` if an IO error occurs.
    fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()>;
}

/// Trait that allows writing into an index
pub trait Writer {
    /// Inserts a value handle into the index.
    ///
    /// This method is called during value log garbage collection.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()>;

    /// Finishes the write batch.
    ///
    /// This operation should be atomic.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    fn finish(&self) -> std::io::Result<()>;
}
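To make the atomicity contract concrete, here is a minimal sketch of a `Writer` over a hypothetical in-memory index: indirections are buffered and only become visible when `finish` applies them under one write lock. (`MemIndex` and `BatchWriter` are illustrative names, not part of this commit.)

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex, RwLock};

// Hypothetical in-memory index; `ValueHandle` and `Writer` as defined above.
type MemIndex = Arc<RwLock<BTreeMap<Vec<u8>, ValueHandle>>>;

struct BatchWriter {
    index: MemIndex,
    // Buffered indirections; invisible to readers until `finish`.
    buffer: Mutex<Vec<(Vec<u8>, ValueHandle)>>,
}

impl Writer for BatchWriter {
    fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> {
        self.buffer
            .lock()
            .expect("lock is poisoned")
            .push((key.to_vec(), value));
        Ok(())
    }

    fn finish(&self) -> std::io::Result<()> {
        // Apply the whole batch under a single write lock, so readers
        // observe either none or all of its entries.
        let mut index = self.index.write().expect("lock is poisoned");
        for (key, handle) in self.buffer.lock().expect("lock is poisoned").drain(..) {
            index.insert(key, handle);
        }
        Ok(())
    }
}
```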
3 changes: 2 additions & 1 deletion src/lib.rs
@@ -38,6 +38,7 @@
#![warn(clippy::expect_used)]
#![allow(clippy::missing_const_for_fn)]

mod blob_cache;
mod config;
mod error;
mod handle;
@@ -51,7 +52,7 @@ pub use {
    config::Config,
    error::{Error, Result},
    handle::ValueHandle,
    index::Index,
    index::{Index, Writer as IndexWriter},
    segment::multi_writer::MultiWriter as SegmentWriter,
    segment::reader::Reader as SegmentReader,
    segment::Segment,
58 changes: 30 additions & 28 deletions src/main.rs
@@ -3,7 +3,7 @@ use std::{
    path::Path,
    sync::{Arc, RwLock},
};
use value_log::{Config, Index, SegmentReader, ValueHandle, ValueLog};
use value_log::{Config, Index, IndexWriter, SegmentReader, ValueHandle, ValueLog};

#[derive(Default)]
pub struct DebugIndex(RwLock<BTreeMap<Arc<[u8]>, ValueHandle>>);
@@ -30,6 +30,18 @@ impl DebugIndex {
    }
}

struct DebugIndexWriter(Arc<DebugIndex>);

impl IndexWriter for DebugIndexWriter {
    fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> {
        self.0.insert_indirection(key, value)
    }

    fn finish(&self) -> std::io::Result<()> {
        Ok(())
    }
}

fn main() -> value_log::Result<()> {
    let index = DebugIndex(RwLock::new(BTreeMap::<Arc<[u8]>, ValueHandle>::default()));
    let index = Arc::new(index);
@@ -41,11 +53,7 @@ fn main() -> value_log::Result<()> {
    }
    std::fs::create_dir_all(vl_path)?;

    let value_log = ValueLog::new(
        vl_path,
        Config::default().segment_size_bytes(100),
        index.clone(),
    )?;
    let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?;

    {
        let mut writer = value_log.get_writer()?;
@@ -68,7 +76,7 @@ fn main() -> value_log::Result<()> {
        value_log.register(writer)?;
    }

    {
    /* {
        let mut writer = value_log.get_writer()?;
        let segment_id = writer.segment_id();
@@ -131,7 +139,7 @@ fn main() -> value_log::Result<()> {
        value_log.register(writer)?;
    }
    eprintln!("{:#?}", value_log.segments);
    eprintln!("{:#?}", value_log.segments.read().unwrap());
    for (key, handle) in index.0.read().expect("lock is poisoned").iter() {
        eprintln!(
@@ -148,29 +156,12 @@ fn main() -> value_log::Result<()> {
        );
    }
    /* eprintln!(
        "multi: {:#?}",
        value_log
            .get_multiple(
                &index
                    .0
                    .read()
                    .expect("lock is poisoned")
                    .values()
                    .cloned()
                    .collect::<Vec<_>>()
            )?
            .into_iter()
            .filter(Option::is_some)
            .count()
    ); */

    index.remove("d".as_bytes())?;
    for segment_id in value_log.list_segments() {
        // Scan segment
        let reader = SegmentReader::new(
            vl_path.join("segments").join(&*segment_id),
            vl_path.join("segments").join(&*segment_id).join("data"),
            segment_id.clone(),
        )?;
@@ -211,9 +202,20 @@ fn main() -> value_log::Result<()> {
    }
    eprintln!("=== rollover ===");
    value_log.rollover(&value_log.list_segments())?;
    value_log.rollover(&value_log.list_segments(), DebugIndexWriter(index.clone()))?; */

    eprintln!("{:#?}", value_log.segments);
    eprintln!("{:#?}", value_log.segments.read().unwrap());

    for _ in 0..10 {
        let value_handle = ValueHandle {
            segment_id: value_log.list_segments().first().unwrap().clone(),
            offset: 3,
        };

        let before = std::time::Instant::now();
        value_log.get(&value_handle)?;
        eprintln!("blob loaded in {:?}ns", before.elapsed().as_nanos());
    }

    Ok(())
}
4 changes: 1 addition & 3 deletions src/segment/mod.rs
@@ -8,14 +8,12 @@ pub mod multi_writer;
pub mod reader;
pub mod writer;

/* TODO: per blob CRC value */

/// A disk segment is an immutable, sorted, contiguous file
/// that contains key-value pairs.
///
/// ### File format
///
/// KV: \<key length: u16\> \<key: N\> \<value length: u32\> \<value: N\>
/// KV: \<key length: u16\> \<key: N\> \<crc hash: u32\> \<value length: u32\> \<value: N\>
///
/// Segment: { KV } +
#[derive(Debug)]
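For illustration, a sketch of serializing one KV record in the new format, using the crate's `byteorder` and `crc32fast` dependencies and the big-endian layout that `src/segment/reader.rs` reads back. Whether the CRC covers the key, the value, or both is not visible in this diff, so the value-only checksum below is an assumption.

```rust
use byteorder::{BigEndian, WriteBytesExt};
use std::io::Write;

// Serialize a single KV record:
// <key length: u16> <key: N> <crc hash: u32> <value length: u32> <value: N>
fn write_record<W: Write>(mut out: W, key: &[u8], value: &[u8]) -> std::io::Result<()> {
    out.write_u16::<BigEndian>(key.len() as u16)?;
    out.write_all(key)?;
    out.write_u32::<BigEndian>(crc32fast::hash(value))?; // assumption: CRC over the value only
    out.write_u32::<BigEndian>(value.len() as u32)?;
    out.write_all(value)?;
    Ok(())
}
```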
15 changes: 12 additions & 3 deletions src/segment/multi_writer.rs
@@ -22,7 +22,9 @@ impl MultiWriter {
    pub fn new<P: AsRef<Path>>(target_size: u64, folder: P) -> std::io::Result<Self> {
        let folder = folder.as_ref();
        let segment_id = generate_segment_id();
        let path = folder.join("segments").join(&*segment_id);

        let segment_folder = folder.join("segments").join(&*segment_id);
        let path = segment_folder.join("data");

        Ok(Self {
            folder: folder.into(),
@@ -61,7 +63,11 @@ impl MultiWriter {
        log::debug!("Rotating segment writer");

        let new_segment_id = generate_segment_id();
        let path = self.folder.join("segments").join(&*new_segment_id);
        let path = self
            .folder
            .join("segments")
            .join(&*new_segment_id)
            .join("data");
        self.writers.push(Writer::new(new_segment_id, path)?);

        Ok(())
@@ -73,10 +79,13 @@ impl MultiWriter {
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn write(&mut self, key: &[u8], value: &[u8]) -> crate::Result<()> {
        // Copy out the target size first: `get_active_writer_mut` borrows
        // `self` mutably, so `self.target_size` cannot be read afterwards.
        let target_size = self.target_size;

        let writer = self.get_active_writer_mut();
        writer.write(key, value)?;

        if writer.offset() >= self.target_size {
        if writer.offset() >= target_size {
            writer.flush()?;
            self.rotate()?;
        }
12 changes: 11 additions & 1 deletion src/segment/reader.rs
@@ -9,7 +9,6 @@ use std::{
/// Reads through a segment in order.
pub struct Reader {
    pub(crate) segment_id: Arc<str>,

    inner: BufReader<File>,
}

@@ -49,6 +48,17 @@ impl Iterator for Reader {
            return Some(Err(e));
        };

        // TODO: handle crc
        let _crc = match self.inner.read_u32::<BigEndian>() {
            Ok(v) => v,
            Err(e) => {
                if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    return None;
                }
                return Some(Err(e));
            }
        };

        let val_len = match self.inner.read_u32::<BigEndian>() {
            Ok(v) => v,
            Err(e) => {
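The CRC is read here but not yet verified (hence the `TODO` and the `_crc` binding). Once addressed, verification could look roughly like the sketch below: recompute the checksum over the value and compare it with the stored one. Mapping a failure to the new `Error::CrcMismatch` is the obvious fit, but since this iterator currently yields `std::io::Result`, the sketch uses an `io::Error` stand-in.

```rust
// Hypothetical verification step, run after the value bytes have been read.
fn check_crc(stored_crc: u32, value: &[u8]) -> std::io::Result<()> {
    if crc32fast::hash(value) != stored_crc {
        // A real implementation would surface crate::Error::CrcMismatch instead.
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "CRC mismatch",
        ));
    }
    Ok(())
}
```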
