diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs
index f21869bf39e0..a90e09fdee81 100644
--- a/crates/store/re_chunk/src/chunk.rs
+++ b/crates/store/re_chunk/src/chunk.rs
@@ -116,6 +116,18 @@ impl PartialEq for Chunk {
 }
 
 impl Chunk {
+    /// Returns a version of us with a new [`ChunkId`].
+    ///
+    /// Reminder:
+    /// * The returned [`Chunk`] will re-use the exact same [`RowId`]s as `self`.
+    /// * Duplicated [`RowId`]s in the `ChunkStore` is undefined behavior.
+    #[must_use]
+    #[inline]
+    pub fn with_id(mut self, id: ChunkId) -> Self {
+        self.id = id;
+        self
+    }
+
     /// Returns `true` if two [`Chunk`]s are similar, although not byte-for-byte equal.
     ///
     /// In particular, this ignores chunks and row IDs, as well as temporal timestamps.
diff --git a/crates/store/re_chunk/src/shuffle.rs b/crates/store/re_chunk/src/shuffle.rs
index f42cf74b5d06..21982213384e 100644
--- a/crates/store/re_chunk/src/shuffle.rs
+++ b/crates/store/re_chunk/src/shuffle.rs
@@ -111,6 +111,9 @@ impl Chunk {
     /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
     ///
     /// This is a no-op if the underlying timeline is already sorted appropriately (happy path).
+    ///
+    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
+    #[must_use]
     pub fn sorted_by_timeline_if_unsorted(&self, timeline: &Timeline) -> Self {
         let mut chunk = self.clone();
diff --git a/crates/store/re_chunk/src/slice.rs b/crates/store/re_chunk/src/slice.rs
index ccb577afe768..91683b0d0251 100644
--- a/crates/store/re_chunk/src/slice.rs
+++ b/crates/store/re_chunk/src/slice.rs
@@ -61,6 +61,9 @@ impl Chunk {
     /// This cannot fail nor panic: `index` and `len` will be capped so that they cannot
     /// run out of bounds.
     /// This can result in an empty [`Chunk`] being returned if the slice is completely OOB.
+    ///
+    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
+    #[must_use]
     #[inline]
     pub fn row_sliced(&self, index: usize, len: usize) -> Self {
         re_tracing::profile_function!();
@@ -144,6 +147,9 @@ impl Chunk {
     ///
     /// If `timeline` is not found within the [`Chunk`], the end result will be the same as the
     /// current chunk but without any timeline column.
+    ///
+    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
+    #[must_use]
     #[inline]
     pub fn timeline_sliced(&self, timeline: Timeline) -> Self {
         let Self {
@@ -184,6 +190,9 @@ impl Chunk {
     ///
     /// If `component_name` is not found within the [`Chunk`], the end result will be the same as the
     /// current chunk but without any component column.
+    ///
+    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
+    #[must_use]
     #[inline]
     pub fn component_sliced(&self, component_name: ComponentName) -> Self {
         let Self {
@@ -224,6 +233,9 @@ impl Chunk {
     ///
     /// If none of the selected timelines exist in the [`Chunk`], the end result will be the same as the
     /// current chunk but without any timeline column.
+    ///
+    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
+    #[must_use]
     #[inline]
     pub fn timelines_sliced(&self, timelines_to_keep: &IntSet<Timeline>) -> Self {
         let Self {
@@ -264,6 +276,9 @@ impl Chunk {
     ///
     /// If none of the `component_names` exist in the [`Chunk`], the end result will be the same as the
     /// current chunk but without any component column.
+    ///
+    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
+    #[must_use]
     #[inline]
     pub fn components_sliced(&self, component_names: &IntSet<ComponentName>) -> Self {
         let Self {
@@ -306,6 +321,9 @@ impl Chunk {
     ///
     /// If `component_name` doesn't exist in this [`Chunk`], or if it is already dense, this method
     /// is a no-op.
+    ///
+    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
+    #[must_use]
     #[inline]
     pub fn densified(&self, component_name_pov: ComponentName) -> Self {
         let Self {
@@ -400,6 +418,9 @@ impl Chunk {
     /// Empties the [`Chunk`] vertically.
     ///
     /// The result is a new [`Chunk`] with the same columns but zero rows.
+    ///
+    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
+    #[must_use]
     #[inline]
     pub fn emptied(&self) -> Self {
         let Self {
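All of the warnings added above describe the same pitfall: each of these helpers returns a derived `Chunk` that still carries the source chunk's `ChunkId`, so it must be given a fresh ID (via the new `with_id`) before it is inserted alongside the original. A minimal sketch of that pattern (the helper name is made up for illustration):

```rust
use re_chunk::{Chunk, ChunkId, Timeline};

/// Hypothetical helper: keep only the rows on `timeline`, and give the result
/// a fresh identity before it goes anywhere near a store.
fn sliced_copy_for_store(chunk: &Chunk, timeline: Timeline) -> Chunk {
    chunk
        .timeline_sliced(timeline) // the derived chunk still carries the old ChunkId…
        .with_id(ChunkId::new()) // …so mint a new one before re-inserting it
}
```

This is exactly the pattern `drop_time_range` uses below: `row_sliced(..).with_id(ChunkId::new())`.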
diff --git a/crates/store/re_chunk_store/src/drop_time_range.rs b/crates/store/re_chunk_store/src/drop_time_range.rs
new file mode 100644
index 000000000000..7058425db913
--- /dev/null
+++ b/crates/store/re_chunk_store/src/drop_time_range.rs
@@ -0,0 +1,114 @@
+use re_chunk::{ChunkId, Timeline};
+use re_log_types::ResolvedTimeRange;
+
+use crate::{ChunkStore, ChunkStoreEvent};
+
+impl ChunkStore {
+    /// Drop all events that are in the given range on the given timeline.
+    ///
+    /// Note that matching events will be dropped from all timelines they appear on.
+    ///
+    /// Static chunks are unaffected.
+    ///
+    /// Used to implement undo (erase the last event from the blueprint db).
+    pub fn drop_time_range(
+        &mut self,
+        timeline: &Timeline,
+        drop_range: ResolvedTimeRange,
+    ) -> Vec<ChunkStoreEvent> {
+        re_tracing::profile_function!();
+
+        if drop_range.max() < drop_range.min() {
+            return Default::default();
+        }
+
+        // Prepare the changes:
+
+        let mut chunk_ids_to_drop = vec![];
+        let mut new_chunks = vec![];
+
+        for (chunk_id, chunk) in &self.chunks_per_chunk_id {
+            let Some(time_column) = chunk.timelines().get(timeline) else {
+                // static chunk, or chunk that doesn't overlap this timeline
+                continue; // keep it
+            };
+
+            let chunk_range = time_column.time_range();
+
+            if drop_range.contains_range(chunk_range) {
+                // The whole chunk should be dropped!
+                chunk_ids_to_drop.push(*chunk_id);
+            } else if drop_range.intersects(chunk_range) {
+                let chunk = chunk.sorted_by_timeline_if_unsorted(timeline);
+
+                let num_rows = chunk.num_rows();
+
+                // Get the sorted times:
+                #[allow(clippy::unwrap_used)] // We already know the chunk has the timeline
+                let time_column = chunk.timelines().get(timeline).unwrap();
+                let times = time_column.times_raw();
+
+                let drop_range_min = drop_range.min().as_i64();
+                let drop_range_max = drop_range.max().as_i64();
+
+                let min_idx = times.partition_point(|&time| time < drop_range_min);
+                let max_idx = times.partition_point(|&time| time <= drop_range_max);
+
+                {
+                    // Sanity check:
+                    debug_assert!(min_idx <= max_idx);
+                    debug_assert!(drop_range_min <= times[min_idx]);
+                    if 0 < min_idx {
+                        debug_assert!(times[min_idx - 1] < drop_range_min);
+                    }
+                    if max_idx < num_rows {
+                        debug_assert!(drop_range_max < times[max_idx]);
+                        if 0 < max_idx {
+                            debug_assert!(times[max_idx - 1] <= drop_range_max);
+                        }
+                    }
+                }
+
+                if min_idx < max_idx {
+                    chunk_ids_to_drop.push(*chunk_id);
+                    if 0 < min_idx {
+                        new_chunks.push(chunk.row_sliced(0, min_idx).with_id(ChunkId::new()));
+                    }
+                    if max_idx < num_rows {
+                        new_chunks.push(
+                            chunk
+                                .row_sliced(max_idx, num_rows - max_idx)
+                                .with_id(ChunkId::new()),
+                        );
+                    }
+                }
+            }
+        }
+
+        // ------------------
+        // Apply the changes:
+
+        let generation = self.generation();
+        let mut events: Vec<ChunkStoreEvent> = vec![];
+
+        for chunk_id in chunk_ids_to_drop {
+            for diff in self.remove_chunk(chunk_id) {
+                events.push(ChunkStoreEvent {
+                    store_id: self.id.clone(),
+                    store_generation: generation.clone(),
+                    event_id: self
+                        .event_id
+                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
+                    diff,
+                });
+            }
+        }
+        for mut chunk in new_chunks {
+            chunk.sort_if_unsorted();
+            #[allow(clippy::unwrap_used)] // The chunk came from the store, so it should be fine
+            events.append(&mut self.insert_chunk(&chunk.into()).unwrap());
+        }
+
+        events
+    }
+}
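The heart of the split is the pair of `partition_point` calls: `min_idx` is the index of the first row with time `>= drop_range.min()`, and `max_idx` is the index of the first row with time `> drop_range.max()`. Everything in `[min_idx, max_idx)` is dropped, while `[0, min_idx)` and `[max_idx, num_rows)` survive as two freshly re-identified chunks. A tiny self-contained check of that index arithmetic on a plain sorted slice (the times are made up for illustration):

```rust
fn main() {
    // Sorted times of a hypothetical chunk, and the inclusive range [1, 2] to drop:
    let times: [i64; 4] = [0, 1, 2, 3];
    let (drop_min, drop_max) = (1_i64, 2_i64);

    // First index with time >= drop_min, and first index with time > drop_max:
    let min_idx = times.partition_point(|&t| t < drop_min);
    let max_idx = times.partition_point(|&t| t <= drop_max);

    assert_eq!((min_idx, max_idx), (1, 3));
    assert_eq!(&times[..min_idx], &[0]); // would survive as a new leading chunk
    assert_eq!(&times[max_idx..], &[3]); // would survive as a new trailing chunk
}
```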
diff --git a/crates/store/re_chunk_store/src/lib.rs b/crates/store/re_chunk_store/src/lib.rs
index 54f0df1f8e8a..ddeb2e7bb87e 100644
--- a/crates/store/re_chunk_store/src/lib.rs
+++ b/crates/store/re_chunk_store/src/lib.rs
@@ -15,6 +15,7 @@
 //!
 
 mod dataframe;
+mod drop_time_range;
 mod events;
 mod gc;
 mod query;
diff --git a/crates/store/re_chunk_store/tests/drop_time_range.rs b/crates/store/re_chunk_store/tests/drop_time_range.rs
new file mode 100644
index 000000000000..2400dcac8127
--- /dev/null
+++ b/crates/store/re_chunk_store/tests/drop_time_range.rs
@@ -0,0 +1,82 @@
+// https://github.com/rust-lang/rust-clippy/issues/10011
+#![cfg(test)]
+
+use std::sync::Arc;
+
+use re_chunk::{Chunk, RowId};
+use re_chunk_store::{ChunkStore, ChunkStoreConfig};
+use re_log_types::{example_components::MyColor, ResolvedTimeRange};
+use re_log_types::{EntityPath, TimePoint, Timeline};
+use re_types_core::Loggable as _;
+
+#[test]
+fn drop_time_range() -> anyhow::Result<()> {
+    re_log::setup_logging();
+
+    let entity_path = EntityPath::from("this/that");
+    let timeline = Timeline::new_sequence("timeline");
+    let data = MyColor::from_rgb(255, 0, 0);
+    let time_point_at = |time: i64| TimePoint::from([(timeline, time)]);
+
+    for config in [
+        ChunkStoreConfig::DEFAULT,
+        ChunkStoreConfig::COMPACTION_DISABLED,
+    ] {
+        let mut store = ChunkStore::new(
+            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+            config,
+        );
+
+        let num_events = |store: &ChunkStore| {
+            store.num_temporal_events_for_component_on_timeline(
+                &timeline,
+                &entity_path,
+                MyColor::name(),
+            )
+        };
+
+        store.insert_chunk(&Arc::new(
+            Chunk::builder(entity_path.clone())
+                .with_component_batch(RowId::new(), time_point_at(0), &data)
+                .with_component_batch(RowId::new(), time_point_at(1), &data)
+                .with_component_batch(RowId::new(), time_point_at(2), &data)
+                .with_component_batch(RowId::new(), time_point_at(3), &data)
+                .build()?,
+        ))?;
+
+        store.insert_chunk(&Arc::new(
+            Chunk::builder(entity_path.clone())
+                .with_component_batch(RowId::new(), time_point_at(2), &data)
+                .with_component_batch(RowId::new(), time_point_at(3), &data)
+                .with_component_batch(RowId::new(), time_point_at(4), &data)
+                .with_component_batch(RowId::new(), time_point_at(5), &data)
+                .build()?,
+        ))?;
+
+        store.insert_chunk(&Arc::new(
+            Chunk::builder(entity_path.clone())
+                .with_component_batch(RowId::new(), time_point_at(4), &data)
+                .with_component_batch(RowId::new(), time_point_at(5), &data)
+                .with_component_batch(RowId::new(), time_point_at(6), &data)
+                .with_component_batch(RowId::new(), time_point_at(7), &data)
+                .build()?,
+        ))?;
+
+        assert_eq!(num_events(&store), 12);
+
+        // Drop nothing:
+        store.drop_time_range(&timeline, ResolvedTimeRange::new(10, 100));
+        store.drop_time_range(&timeline, ResolvedTimeRange::new(-100, -10));
+        assert_eq!(num_events(&store), 12);
+
+        // Drop stuff from the middle of the first chunk, and the start of the second:
+        store.drop_time_range(&timeline, ResolvedTimeRange::new(1, 2));
+        assert_eq!(num_events(&store), 9);
+
+        // Drop a bunch in the middle (including all of middle chunk):
+        store.drop_time_range(&timeline, ResolvedTimeRange::new(2, 5));
+        assert_eq!(num_events(&store), 3);
+    }
+
+    Ok(())
+}
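The expected counts in the test follow directly from which logged times fall inside the dropped (inclusive) ranges. Spelled out as a standalone check, using the same times as the three chunks built above:

```rust
fn main() {
    // Times logged per chunk in the test above:
    let chunks: Vec<Vec<i64>> = vec![vec![0, 1, 2, 3], vec![2, 3, 4, 5], vec![4, 5, 6, 7]];

    // An event survives if it lies outside every dropped (inclusive) range:
    let survivors = |dropped: &[(i64, i64)]| {
        chunks
            .iter()
            .flatten()
            .filter(|&&t| !dropped.iter().any(|&(lo, hi)| lo <= t && t <= hi))
            .count()
    };

    assert_eq!(survivors(&[]), 12); // nothing dropped
    assert_eq!(survivors(&[(1, 2)]), 9); // two rows from chunk 1, one from chunk 2
    assert_eq!(survivors(&[(1, 2), (2, 5)]), 3); // only times 0, 6 and 7 remain
}
```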
diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs
index a2c1356d3f48..963284e0fa74 100644
--- a/crates/store/re_entity_db/src/entity_db.rs
+++ b/crates/store/re_entity_db/src/entity_db.rs
@@ -446,6 +446,19 @@ impl EntityDb {
         store_events
     }
 
+    /// Drop all events in the given time range from the given timeline.
+    ///
+    /// Used to implement undo (erase the last event from the blueprint db).
+    pub fn drop_time_range(
+        &mut self,
+        timeline: &Timeline,
+        drop_range: ResolvedTimeRange,
+    ) -> Vec<ChunkStoreEvent> {
+        let store_events = self.data_store.drop_time_range(timeline, drop_range);
+        self.on_store_deletions(&store_events);
+        store_events
+    }
+
     /// Unconditionally drops all the data for a given [`EntityPath`] .
     ///
     /// This is _not_ recursive. Children of this entity will not be affected.
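The blueprint-undo use case mentioned in the doc comment boils down to a single call on the blueprint's `EntityDb`. A hypothetical sketch (the wrapper function and how its bounds are obtained are assumptions for illustration, not part of this PR):

```rust
use re_chunk_store::ChunkStoreEvent;
use re_entity_db::EntityDb;
use re_log_types::{ResolvedTimeRange, Timeline};

/// Hypothetical undo step: erase everything logged between `from` and `to`
/// (inclusive) on the given timeline, returning the resulting store events.
fn erase_between(
    blueprint_db: &mut EntityDb,
    blueprint_timeline: &Timeline,
    from: i64,
    to: i64,
) -> Vec<ChunkStoreEvent> {
    blueprint_db.drop_time_range(blueprint_timeline, ResolvedTimeRange::new(from, to))
}
```

As documented on `ChunkStore::drop_time_range`, static chunks are unaffected, so only temporal data in the range is erased.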
diff --git a/crates/store/re_log_types/src/resolved_time_range.rs b/crates/store/re_log_types/src/resolved_time_range.rs
index f542a173b2ba..5cd8af4a561c 100644
--- a/crates/store/re_log_types/src/resolved_time_range.rs
+++ b/crates/store/re_log_types/src/resolved_time_range.rs
@@ -89,6 +89,12 @@ impl ResolvedTimeRange {
         self.min <= time && time <= self.max
     }
 
+    /// Does this range fully contain the other?
+    #[inline]
+    pub fn contains_range(&self, other: Self) -> bool {
+        self.min <= other.min && other.max <= self.max
+    }
+
     #[inline]
     pub fn intersects(&self, other: Self) -> bool {
         self.min <= other.max && self.max >= other.min
diff --git a/crates/store/re_log_types/src/time_point/time_int.rs b/crates/store/re_log_types/src/time_point/time_int.rs
index 1d254c90579d..de51b6da6c1e 100644
--- a/crates/store/re_log_types/src/time_point/time_int.rs
+++ b/crates/store/re_log_types/src/time_point/time_int.rs
@@ -118,6 +118,7 @@ impl TimeInt {
     /// Always returns [`Self::STATIC`] for [`Self::STATIC`].
     #[inline]
+    #[must_use]
     pub fn inc(&self) -> Self {
         match self.0 {
             Some(t) => Self::new_temporal(t.get().saturating_add(1)),
@@ -127,6 +128,7 @@
     /// Always returns [`Self::STATIC`] for [`Self::STATIC`].
     #[inline]
+    #[must_use]
     pub fn dec(&self) -> Self {
         match self.0 {
             Some(t) => Self::new_temporal(t.get().saturating_sub(1)),
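The two range predicates used above differ only in strictness: `contains_range` requires `other` to lie entirely inside `self`, while `intersects` accepts any overlap; both treat their bounds as inclusive. A quick illustration with made-up sequence times:

```rust
use re_log_types::ResolvedTimeRange;

fn main() {
    let outer = ResolvedTimeRange::new(0, 10);

    assert!(outer.contains_range(ResolvedTimeRange::new(3, 5)));
    assert!(!outer.contains_range(ResolvedTimeRange::new(8, 20))); // only a partial overlap…
    assert!(outer.intersects(ResolvedTimeRange::new(8, 20))); // …which is enough for intersects
}
```

The `#[must_use]` additions on `TimeInt::inc`/`dec` are in the same spirit as the ones on the `Chunk` helpers: both return new values rather than mutating in place, so silently discarding the result is almost certainly a bug.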