From b59169e7a2da4a4fd9114285241cff660bcc2739 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 1 Oct 2024 17:29:49 +0200 Subject: [PATCH 01/15] fix: maybe #63 --- src/compaction/leveled.rs | 10 +++++++++- src/compaction/worker.rs | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index fe80d57..6ba800e 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -160,7 +160,15 @@ impl CompactionStrategy for Strategy { target_size: u64::from(self.target_size), }; - if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { + // NOTE: We purposefully not trivially move segments + // if we go from L1 to L2 + // https://github.com/fjall-rs/lsm-tree/issues/63 + let goes_into_cold_storage = next_level_index == 2; + + if next_level_overlapping_segment_ids.is_empty() + && level.is_disjoint + && !goes_into_cold_storage + { return Choice::Move(choice); } return Choice::Merge(choice); diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 079608d..2a21606 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -80,6 +80,7 @@ pub fn do_compaction(opts: &Options) -> crate::Result<()> { let choice = opts.strategy.choose(&original_levels, &opts.config); log::debug!("compactor: choice: {choice:?}"); + eprintln!("{original_levels}"); match choice { Choice::Merge(payload) => merge_segments(original_levels, opts, &payload), From ec159995ea245ec3dd06ebfb5e6c7a15db7edd0c Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 1 Oct 2024 18:24:04 +0200 Subject: [PATCH 02/15] remove log --- src/compaction/worker.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 2a21606..079608d 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -80,7 +80,6 @@ pub fn do_compaction(opts: &Options) -> crate::Result<()> { let choice = opts.strategy.choose(&original_levels, &opts.config); log::debug!("compactor: choice: {choice:?}"); - eprintln!("{original_levels}"); match choice { Choice::Merge(payload) => merge_segments(original_levels, opts, &payload), From 102ab8ce8fcb4959e7f4c1e182fe0369ebe5ebd9 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 22 Oct 2024 17:26:41 +0200 Subject: [PATCH 03/15] wip --- src/compaction/leveled.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 297c199..5f8190e 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -185,7 +185,7 @@ impl CompactionStrategy for Strategy { && !busy_levels.contains(&1) { let mut level = first_level.clone(); - level.sort_by_key_range(); // TODO: disjoint levels shouldn't need sort + level.sort_by_key_range(); let Some(next_level) = &resolved_view.get(1) else { return Choice::DoNothing; From 61d75e8a63b96fd64d21edf751e16b4396ec389a Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 22 Oct 2024 18:22:31 +0200 Subject: [PATCH 04/15] wip --- src/compaction/leveled.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 5f8190e..9571558 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -160,15 +160,17 @@ impl CompactionStrategy for Strategy { target_size: u64::from(self.target_size), }; + // TODO: 3.0.0 configuration? // NOTE: We purposefully not trivially move segments // if we go from L1 to L2 // https://github.com/fjall-rs/lsm-tree/issues/63 let goes_into_cold_storage = next_level_index == 2; - if next_level_overlapping_segment_ids.is_empty() - && level.is_disjoint - && !goes_into_cold_storage - { + if goes_into_cold_storage { + return Choice::Merge(choice); + } + + if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { return Choice::Move(choice); } return Choice::Merge(choice); From eaf61aab915b3595788ebc0ed21d685414237384 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 22 Oct 2024 18:57:51 +0200 Subject: [PATCH 05/15] fix test --- src/compaction/leveled.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 9571558..ebb3295 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -495,7 +495,9 @@ mod tests { assert_eq!( compactor.choose(&levels, &config), - Choice::Move(CompactionInput { + // NOTE: We merge because segments are demoted into "cold" levels + // see https://github.com/fjall-rs/lsm-tree/issues/63 + Choice::Merge(CompactionInput { dest_level: 2, segment_ids: vec![1], target_size: 64 * 1_024 * 1_024 From b1ee9c5ff2a7abebe2d8e0912a1ca0524a679d16 Mon Sep 17 00:00:00 2001 From: Marvin <33938500+marvin-j97@users.noreply.github.com> Date: Wed, 23 Oct 2024 23:16:29 +0200 Subject: [PATCH 06/15] Update leveled.rs --- src/compaction/leveled.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index ebb3295..68bda7b 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -160,6 +160,10 @@ impl CompactionStrategy for Strategy { target_size: u64::from(self.target_size), }; + // TODO: eventually, this should happen lazily + // if a segment file lives for very long, it should get rewritten + // Rocks, by default, rewrites files that are 1 month or older + // // TODO: 3.0.0 configuration? // NOTE: We purposefully not trivially move segments // if we go from L1 to L2 From 82aa6e50f4084c08a542b32faf3a941cbda7cbc4 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 19:03:47 +0200 Subject: [PATCH 07/15] perf: try to keep segment size close to target in disjoint workload --- src/compaction/leveled.rs | 42 +++++++++++++++++++++++++++++++-------- src/level_manifest/mod.rs | 4 +++- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index ebb3295..34ebaa6 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -182,12 +182,37 @@ impl CompactionStrategy for Strategy { return Choice::DoNothing; }; - if first_level.len() >= self.l0_threshold.into() - && !busy_levels.contains(&0) - && !busy_levels.contains(&1) - { - let mut level = first_level.clone(); - level.sort_by_key_range(); + if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) { + if levels.is_disjoint() { + // NOTE: Special handling for disjoint workloads + + if first_level.size() < self.target_size.into() { + // NOTE: Force a merge into L0 itself + // ...we seem to have *very* small flushes + return if first_level.len() >= 32 { + Choice::Merge(CompactionInput { + dest_level: 0, + segment_ids: first_level + .segments + .iter() + .map(|x| x.metadata.id) + .collect(), + // NOTE: Allow a bit of overshooting + target_size: ((self.target_size as f32) * 1.1) as u64, + }) + } else { + Choice::DoNothing + }; + } + + return Choice::Merge(CompactionInput { + dest_level: 1, + segment_ids: first_level.segments.iter().map(|x| x.metadata.id).collect(), + target_size: ((self.target_size as f32) * 1.1) as u64, + }); + } else if !busy_levels.contains(&1) { + let mut level = first_level.clone(); + level.sort_by_key_range(); let Some(next_level) = &resolved_view.get(1) else { return Choice::DoNothing; @@ -213,9 +238,10 @@ impl CompactionStrategy for Strategy { }; if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { - return Choice::Move(choice); + return Choice::Move(choice); + } + return Choice::Merge(choice); } - return Choice::Merge(choice); } } diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index 75e0648..db960be 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -313,7 +313,9 @@ impl LevelManifest { #[must_use] pub fn is_disjoint(&self) -> bool { - self.is_disjoint && self.levels.iter().all(|x| x.is_disjoint) + self.is_disjoint + // vv TODO: not needed? + && self.levels.iter().all(|x| x.is_disjoint) } /// Returns `true` if there are no segments From 184f6eef0d8b22bf0428fffee8de9893d780516c Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 19:05:50 +0200 Subject: [PATCH 08/15] fmt --- src/level_manifest/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index db960be..d731ba5 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -313,9 +313,8 @@ impl LevelManifest { #[must_use] pub fn is_disjoint(&self) -> bool { - self.is_disjoint - // vv TODO: not needed? - && self.levels.iter().all(|x| x.is_disjoint) + self.is_disjoint && self.levels.iter().all(|x| x.is_disjoint) + // TODO: not needed? -----------^ } /// Returns `true` if there are no segments From 985f5fd8c221051063aff92ea4abb2ed7b42f4af Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 20:33:09 +0200 Subject: [PATCH 09/15] fmt --- src/compaction/leveled.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 04362c1..d3a17ce 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -218,30 +218,30 @@ impl CompactionStrategy for Strategy { let mut level = first_level.clone(); level.sort_by_key_range(); - let Some(next_level) = &resolved_view.get(1) else { - return Choice::DoNothing; - }; + let Some(next_level) = &resolved_view.get(1) else { + return Choice::DoNothing; + }; - let mut segment_ids: Vec = - level.iter().map(|x| x.metadata.id).collect::>(); + let mut segment_ids: Vec = + level.iter().map(|x| x.metadata.id).collect::>(); - // Get overlapping segments in next level - let key_range = aggregate_key_range(&level); + // Get overlapping segments in next level + let key_range = aggregate_key_range(&level); - let next_level_overlapping_segment_ids: Vec<_> = next_level - .overlapping_segments(&key_range) - .map(|x| x.metadata.id) - .collect(); + let next_level_overlapping_segment_ids: Vec<_> = next_level + .overlapping_segments(&key_range) + .map(|x| x.metadata.id) + .collect(); - segment_ids.extend(&next_level_overlapping_segment_ids); + segment_ids.extend(&next_level_overlapping_segment_ids); - let choice = CompactionInput { - segment_ids, - dest_level: 1, - target_size: u64::from(self.target_size), - }; + let choice = CompactionInput { + segment_ids, + dest_level: 1, + target_size: u64::from(self.target_size), + }; - if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { + if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { return Choice::Move(choice); } return Choice::Merge(choice); From a742daf0566f35437134b3120c70003210c46ae3 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 20:49:07 +0200 Subject: [PATCH 10/15] add comment --- src/compaction/leveled.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index d3a17ce..f29823f 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -191,6 +191,9 @@ impl CompactionStrategy for Strategy { // NOTE: Special handling for disjoint workloads if first_level.size() < self.target_size.into() { + // TODO: also do this in non-disjoint workloads + // -> intra-L0 compaction + // NOTE: Force a merge into L0 itself // ...we seem to have *very* small flushes return if first_level.len() >= 32 { From 800df8da12562492a4d0f194e7e4894a3c04ac00 Mon Sep 17 00:00:00 2001 From: Marvin <33938500+marvin-j97@users.noreply.github.com> Date: Sat, 26 Oct 2024 20:01:22 +0200 Subject: [PATCH 11/15] Update mod.rs --- src/level_manifest/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index d731ba5..75e0648 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -314,7 +314,6 @@ impl LevelManifest { #[must_use] pub fn is_disjoint(&self) -> bool { self.is_disjoint && self.levels.iter().all(|x| x.is_disjoint) - // TODO: not needed? -----------^ } /// Returns `true` if there are no segments From d65f523b2258fc1c6d40772fe383fa7649d5d97b Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 26 Oct 2024 21:58:33 +0200 Subject: [PATCH 12/15] intra-L0 compaction --- src/compaction/leveled.rs | 32 ++++++++++++++++++++++---------- src/level_manifest/level.rs | 4 ++++ 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index f29823f..8b6553c 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -187,10 +187,11 @@ impl CompactionStrategy for Strategy { }; if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) { - if levels.is_disjoint() { - // NOTE: Special handling for disjoint workloads + let first_level_size = first_level.size(); - if first_level.size() < self.target_size.into() { + // NOTE: Special handling for disjoint workloads + if levels.is_disjoint() { + if first_level_size < self.target_size.into() { // TODO: also do this in non-disjoint workloads // -> intra-L0 compaction @@ -199,11 +200,7 @@ impl CompactionStrategy for Strategy { return if first_level.len() >= 32 { Choice::Merge(CompactionInput { dest_level: 0, - segment_ids: first_level - .segments - .iter() - .map(|x| x.metadata.id) - .collect(), + segment_ids: first_level.list_ids(), // NOTE: Allow a bit of overshooting target_size: ((self.target_size as f32) * 1.1) as u64, }) @@ -214,10 +211,25 @@ impl CompactionStrategy for Strategy { return Choice::Merge(CompactionInput { dest_level: 1, - segment_ids: first_level.segments.iter().map(|x| x.metadata.id).collect(), + segment_ids: first_level.list_ids(), target_size: ((self.target_size as f32) * 1.1) as u64, }); - } else if !busy_levels.contains(&1) { + } + + /* let l0_threshold_size = (self.l0_threshold as u32) * self.target_size; + + if (first_level_size as f32) < (l0_threshold_size as f32) * 0.66 { + // NOTE: We reached the threshold, but L0 is still very small + // meaning we have very small segments, so do intra-L0 compaction + return Choice::Merge(CompactionInput { + dest_level: 0, + segment_ids: first_level.list_ids(), + // NOTE: Allow a bit of overshooting + target_size: self.target_size.into(), + }); + } */ + + if !busy_levels.contains(&1) { let mut level = first_level.clone(); level.sort_by_key_range(); diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index 2b14cdf..4ddc7d2 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -47,6 +47,10 @@ impl Default for Level { } impl Level { + pub fn list_ids(&self) -> Vec { + self.segments.iter().map(|x| x.metadata.id).collect() + } + pub fn insert(&mut self, segment: Arc) { self.segments.push(segment); self.set_disjoint_flag(); From b4dbd2aed3bf33781d8d1dadc79f618a6cf7867d Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 26 Oct 2024 23:01:19 +0200 Subject: [PATCH 13/15] wip --- src/compaction/leveled.rs | 8 +++----- src/level_manifest/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index e9ce95e..240e7a5 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -227,7 +227,7 @@ impl CompactionStrategy for Strategy { return Choice::Merge(choice); } - if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { + if can_trivial_move && level.is_disjoint { return Choice::Move(choice); } return Choice::Merge(choice); @@ -269,9 +269,7 @@ impl CompactionStrategy for Strategy { }); } - /* let l0_threshold_size = (self.l0_threshold as u32) * self.target_size; - - if (first_level_size as f32) < (l0_threshold_size as f32) * 0.66 { + if first_level_size < self.target_size.into() { // NOTE: We reached the threshold, but L0 is still very small // meaning we have very small segments, so do intra-L0 compaction return Choice::Merge(CompactionInput { @@ -280,7 +278,7 @@ impl CompactionStrategy for Strategy { // NOTE: Allow a bit of overshooting target_size: self.target_size.into(), }); - } */ + } if !busy_levels.contains(&1) { let mut level = first_level.clone(); diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index 75e0648..0e6af53 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -50,7 +50,7 @@ impl std::fmt::Display for LevelManifest { if level.segments.is_empty() { write!(f, "")?; - } else if level.segments.len() >= 24 { + } else if level.segments.len() >= 10 { #[allow(clippy::indexing_slicing)] for segment in level.segments.iter().take(2) { let id = segment.metadata.id; @@ -63,7 +63,7 @@ impl std::fmt::Display for LevelManifest { if is_hidden { ")" } else { "]" }, )?; } - write!(f, " . . . . . ")?; + write!(f, " . . . ")?; #[allow(clippy::indexing_slicing)] for segment in level.segments.iter().rev().take(2).rev() { From 226c94adab51a7ac50db7a781ab53b6d84d2eec8 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 26 Oct 2024 23:21:39 +0200 Subject: [PATCH 14/15] fix test case --- src/compaction/leveled.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 240e7a5..f9f2720 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -566,9 +566,8 @@ mod tests { assert_eq!( compactor.choose(&levels, &config), - Choice::Move(CompactionInput { + Choice::Merge(CompactionInput { dest_level: 2, - // NOTE: segment #3 has no overlap with L2 segment_ids: vec![3], target_size: 64 * 1_024 * 1_024 }) From 3e80d2ca90abfba29d49f092ecc5958d72e87dd4 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 26 Oct 2024 23:32:34 +0200 Subject: [PATCH 15/15] wip --- src/compaction/leveled.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index f9f2720..324c1bd 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -275,7 +275,6 @@ impl CompactionStrategy for Strategy { return Choice::Merge(CompactionInput { dest_level: 0, segment_ids: first_level.list_ids(), - // NOTE: Allow a bit of overshooting target_size: self.target_size.into(), }); }