Skip to content

Commit

Permalink
Merge pull request #67 from fjall-rs/perf/level-comp-picking
Browse files Browse the repository at this point in the history
Improve level compaction picking function
  • Loading branch information
marvin-j97 authored Oct 26, 2024
2 parents 416a049 + 2618c2b commit 64dfe9d
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 76 deletions.
199 changes: 130 additions & 69 deletions src/compaction/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
config::Config, key_range::KeyRange, level_manifest::LevelManifest, segment::Segment, HashSet,
config::Config,
key_range::KeyRange,
level_manifest::{level::Level, LevelManifest},
segment::Segment,
HashSet, SegmentId,
};
use std::sync::Arc;

Expand Down Expand Up @@ -51,7 +55,7 @@ impl Default for Strategy {
Self {
l0_threshold: 4,
target_size: 64 * 1_024 * 1_024,
level_ratio: 8,
level_ratio: 8, // TODO: benchmark vs 10
}
}
}
Expand All @@ -64,6 +68,93 @@ fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, target_size: u32) -> us
(ratio as usize).pow(u32::from(level_idx)) * (target_size as usize)
}

fn pick_minimal_overlap(
curr_level: &Level,
next_level: &Level,
overshoot: u64,
) -> (HashSet<SegmentId>, bool) {
let mut choices = vec![];

for size in 1..=curr_level.len() {
let windows = curr_level.windows(size);

for window in windows {
let size_sum = window.iter().map(|x| x.metadata.file_size).sum::<u64>();

if size_sum >= overshoot {
// NOTE: Consider this window

let mut segment_ids: HashSet<SegmentId> =
window.iter().map(|x| x.metadata.id).collect();

// Get overlapping segments in next level
let key_range = aggregate_key_range(window);

let next_level_overlapping_segments: Vec<_> = next_level
.overlapping_segments(&key_range)
.cloned()
.collect();

// Get overlapping segments in same level
let key_range = aggregate_key_range(&next_level_overlapping_segments);

let curr_level_overlapping_segment_ids: Vec<_> = curr_level
.overlapping_segments(&key_range)
.filter(|x| !segment_ids.contains(&x.metadata.id))
.collect();

// Calculate effort
let size_next_level = next_level_overlapping_segments
.iter()
.map(|x| x.metadata.file_size)
.sum::<u64>();

let size_curr_level = curr_level_overlapping_segment_ids
.iter()
.map(|x| x.metadata.file_size)
.sum::<u64>();

let effort = size_sum + size_next_level + size_curr_level;

segment_ids.extend(
next_level_overlapping_segments
.iter()
.map(|x| x.metadata.id),
);

segment_ids.extend(
curr_level_overlapping_segment_ids
.iter()
.map(|x| x.metadata.id),
);

// TODO: need to calculate write_amp and choose minimum write_amp instead
//
// consider the segments in La = A to be the ones in the window
// and the segments in La+1 B to be the ones that overlap
// and r = A / B
// we want to avoid compactions that have a low ratio r
// because that means we don't clear out a lot of segments in La
// but have to rewrite a lot of segments in La+1
//
// ultimately, we want the highest ratio
// to maximize the amount of segments we are getting rid of in La
// for the least amount of effort
choices.push((
effort,
segment_ids,
next_level_overlapping_segments.is_empty(),
));
}
}
}

let minimum_effort_choice = choices.into_iter().min_by(|a, b| a.0.cmp(&b.0));
let (_, set, can_trivial_move) = minimum_effort_choice.expect("should exist");

(set, can_trivial_move)
}

