
Commit

Merge pull request #72 from fjall-rs/63
Better handle segment target size for disjoint workloads
marvin-j97 authored Oct 26, 2024
2 parents 64dfe9d + 3e80d2c commit db90238
Showing 3 changed files with 88 additions and 31 deletions.
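
The changes below revolve around levels whose segments are disjoint, i.e. their key ranges do not overlap, which is the typical shape for workloads with monotonically increasing keys. As a rough orientation, disjointness can be pictured like this (a minimal sketch with simplified types, not the crate's actual key-range API):

struct KeyRange {
    min: Vec<u8>,
    max: Vec<u8>,
}

// A level is disjoint when, after sorting by start key, no segment's range
// reaches into the next one's. Disjoint levels allow cheap "trivial moves",
// because segments can be relocated to a deeper level without rewriting them.
fn is_disjoint(mut ranges: Vec<KeyRange>) -> bool {
    ranges.sort_by(|a, b| a.min.cmp(&b.min));
    ranges.windows(2).all(|w| w[0].max < w[1].min)
}

With that picture in mind, the diff changes when such moves are allowed and how the segment target size is enforced.
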
111 changes: 82 additions & 29 deletions src/compaction/leveled.rs
@@ -213,6 +213,20 @@ impl CompactionStrategy for Strategy {
target_size: u64::from(self.target_size),
};

// TODO: eventually, this should happen lazily
// if a segment file lives for very long, it should get rewritten
// Rocks, by default, rewrites files that are 1 month or older
//
// TODO: 3.0.0 configuration?
// NOTE: We purposefully do not trivially move segments
// if we go from L1 to L2
// https://github.com/fjall-rs/lsm-tree/issues/63
let goes_into_cold_storage = next_level_index == 2;

if goes_into_cold_storage {
return Choice::Merge(choice);
}

if can_trivial_move && level.is_disjoint {
return Choice::Move(choice);
}
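
The effect of the check above: once a compaction targets level 2 or deeper ("cold storage"), the strategy always merges, even when a trivial move would be possible, so long-lived data gets rewritten toward the configured target size instead of keeping whatever small file size it had at flush time. A back-of-the-envelope sketch (the sizes here are made up, not taken from the diff):

// Ceil-divide the merged input by the target size to estimate output files.
fn expected_output_files(input_bytes: u64, target_size: u64) -> u64 {
    (input_bytes + target_size - 1) / target_size
}

// A 2 MiB L1 segment trivially moved to L2 would stay 2 MiB forever; merging it
// together with 190 MiB of overlapping L2 data at a 64 MiB target instead
// produces three full-sized output segments.
assert_eq!(expected_output_files(192 * 1_024 * 1_024, 64 * 1_024 * 1_024), 3);
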
@@ -225,40 +239,78 @@ impl CompactionStrategy for Strategy {
return Choice::DoNothing;
};

if first_level.len() >= self.l0_threshold.into()
&& !busy_levels.contains(&0)
&& !busy_levels.contains(&1)
{
let mut level = first_level.clone();
level.sort_by_key_range(); // TODO: disjoint levels shouldn't need sort
if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
let first_level_size = first_level.size();

// NOTE: Special handling for disjoint workloads
if levels.is_disjoint() {
if first_level_size < self.target_size.into() {
// TODO: also do this in non-disjoint workloads
// -> intra-L0 compaction

// NOTE: Force a merge into L0 itself
// ...we seem to have *very* small flushes
return if first_level.len() >= 32 {
Choice::Merge(CompactionInput {
dest_level: 0,
segment_ids: first_level.list_ids(),
// NOTE: Allow a bit of overshooting
target_size: ((self.target_size as f32) * 1.1) as u64,
})
} else {
Choice::DoNothing
};
}

return Choice::Merge(CompactionInput {
dest_level: 1,
segment_ids: first_level.list_ids(),
target_size: ((self.target_size as f32) * 1.1) as u64,
});
}

let Some(next_level) = &resolved_view.get(1) else {
return Choice::DoNothing;
};
if first_level_size < self.target_size.into() {
// NOTE: We reached the threshold, but L0 is still very small
// meaning we have very small segments, so do intra-L0 compaction
return Choice::Merge(CompactionInput {
dest_level: 0,
segment_ids: first_level.list_ids(),
target_size: self.target_size.into(),
});
}

let mut segment_ids: Vec<u64> =
level.iter().map(|x| x.metadata.id).collect::<Vec<_>>();
if !busy_levels.contains(&1) {
let mut level = first_level.clone();
level.sort_by_key_range();

// Get overlapping segments in next level
let key_range = aggregate_key_range(&level);
let Some(next_level) = &resolved_view.get(1) else {
return Choice::DoNothing;
};

let next_level_overlapping_segment_ids: Vec<_> = next_level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.collect();
let mut segment_ids: Vec<u64> =
level.iter().map(|x| x.metadata.id).collect::<Vec<_>>();

segment_ids.extend(&next_level_overlapping_segment_ids);
// Get overlapping segments in next level
let key_range = aggregate_key_range(&level);

let choice = CompactionInput {
segment_ids,
dest_level: 1,
target_size: u64::from(self.target_size),
};
let next_level_overlapping_segment_ids: Vec<_> = next_level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.collect();

if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
return Choice::Move(choice);
segment_ids.extend(&next_level_overlapping_segment_ids);

let choice = CompactionInput {
segment_ids,
dest_level: 1,
target_size: u64::from(self.target_size),
};

if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
return Choice::Move(choice);
}
return Choice::Merge(choice);
}
return Choice::Merge(choice);
}
}
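
Condensed, the new L0 handling above distinguishes three outcomes. The sketch below restates the decision flow for readability; it is not the actual function, the parameter names are invented, and it glosses over the busy-level checks and the move-versus-merge distinction. The 64 MiB target matches the tests further down; the 10% overshoot and the 32-segment floor come straight from the code above.

enum L0Decision {
    Wait,        // Choice::DoNothing
    IntraL0,     // merge L0 into itself (dest_level: 0)
    MergeIntoL1, // merge (or move) L0 down into L1
}

fn decide_l0(
    segment_count: usize,
    l0_threshold: usize,
    l0_bytes: u64,
    target_size: u64,
    tree_is_disjoint: bool,
) -> L0Decision {
    if segment_count < l0_threshold {
        return L0Decision::Wait;
    }
    if l0_bytes < target_size {
        // Threshold reached, but the flushed segments are tiny: compact L0 into
        // itself. Disjoint trees additionally wait until 32+ segments piled up.
        if !tree_is_disjoint || segment_count >= 32 {
            return L0Decision::IntraL0;
        }
        return L0Decision::Wait;
    }
    L0Decision::MergeIntoL1
}

// The disjoint path allows ~10% overshoot so a merge is not split right at the
// boundary: with a 64 MiB target, ((64 MiB as f32) * 1.1) is roughly 70.4 MiB.
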

@@ -513,9 +565,8 @@ mod tests {

assert_eq!(
compactor.choose(&levels, &config),
Choice::Move(CompactionInput {
Choice::Merge(CompactionInput {
dest_level: 2,
// NOTE: segment #3 has no overlap with L2
segment_ids: vec![3],
target_size: 64 * 1_024 * 1_024
})
@@ -544,7 +595,9 @@

assert_eq!(
compactor.choose(&levels, &config),
Choice::Move(CompactionInput {
// NOTE: We merge because segments are demoted into "cold" levels
// see https://github.com/fjall-rs/lsm-tree/issues/63
Choice::Merge(CompactionInput {
dest_level: 2,
segment_ids: vec![1],
target_size: 64 * 1_024 * 1_024
4 changes: 4 additions & 0 deletions src/level_manifest/level.rs
@@ -47,6 +47,10 @@ impl Default for Level {
}

impl Level {
pub fn list_ids(&self) -> Vec<SegmentId> {
self.segments.iter().map(|x| x.metadata.id).collect()
}

pub fn insert(&mut self, segment: Arc<Segment>) {
self.segments.push(segment);
self.set_disjoint_flag();
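
The new `list_ids` helper only projects the segment IDs out of a level; it lets the compaction strategy above write `first_level.list_ids()` instead of repeating the `iter().map(|x| x.metadata.id).collect()` chain. A rough usage sketch, assuming `SegmentId` is the same integer ID type stored in `CompactionInput::segment_ids`:

// Hypothetical call site, mirroring the new branches in leveled.rs above:
let segment_ids = first_level.list_ids();
let choice = CompactionInput {
    dest_level: 0,
    segment_ids,
    target_size: 64 * 1_024 * 1_024,
};
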
4 changes: 2 additions & 2 deletions src/level_manifest/mod.rs
@@ -50,7 +50,7 @@ impl std::fmt::Display for LevelManifest {

if level.segments.is_empty() {
write!(f, "<empty>")?;
} else if level.segments.len() >= 24 {
} else if level.segments.len() >= 10 {
#[allow(clippy::indexing_slicing)]
for segment in level.segments.iter().take(2) {
let id = segment.metadata.id;
@@ -63,7 +63,7 @@
if is_hidden { ")" } else { "]" },
)?;
}
write!(f, " . . . . . ")?;
write!(f, " . . . ")?;

#[allow(clippy::indexing_slicing)]
for segment in level.segments.iter().rev().take(2).rev() {
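
The Display tweak lowers the truncation threshold from 24 to 10 segments and shortens the ellipsis, so crowded levels print only their first and last two segments. The same take/rev pattern in isolation (a generic sketch, not the crate's exact output format):

fn main() {
    let ids: Vec<u64> = (1..=42).collect();
    if ids.len() >= 10 {
        // First two entries...
        for id in ids.iter().take(2) {
            print!("[{id}]");
        }
        print!(" . . . ");
        // ...then the last two, in original order.
        for id in ids.iter().rev().take(2).rev() {
            print!("[{id}]");
        }
        println!();
    }
    // prints: [1][2] . . . [41][42]
}
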
