diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 534052e..324c1bd 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -213,6 +213,20 @@ impl CompactionStrategy for Strategy { target_size: u64::from(self.target_size), }; + // TODO: eventually, this should happen lazily + // if a segment file lives for very long, it should get rewritten + // Rocks, by default, rewrites files that are 1 month or older + // + // TODO: 3.0.0 configuration? + // NOTE: We purposefully not trivially move segments + // if we go from L1 to L2 + // https://github.com/fjall-rs/lsm-tree/issues/63 + let goes_into_cold_storage = next_level_index == 2; + + if goes_into_cold_storage { + return Choice::Merge(choice); + } + if can_trivial_move && level.is_disjoint { return Choice::Move(choice); } @@ -225,40 +239,78 @@ impl CompactionStrategy for Strategy { return Choice::DoNothing; }; - if first_level.len() >= self.l0_threshold.into() - && !busy_levels.contains(&0) - && !busy_levels.contains(&1) - { - let mut level = first_level.clone(); - level.sort_by_key_range(); // TODO: disjoint levels shouldn't need sort + if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) { + let first_level_size = first_level.size(); + + // NOTE: Special handling for disjoint workloads + if levels.is_disjoint() { + if first_level_size < self.target_size.into() { + // TODO: also do this in non-disjoint workloads + // -> intra-L0 compaction + + // NOTE: Force a merge into L0 itself + // ...we seem to have *very* small flushes + return if first_level.len() >= 32 { + Choice::Merge(CompactionInput { + dest_level: 0, + segment_ids: first_level.list_ids(), + // NOTE: Allow a bit of overshooting + target_size: ((self.target_size as f32) * 1.1) as u64, + }) + } else { + Choice::DoNothing + }; + } + + return Choice::Merge(CompactionInput { + dest_level: 1, + segment_ids: first_level.list_ids(), + target_size: ((self.target_size as f32) * 1.1) as u64, + }); + } - let Some(next_level) = &resolved_view.get(1) else { - return Choice::DoNothing; - }; + if first_level_size < self.target_size.into() { + // NOTE: We reached the threshold, but L0 is still very small + // meaning we have very small segments, so do intra-L0 compaction + return Choice::Merge(CompactionInput { + dest_level: 0, + segment_ids: first_level.list_ids(), + target_size: self.target_size.into(), + }); + } - let mut segment_ids: Vec = - level.iter().map(|x| x.metadata.id).collect::>(); + if !busy_levels.contains(&1) { + let mut level = first_level.clone(); + level.sort_by_key_range(); - // Get overlapping segments in next level - let key_range = aggregate_key_range(&level); + let Some(next_level) = &resolved_view.get(1) else { + return Choice::DoNothing; + }; - let next_level_overlapping_segment_ids: Vec<_> = next_level - .overlapping_segments(&key_range) - .map(|x| x.metadata.id) - .collect(); + let mut segment_ids: Vec = + level.iter().map(|x| x.metadata.id).collect::>(); - segment_ids.extend(&next_level_overlapping_segment_ids); + // Get overlapping segments in next level + let key_range = aggregate_key_range(&level); - let choice = CompactionInput { - segment_ids, - dest_level: 1, - target_size: u64::from(self.target_size), - }; + let next_level_overlapping_segment_ids: Vec<_> = next_level + .overlapping_segments(&key_range) + .map(|x| x.metadata.id) + .collect(); - if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { - return Choice::Move(choice); + segment_ids.extend(&next_level_overlapping_segment_ids); + + let choice = CompactionInput { + segment_ids, + dest_level: 1, + target_size: u64::from(self.target_size), + }; + + if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { + return Choice::Move(choice); + } + return Choice::Merge(choice); } - return Choice::Merge(choice); } } @@ -513,9 +565,8 @@ mod tests { assert_eq!( compactor.choose(&levels, &config), - Choice::Move(CompactionInput { + Choice::Merge(CompactionInput { dest_level: 2, - // NOTE: segment #3 has no overlap with L2 segment_ids: vec![3], target_size: 64 * 1_024 * 1_024 }) @@ -544,7 +595,9 @@ mod tests { assert_eq!( compactor.choose(&levels, &config), - Choice::Move(CompactionInput { + // NOTE: We merge because segments are demoted into "cold" levels + // see https://github.com/fjall-rs/lsm-tree/issues/63 + Choice::Merge(CompactionInput { dest_level: 2, segment_ids: vec![1], target_size: 64 * 1_024 * 1_024 diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index 3d5c4aa..8eee53e 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -47,6 +47,10 @@ impl Default for Level { } impl Level { + pub fn list_ids(&self) -> Vec { + self.segments.iter().map(|x| x.metadata.id).collect() + } + pub fn insert(&mut self, segment: Arc) { self.segments.push(segment); self.set_disjoint_flag(); diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index 75e0648..0e6af53 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -50,7 +50,7 @@ impl std::fmt::Display for LevelManifest { if level.segments.is_empty() { write!(f, "")?; - } else if level.segments.len() >= 24 { + } else if level.segments.len() >= 10 { #[allow(clippy::indexing_slicing)] for segment in level.segments.iter().take(2) { let id = segment.metadata.id; @@ -63,7 +63,7 @@ impl std::fmt::Display for LevelManifest { if is_hidden { ")" } else { "]" }, )?; } - write!(f, " . . . . . ")?; + write!(f, " . . . ")?; #[allow(clippy::indexing_slicing)] for segment in level.segments.iter().rev().take(2).rev() {