impl CompactionStrategy for Strategy {
#[allow(clippy::too_many_lines)]
fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
Expand Down Expand Up @@ -99,56 +190,18 @@ impl CompactionStrategy for Strategy {
continue;
}

let curr_level_bytes = level.size();

let desired_bytes =
desired_level_size_in_bytes(curr_level_index, self.level_ratio, self.target_size);

let mut overshoot = curr_level_bytes.saturating_sub(desired_bytes as u64);
let overshoot = level.size().saturating_sub(desired_bytes as u64);

if overshoot > 0 {
let mut segments_to_compact = vec![];

let mut level = level.clone();
level.sort_by_key_range(); // TODO: disjoint levels shouldn't need sort

for segment in level.iter().take(self.level_ratio.into()).cloned() {
if overshoot == 0 {
break;
}

overshoot = overshoot.saturating_sub(segment.metadata.file_size);
segments_to_compact.push(segment);
}

debug_assert!(!segments_to_compact.is_empty());

let Some(next_level) = &resolved_view.get(next_level_index as usize) else {
break;
};

let mut segment_ids: HashSet<u64> =
segments_to_compact.iter().map(|x| x.metadata.id).collect();

// Get overlapping segments in same level
let key_range = aggregate_key_range(&segments_to_compact);

let curr_level_overlapping_segment_ids: Vec<_> = level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.collect();

segment_ids.extend(&curr_level_overlapping_segment_ids);

// Get overlapping segments in next level
let key_range = aggregate_key_range(&segments_to_compact);

let next_level_overlapping_segment_ids: Vec<_> = next_level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.collect();

segment_ids.extend(&next_level_overlapping_segment_ids);
let (segment_ids, can_trivial_move) =
pick_minimal_overlap(level, next_level, overshoot);

let choice = CompactionInput {
segment_ids: {
Expand All @@ -160,7 +213,7 @@ impl CompactionStrategy for Strategy {
target_size: u64::from(self.target_size),
};

if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
if can_trivial_move && level.is_disjoint {
return Choice::Move(choice);
}
return Choice::Merge(choice);
Expand Down Expand Up @@ -298,18 +351,23 @@ mod tests {
#[allow(clippy::expect_used)]
fn build_levels(
path: &Path,
recipe: Vec<Vec<(SegmentId, &str, &str)>>,
recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
) -> crate::Result<LevelManifest> {
let mut levels = LevelManifest::create_new(
recipe.len().try_into().expect("oopsie"),
path.join("levels"),
)?;

for (idx, level) in recipe.into_iter().enumerate() {
for (id, min, max) in level {
for (id, min, max, size_mib) in level {
levels.insert_into_level(
idx.try_into().expect("oopsie"),
fixture_segment(id, string_key_range(min, max), 64 * 1_024 * 1_024, 0.0),
fixture_segment(
id,
string_key_range(min, max),
size_mib * 1_024 * 1_024,
0.0,
),
);
}
}
Expand Down Expand Up @@ -348,7 +406,7 @@ mod tests {

#[rustfmt::skip]
let mut levels = build_levels(tempdir.path(), vec![
vec![(1, "a", "z"), (2, "a", "z"), (3, "a", "z"), (4, "a", "z")],
vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
vec![],
vec![],
vec![],
Expand Down Expand Up @@ -383,8 +441,8 @@ mod tests {

#[rustfmt::skip]
let levels = build_levels(tempdir.path(), vec![
vec![(1, "h", "t"), (2, "h", "t"), (3, "h", "t"), (4, "h", "t")],
vec![(5, "a", "g"), (6, "a", "g"), (7, "a", "g"), (8, "a", "g")],
vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
vec![],
vec![],
])?;
Expand All @@ -411,8 +469,8 @@ mod tests {

#[rustfmt::skip]
let mut levels = build_levels(tempdir.path(), vec![
vec![(1, "a", "g"), (2, "h", "t"), (3, "i", "t"), (4, "j", "t")],
vec![(5, "a", "g"), (6, "a", "g"), (7, "y", "z"), (8, "y", "z")],
vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
vec![],
vec![],
])?;
Expand Down Expand Up @@ -448,16 +506,17 @@ mod tests {
#[rustfmt::skip]
let levels = build_levels(tempdir.path(), vec![
vec![],
vec![(1, "a", "g"), (2, "h", "t"), (3, "x", "z")],
vec![(4, "f", "l")],
vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "x", "z", 64)],
vec![(4, "f", "l", 64)],
vec![],
])?;

assert_eq!(
compactor.choose(&levels, &config),
Choice::Merge(CompactionInput {
Choice::Move(CompactionInput {
dest_level: 2,
segment_ids: vec![1, 4],
// NOTE: segment #3 has no overlap with L2
segment_ids: vec![3],
target_size: 64 * 1_024 * 1_024
})
);
Expand All @@ -478,8 +537,8 @@ mod tests {
#[rustfmt::skip]
let levels = build_levels(tempdir.path(), vec![
vec![],
vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "t")],
vec![(4, "k", "l")],
vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "t", 64)],
vec![(4, "k", "l", 64)],
vec![],
])?;

Expand Down Expand Up @@ -509,15 +568,16 @@ mod tests {
let levels = build_levels(tempdir.path(), vec![
vec![],
vec![],
vec![(1, "a", "g"), (2, "a", "g"), (3, "a", "g"), (4, "a", "g"), (5, "y", "z")],
vec![(6, "f", "l")],
vec![(1, "a", "g", 64), (2, "a", "g", 64), (3, "a", "g", 64), (4, "a", "g", 64), (5, "y", "z", 64)],
vec![(6, "f", "l", 64)],
])?;

assert_eq!(
compactor.choose(&levels, &config),
Choice::Merge(CompactionInput {
dest_level: 3,
segment_ids: vec![1, 2, 3, 4, 6],
// NOTE: 5 is the only segment that has no overlap with #3
segment_ids: vec![5],
target_size: 64 * 1_024 * 1_024
})
);
Expand All @@ -539,15 +599,16 @@ mod tests {
let levels = build_levels(tempdir.path(), vec![
vec![],
vec![],
vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "l"), (4, "m", "n"), (5, "y", "z")],
vec![(6, "f", "l")],
vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "l", 64), (4, "m", "n", 64), (5, "y", "z", 64)],
vec![(6, "f", "l", 64)],
])?;

assert_eq!(
compactor.choose(&levels, &config),
Choice::Merge(CompactionInput {
Choice::Move(CompactionInput {
dest_level: 3,
segment_ids: vec![1, 6],
// NOTE: segment #4 is the left-most segment that has no overlap with L3
segment_ids: vec![4],
target_size: 64 * 1_024 * 1_024
})
);
Expand All @@ -569,8 +630,8 @@ mod tests {
let levels = build_levels(tempdir.path(), vec![
vec![],
vec![],
vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "l"), (4, "m", "n"), (5, "y", "z")],
vec![(6, "w", "x")],
vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "l", 64), (4, "m", "n", 64), (5, "y", "z", 64)],
vec![(6, "w", "x", 64)],
])?;

assert_eq!(
Expand Down Expand Up @@ -598,8 +659,8 @@ mod tests {
#[rustfmt::skip]
let levels = build_levels(tempdir.path(), vec![
vec![],
vec![(1, "a", "z"), (2, "a", "z"), (3, "g", "z")],
vec![(4, "a", "g")],
vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64)],
vec![(4, "a", "g", 64)],
vec![],
])?;

Expand Down
1 change: 1 addition & 0 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ fn merge_segments(
let last_level = levels.last_level_index();

levels.hide_segments(&payload.segment_ids);

drop(levels);

// NOTE: Only evict tombstones when reaching the last level,
Expand Down
10 changes: 3 additions & 7 deletions src/key_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,10 @@ impl KeyRange {
}

/// Aggregates a key range.
///
/// # Panics
///
/// The iterator must not be empty
pub fn aggregate<'a>(mut iter: impl Iterator<Item = &'a Self>) -> Self {
// NOTE: See function documentation
#[allow(clippy::expect_used)]
let first = iter.next().expect("should not be empty");
let Some(first) = iter.next() else {
return Self::empty();
};

let mut min = first.min();
let mut max = first.max();
Expand Down
9 changes: 9 additions & 0 deletions src/segment/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ impl<T: Clone + Encode + Decode + ItemSize> Block<T> {
let header = BlockHeader::decode_from(reader)?;
log::trace!("Got block header: {header:?}");

// Read the (possibly compressed) data
let mut bytes = vec![0u8; header.data_length as usize];
reader.read_exact(&mut bytes)?;

// TODO: 3.0.0 when header.compressed is reliable
// can we preallocate a vector to stream the compression into?
// -> saves reallocation costs
let bytes = match header.compression {
super::meta::CompressionType::None => bytes,

Expand Down Expand Up @@ -103,6 +107,9 @@ impl<T: Clone + Encode + Decode + ItemSize> Block<T> {
#[allow(clippy::cast_possible_truncation)]
data_length: packed.len() as u32,

// TODO: 3.0.0 pack_items should return the uncompressed, serialized
// size directly

// NOTE: Truncation is OK because a block cannot possible contain 4 billion items
#[allow(clippy::cast_possible_truncation)]
uncompressed_length: items.size() as u32,
Expand All @@ -123,6 +130,8 @@ impl<T: Clone + Encode + Decode + ItemSize> Block<T> {
value.encode_into(&mut buf)?;
}

// TODO: 3.0.0 return buf.len() - 4 as uncompressed size

Ok(match compression {
CompressionType::None => buf,

Expand Down

0 comments on commit 64dfe9d

Please sign in to comment.