Better handle segment target size for disjoint workloads #72

Merged · 24 commits · Oct 26, 2024
111 changes: 82 additions & 29 deletions src/compaction/leveled.rs
@@ -213,6 +213,20 @@ impl CompactionStrategy for Strategy {
                 target_size: u64::from(self.target_size),
             };
 
+            // TODO: eventually, this should happen lazily
+            // if a segment file lives for very long, it should get rewritten
+            // Rocks, by default, rewrites files that are 1 month or older
+            //
+            // TODO: 3.0.0 configuration?
+            // NOTE: We purposefully do not trivially move segments
+            // if we go from L1 to L2
+            // https://github.com/fjall-rs/lsm-tree/issues/63
+            let goes_into_cold_storage = next_level_index == 2;
+
+            if goes_into_cold_storage {
+                return Choice::Merge(choice);
+            }
+
             if can_trivial_move && level.is_disjoint {
                 return Choice::Move(choice);
             }
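The hunk above hard-codes level 2 as the point where segments become "cold": anything demoted that deep is always merged (rewritten) rather than trivially moved, so long-lived data eventually gets rewritten and split to the level's target size (see issue #63 linked in the comment). A minimal standalone sketch of that decision, with a plain enum and scalar parameters standing in for the crate's real choice and level types:

/// Stand-in for the crate's compaction choice (illustration only).
#[derive(Debug, PartialEq)]
enum Decision {
    Merge, // rewrite segments into the destination level
    Move,  // re-link segment files without rewriting them
}

fn decide(next_level_index: u8, can_trivial_move: bool, level_is_disjoint: bool) -> Decision {
    // Demotions into L2+ are treated as cold storage: always merge.
    let goes_into_cold_storage = next_level_index == 2;
    if goes_into_cold_storage {
        return Decision::Merge;
    }
    // Trivial moves remain allowed into warmer, disjoint levels.
    if can_trivial_move && level_is_disjoint {
        return Decision::Move;
    }
    Decision::Merge
}

fn main() {
    // L0 -> L1 with no overlap: cheap move.
    assert_eq!(decide(1, true, true), Decision::Move);
    // L1 -> L2: forced merge, even though a move would be possible.
    assert_eq!(decide(2, true, true), Decision::Merge);
}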
@@ -225,40 +239,78 @@ impl CompactionStrategy for Strategy {
             return Choice::DoNothing;
         };
 
-        if first_level.len() >= self.l0_threshold.into()
-            && !busy_levels.contains(&0)
-            && !busy_levels.contains(&1)
-        {
-            let mut level = first_level.clone();
-            level.sort_by_key_range(); // TODO: disjoint levels shouldn't need sort
+        if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
+            let first_level_size = first_level.size();
 
+            // NOTE: Special handling for disjoint workloads
+            if levels.is_disjoint() {
+                if first_level_size < self.target_size.into() {
+                    // TODO: also do this in non-disjoint workloads
+                    // -> intra-L0 compaction
+
+                    // NOTE: Force a merge into L0 itself
+                    // ...we seem to have *very* small flushes
+                    return if first_level.len() >= 32 {
+                        Choice::Merge(CompactionInput {
+                            dest_level: 0,
+                            segment_ids: first_level.list_ids(),
+                            // NOTE: Allow a bit of overshooting
+                            target_size: ((self.target_size as f32) * 1.1) as u64,
+                        })
+                    } else {
+                        Choice::DoNothing
+                    };
+                }
+
+                return Choice::Merge(CompactionInput {
+                    dest_level: 1,
+                    segment_ids: first_level.list_ids(),
+                    target_size: ((self.target_size as f32) * 1.1) as u64,
+                });
+            }
 
-            let Some(next_level) = &resolved_view.get(1) else {
-                return Choice::DoNothing;
-            };
+            if first_level_size < self.target_size.into() {
+                // NOTE: We reached the threshold, but L0 is still very small
+                // meaning we have very small segments, so do intra-L0 compaction
+                return Choice::Merge(CompactionInput {
+                    dest_level: 0,
+                    segment_ids: first_level.list_ids(),
+                    target_size: self.target_size.into(),
+                });
+            }
 
-            let mut segment_ids: Vec<u64> =
-                level.iter().map(|x| x.metadata.id).collect::<Vec<_>>();
+            if !busy_levels.contains(&1) {
+                let mut level = first_level.clone();
+                level.sort_by_key_range();
 
-            // Get overlapping segments in next level
-            let key_range = aggregate_key_range(&level);
+                let Some(next_level) = &resolved_view.get(1) else {
+                    return Choice::DoNothing;
+                };
 
-            let next_level_overlapping_segment_ids: Vec<_> = next_level
-                .overlapping_segments(&key_range)
-                .map(|x| x.metadata.id)
-                .collect();
+                let mut segment_ids: Vec<u64> =
+                    level.iter().map(|x| x.metadata.id).collect::<Vec<_>>();
 
-            segment_ids.extend(&next_level_overlapping_segment_ids);
+                // Get overlapping segments in next level
+                let key_range = aggregate_key_range(&level);
 
-            let choice = CompactionInput {
-                segment_ids,
-                dest_level: 1,
-                target_size: u64::from(self.target_size),
-            };
+                let next_level_overlapping_segment_ids: Vec<_> = next_level
+                    .overlapping_segments(&key_range)
+                    .map(|x| x.metadata.id)
+                    .collect();
 
-            if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
-                return Choice::Move(choice);
-            }
-            return Choice::Merge(choice);
+                segment_ids.extend(&next_level_overlapping_segment_ids);
+
+                let choice = CompactionInput {
+                    segment_ids,
+                    dest_level: 1,
+                    target_size: u64::from(self.target_size),
+                };
+
+                if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
+                    return Choice::Move(choice);
+                }
+                return Choice::Merge(choice);
+            }
         }
     }
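Taken together, the rewritten block above gives L0 a three-step decision ladder once the flush-count threshold is hit: intra-L0 compaction while the level is still smaller than one target segment, a straight merge into L1 for disjoint trees (with roughly 10% overshoot allowed via the `* 1.1` factor), and the classic overlap-aware L0 -> L1 merge otherwise. A condensed sketch of that control flow, using hypothetical scalar parameters in place of the real level and manifest types:

/// Condensed sketch of the L0 decision ladder above.
/// All parameters are hypothetical scalar stand-ins for the real types.
fn l0_plan(
    segment_count: usize,   // number of segments in L0
    l0_size: u64,           // total byte size of L0
    target_size: u64,       // configured segment target size
    tree_is_disjoint: bool, // levels.is_disjoint()
) -> &'static str {
    if tree_is_disjoint {
        if l0_size < target_size {
            // Very small flushes: only compact L0 into itself once enough
            // tiny segments have piled up; ~10% overshoot is tolerated.
            return if segment_count >= 32 { "intra-L0 merge" } else { "do nothing" };
        }
        // Disjoint tree: merge straight into L1.
        return "merge into L1";
    }
    if l0_size < target_size {
        // Threshold reached but L0 is still small: intra-L0 compaction.
        return "intra-L0 merge";
    }
    // Otherwise fall through to the overlap check against L1: trivial move
    // if nothing overlaps and L0 is disjoint, merge otherwise.
    "move or merge into L1"
}

fn main() {
    // 40 tiny segments totalling 8 MiB against a 64 MiB target, disjoint tree:
    assert_eq!(l0_plan(40, 8 << 20, 64 << 20, true), "intra-L0 merge");
}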

@@ -513,9 +565,8 @@ mod tests {
 
         assert_eq!(
             compactor.choose(&levels, &config),
-            Choice::Move(CompactionInput {
+            Choice::Merge(CompactionInput {
                 dest_level: 2,
-                // NOTE: segment #3 has no overlap with L2
                 segment_ids: vec![3],
                 target_size: 64 * 1_024 * 1_024
             })
@@ -544,7 +595,9 @@
 
         assert_eq!(
            compactor.choose(&levels, &config),
-            Choice::Move(CompactionInput {
+            // NOTE: We merge because segments are demoted into "cold" levels
+            // see https://github.com/fjall-rs/lsm-tree/issues/63
+            Choice::Merge(CompactionInput {
                 dest_level: 2,
                 segment_ids: vec![1],
                 target_size: 64 * 1_024 * 1_024
4 changes: 4 additions & 0 deletions src/level_manifest/level.rs
@@ -47,6 +47,10 @@ impl Default for Level {
 }
 
 impl Level {
+    pub fn list_ids(&self) -> Vec<SegmentId> {
+        self.segments.iter().map(|x| x.metadata.id).collect()
+    }
+
     pub fn insert(&mut self, segment: Arc<Segment>) {
         self.segments.push(segment);
         self.set_disjoint_flag();
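The new `list_ids` helper above simply collects every segment ID in a level; the compaction code earlier in this diff calls it as `first_level.list_ids()` to target the whole level at once. A self-contained sketch with minimal stand-ins for `SegmentId`, `Segment`, and `Level` (the real types live elsewhere in the crate):

// Minimal stand-ins for the crate's types, for illustration only.
use std::sync::Arc;

type SegmentId = u64;

struct Metadata { id: SegmentId }
struct Segment { metadata: Metadata }
struct Level { segments: Vec<Arc<Segment>> }

impl Level {
    /// Collect the IDs of every segment in this level, e.g. to build
    /// a `CompactionInput` that compacts the whole level at once.
    fn list_ids(&self) -> Vec<SegmentId> {
        self.segments.iter().map(|x| x.metadata.id).collect()
    }
}

fn main() {
    let level = Level {
        segments: vec![
            Arc::new(Segment { metadata: Metadata { id: 1 } }),
            Arc::new(Segment { metadata: Metadata { id: 2 } }),
        ],
    };
    assert_eq!(level.list_ids(), vec![1, 2]);
}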
4 changes: 2 additions & 2 deletions src/level_manifest/mod.rs
@@ -50,7 +50,7 @@ impl std::fmt::Display for LevelManifest {
 
             if level.segments.is_empty() {
                 write!(f, "<empty>")?;
-            } else if level.segments.len() >= 24 {
+            } else if level.segments.len() >= 10 {
                 #[allow(clippy::indexing_slicing)]
                 for segment in level.segments.iter().take(2) {
                     let id = segment.metadata.id;
@@ -63,7 +63,7 @@
                         if is_hidden { ")" } else { "]" },
                     )?;
                 }
-                write!(f, " . . . . . ")?;
+                write!(f, " . . . ")?;
 
                 #[allow(clippy::indexing_slicing)]
                 for segment in level.segments.iter().rev().take(2).rev() {
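The display tweak above kicks in earlier (10 segments instead of 24) and shortens the ellipsis, so a crowded level renders as its first two and last two segments around " . . . ". A rough sketch of the truncation rule over plain IDs (a hypothetical helper, not the crate's actual `Display` impl, which also tracks hidden segments):

/// Hypothetical helper mirroring the truncated rendering above:
/// levels with >= 10 segments show only the first two and last two IDs.
fn render_level(ids: &[u64]) -> String {
    if ids.is_empty() {
        return "<empty>".to_string();
    }
    if ids.len() >= 10 {
        let head: String = ids.iter().take(2).map(|id| format!("[{id}]")).collect();
        let tail: String = ids.iter().rev().take(2).rev().map(|id| format!("[{id}]")).collect();
        return format!("{head} . . . {tail}");
    }
    ids.iter().map(|id| format!("[{id}]")).collect()
}

fn main() {
    let ids: Vec<u64> = (1..=12).collect();
    assert_eq!(render_level(&ids), "[1][2] . . . [11][12]");
}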