-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Stats pruning #979
base: develop
Are you sure you want to change the base?
[WIP] Stats pruning #979
Conversation
@@ -100,6 +100,15 @@ impl BoolArrayTrait for ByteBoolArray { | |||
fn maybe_null_slices_iter<'a>(&'a self) -> Box<dyn Iterator<Item = (usize, usize)> + 'a> { | |||
todo!() | |||
} | |||
|
|||
fn not(&self) -> Array { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to inverse
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Invert? Negate?
@@ -104,6 +104,12 @@ impl BoolArrayTrait for RoaringBoolArray { | |||
fn maybe_null_slices_iter<'a>(&'a self) -> Box<dyn Iterator<Item = (usize, usize)> + 'a> { | |||
todo!() | |||
} | |||
|
|||
fn not(&self) -> Array { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to inverse
@@ -113,6 +114,12 @@ impl BoolArrayTrait for RunEndBoolArray { | |||
fn maybe_null_slices_iter<'a>(&'a self) -> Box<dyn Iterator<Item = (usize, usize)> + 'a> { | |||
todo!() | |||
} | |||
|
|||
fn not(&self) -> Array { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to inverse
@@ -103,6 +103,12 @@ impl BoolArrayTrait for BoolArray { | |||
fn maybe_null_slices_iter<'a>(&'a self) -> Box<dyn Iterator<Item = (usize, usize)> + 'a> { | |||
Box::new(BitSliceIterator::new(self.buffer(), 0, self.len())) | |||
} | |||
|
|||
fn not(&self) -> Array { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to inverse
@@ -54,6 +54,16 @@ impl BoolArrayTrait for ChunkedArray { | |||
fn maybe_null_slices_iter(&self) -> Box<dyn Iterator<Item = (usize, usize)>> { | |||
todo!() | |||
} | |||
|
|||
fn not(&self) -> Array { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to inverse
@@ -78,6 +78,18 @@ impl BoolArrayTrait for ConstantArray { | |||
Box::new(iter::empty()) | |||
} | |||
} | |||
|
|||
fn not(&self) -> Array { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to inverse
@@ -55,6 +55,18 @@ impl BoolArrayTrait for SparseArray { | |||
fn maybe_null_slices_iter(&self) -> Box<dyn Iterator<Item = (usize, usize)>> { | |||
todo!() | |||
} | |||
|
|||
fn not(&self) -> Array { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to inverse
@@ -107,20 +119,6 @@ where | |||
} | |||
|
|||
impl PrimitiveArrayTrait for ConstantArray { | |||
fn f32_accessor(&self) -> Option<AccessorRef<f32>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undo this diff
@@ -95,7 +95,7 @@ pub fn compare( | |||
vortex_bail!("Compare operations only support arrays of the same type"); | |||
} | |||
|
|||
if left.is_encoding(Constant::ID) { | |||
if left.is_encoding(Constant::ID) && !right.is_encoding(Constant::ID) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make a separate pr and add tests
@@ -103,6 +103,8 @@ pub trait BoolArrayTrait: ArrayTrait { | |||
// and the buffer is a slice of that array, omitted slices | |||
// could be either true or false signified by the initial | |||
// value returned. | |||
|
|||
fn not(&self) -> Array; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to inverse
pub trait VortexExpr: Debug + Send + Sync + PartialEq<dyn Any> { | ||
fn as_any(&self) -> &dyn Any; | ||
|
||
fn evaluate(&self, batch: &Array) -> VortexResult<Array>; | ||
|
||
fn references(&self) -> HashSet<Field>; | ||
|
||
fn estimate_cost(&self, schema: &Schema) -> usize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has to come back at some point, however, it has to have different form
@@ -43,9 +40,6 @@ fn unbox_any(any: &dyn Any) -> &dyn Any { | |||
} | |||
} | |||
|
|||
#[derive(Debug, PartialEq, Hash, Clone, Eq)] | |||
pub struct NoOp; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was no need to translate it, we will never see it as a datafusion table provider
// TODO(robert): This might be too broad of a check since it should be limited only to fields in the projection | ||
self.rhs | ||
.references() | ||
.intersection(&lhsp.references()) | ||
.next() | ||
.is_none() | ||
.then_some(lhsp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Narrow this check down
fn estimate_cost(&self, schema: &Schema) -> usize; | ||
fn project(&self, projection: &[Field]) -> Option<Arc<dyn VortexExpr>>; | ||
|
||
fn is_constant(&self) -> bool; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should actually be fn fold(&self) -> Option<Scalar>
, i.e. can this be evaluated to a constant?
@@ -10,20 +8,25 @@ pub use expr::*; | |||
pub use operators::*; | |||
|
|||
pub fn split_conjunction(expr: &Arc<dyn VortexExpr>) -> Vec<Arc<dyn VortexExpr>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split this diff to separate pr
encoding: uint16; | ||
metadata: [ubyte]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually use this, it's never written for now
@@ -171,14 +171,6 @@ macro_rules! primitive_scalar { | |||
} | |||
} | |||
|
|||
impl From<Option<$T>> for ScalarValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can actually have a generic instead of generating all of the impls
@@ -135,6 +135,12 @@ impl ScalarValue { | |||
} | |||
} | |||
|
|||
impl<T: Into<ScalarValue>> From<Option<T>> for ScalarValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
like this
vortex-serde/Cargo.toml
Outdated
simplelog = { workspace = true } | ||
tokio = { workspace = true, features = ["full"] } | ||
vortex-alp = { path = "../encodings/alp" } | ||
vortex-fastlanes = { path = "../encodings/fastlanes" } | ||
vortex-sampling-compressor = { path = "../vortex-sampling-compressor" } | ||
|
||
[lints] | ||
workspace = true | ||
# workspace = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bring this back
pub fn try_new(original_expr: &Arc<dyn VortexExpr>) -> Option<Self> { | ||
let (expr, required_stats) = convert_to_pruning_expression(original_expr); | ||
// TODO(robert): Could be constant false but right now we don't generate such expressions, need to have way to reduce constant expressions | ||
if expr.is_constant() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should use fold
if let Some(nexp) = expr.as_any().downcast_ref::<Not>() { | ||
if nexp.child().as_any().downcast_ref::<Column>().is_some() { | ||
let mut references = HashMap::new(); | ||
let min_expr = replace_column_with_stat(expr, Stat::Min, &mut references); | ||
let max_expr = replace_column_with_stat(expr, Stat::Max, &mut references); | ||
return min_expr | ||
.zip(max_expr) | ||
.map(|(min_exp, max_exp)| { | ||
( | ||
Arc::new(BinaryExpr::new(min_exp, Operator::And, max_exp)) | ||
as Arc<dyn VortexExpr>, | ||
references, | ||
) | ||
}) | ||
.unwrap_or_else(|| { | ||
( | ||
Arc::new(Literal::new(Scalar::bool(false, Nullability::NonNullable))), | ||
HashMap::new(), | ||
) | ||
}); | ||
} | ||
} | ||
|
||
if expr.as_any().downcast_ref::<Column>().is_some() { | ||
return (fallback, HashMap::new()); | ||
let mut references = HashMap::new(); | ||
let min_expr = replace_column_with_stat(expr, Stat::Min, &mut references); | ||
let max_expr = replace_column_with_stat(expr, Stat::Max, &mut references); | ||
return min_expr | ||
.zip(max_expr) | ||
.map(|(min_exp, max_exp)| { | ||
( | ||
Arc::new(Not::new(Arc::new(BinaryExpr::new( | ||
min_exp, | ||
Operator::Or, | ||
max_exp, | ||
)))) as Arc<dyn VortexExpr>, | ||
references, | ||
) | ||
}) | ||
.unwrap_or_else(|| { | ||
( | ||
Arc::new(Literal::new(Scalar::bool(false, Nullability::NonNullable))), | ||
HashMap::new(), | ||
) | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extract this to separate function
pub struct BatchPruner { | ||
children: Vec<Box<dyn LayoutReader>>, | ||
selectors: Vec<Option<RowSelector>>, | ||
projected_scans: Vec<(usize, PruningScan)>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit too similar to BatchReader. Consider unifying
"Mask arrays have to be integer arrays" | ||
); | ||
self.indices = Some(array); | ||
pub fn with_row_selector(mut self, array: RowSelector) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's strictly more powerful to use row ranges than individual indices
let pruner = if let Some(pscan) = pruning_scan { | ||
if let Some(fr) = filter_reader { | ||
Ok(LayoutPruner::new( | ||
self.reader, | ||
fr, | ||
message_cache.clone(), | ||
pscan, | ||
)) | ||
} else { | ||
Err((self.reader, filter_reader)) | ||
} | ||
} else { | ||
Err((self.reader, filter_reader)) | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is terrible, we have to push filtering in to readers to avoid this mess
pub fn has_dtype(&self) -> bool { | ||
self.dtype.is_some() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit of a hack. We know that if we don't have dtype it must be stored somehow. However, I think we can deduce stats schema and avoid storing dtype altogether if we store stats as column
if let Some(pf) = &scan.filter { | ||
let field_prefix = scan | ||
.stats_projection | ||
.keys() | ||
.next() | ||
.cloned() | ||
.vortex_expect("Must have only single field ref"); | ||
let field_stats = scan | ||
.stats_projection | ||
.get(&field_prefix) | ||
.vortex_expect("We got the key by iterating keys"); | ||
let prefix_str = match field_prefix { | ||
Field::Name(n) => n.to_string(), | ||
Field::Index(i) => i.to_string(), | ||
}; | ||
let expanded_batch = add_missing_columns( | ||
b, | ||
&prefix_str, | ||
field_stats, | ||
self.message_cache.dtype(), | ||
)?; | ||
let mask = pf.evaluate(&expanded_batch)?; | ||
let row_offsets = expanded_batch | ||
.with_dyn(|a| { | ||
a.as_struct_array() | ||
.ok_or_else(|| vortex_err!("Stats weren't a struct array")) | ||
.map(|s| { | ||
s.field_by_name(&format!("{prefix_str}_row_offset")) | ||
}) | ||
})? | ||
.ok_or_else(|| vortex_err!("Missing row offsets"))?; | ||
filter_offsets(row_offsets, mask, scan.row_count as usize) | ||
.map(|s| Some(PlanResult::Range(s))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is too much logic here
}) | ||
})? | ||
.ok_or_else(|| vortex_err!("Missing row offsets"))?; | ||
filter_offsets(row_offsets, mask, scan.row_count as usize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we return chunks that should be pruned and we want to produce rows to include we need to flip the mask in this function
} | ||
} | ||
|
||
fn filter_offsets(offsets: Array, mask: Array, row_count: usize) -> VortexResult<RowSelector> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mask has to be flipped before being passed to this function or this function flips the mask
let dtype = if self.cache.has_dtype() { | ||
self.cache.dtype().clone() | ||
} else { | ||
let dtype_fb_length = buf.get_u32_le(); | ||
let dtype_buf = buf.split_to(dtype_fb_length as usize); | ||
let msg = unsafe { root_unchecked::<fb::Message>(&dtype_buf) } | ||
.header_as_schema() | ||
.ok_or_else(|| { | ||
vortex_err!( | ||
"Expected schema message; this was checked earlier in the function" | ||
) | ||
})?; | ||
IPCDType::read_flatbuffer(&msg)?.0 | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this likely goes away if we change the way stats are stored
let mut input = None; | ||
let mut filter_reader = None; | ||
let mut layout_pruner = None; | ||
let mut state = StreamingState::Init; | ||
if let Ok(pruner) = pruner { | ||
layout_pruner = Some(pruner); | ||
state = StreamingState::Pruning; | ||
} else { | ||
StreamingState::Init | ||
}; | ||
(input, filter_reader) = pruner.map_err(|(i, f)| (Some(i), f)).unwrap_err(); | ||
if filter_reader.is_some() { | ||
state = StreamingState::FilterInit; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hate this code but ownership is hard
} | ||
|
||
impl RowSelector { | ||
pub fn new(ranges: Vec<RowRange>) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a sorting constructor and unsafe non sorting one
There's a lot of cleanup to do here but this performs pruning, however, we don't
yet make use of that pruning result