Skip to content

Commit

Permalink
chore: bump datafusion
Browse files (browse the repository at this point in the history)
  • Loading branch information
tanruixiang committed Oct 8, 2023
1 parent acbd3ad commit b4dad3c
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 126 deletions.
329 changes: 248 additions & 81 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[workspace]
members = ["datafusion_util", "influxdb_influxql_parser", "iox_query_influxql", "observability_deps", "schema", "test_helpers"]
resolver = "2"

[workspace.package]
version = "0.1.0"
Expand All @@ -8,6 +9,6 @@ edition = "2021"
license = "MIT OR Apache-2.0"

[workspace.dependencies]
arrow = { version = "43.0.0" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d", default-features = false }
arrow = { version = "47.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "219cfb4ccb36045c73409127db51377d66ca0f33", default-features = false }
hashbrown = { version = "0.13.2" }
3 changes: 1 addition & 2 deletions arrow_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ license.workspace = true

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
# need dyn_cmp_dict feature for comparing dictionary arrays
arrow = { workspace = true, features = ["prettyprint", "dyn_cmp_dict"] }
arrow = { workspace = true, features = ["prettyprint"] }
# used by arrow anyway (needed for printing workaround)
chrono = { version = "0.4", default-features = false }
comfy-table = { version = "6.1", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion arrow_util/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn array_value_to_string(column: &ArrayRef, row: usize) -> Result<String> {
)
})?;
// treat as UTC
let ts = DateTime::<Utc>::from_utc(ts, Utc);
let ts = DateTime::<Utc>::from_naive_utc_and_offset(ts, Utc);
// convert to string in preferred influx format
let use_z = true;
Ok(ts.to_rfc3339_opts(SecondsFormat::AutoSi, use_z))
Expand Down
16 changes: 7 additions & 9 deletions iox_query/src/exec/gapfill/algo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ mod interpolate;
use std::{ops::Range, sync::Arc};

use arrow::{
array::{Array, ArrayRef, TimestampNanosecondArray, UInt64Array},
compute::{kernels::take, SortColumn},
array::{Array, ArrayRef, PrimitiveArray, TimestampNanosecondArray, UInt64Array},
compute::kernels::take,
datatypes::SchemaRef,
record_batch::RecordBatch,
};
Expand Down Expand Up @@ -151,13 +151,11 @@ impl GapFiller {

let sort_columns = group_arr
.iter()
.map(|(_, arr)| SortColumn {
values: Arc::clone(arr),
options: None,
})
.map(|(_, arr)| Arc::clone(arr))
.collect::<Vec<_>>();
let mut ranges = arrow::compute::lexicographical_partition_ranges(&sort_columns)
.map_err(DataFusionError::ArrowError)?;
let mut ranges = arrow::compute::partition(&sort_columns)?
.ranges()
.into_iter();

let mut series_ends = vec![];
let mut cursor = self.cursor.clone_for_aggr_col(None)?;
Expand Down Expand Up @@ -941,7 +939,7 @@ impl StashedAggrBuilder<'_> {
/// `input_aggr_array` at `offset` for use with the [`interleave`](arrow::compute::interleave)
/// kernel.
fn create_stash(input_aggr_array: &ArrayRef, offset: u64) -> Result<ArrayRef> {
let take_arr = vec![None, Some(offset)].into();
let take_arr: PrimitiveArray<_> = vec![None, Some(offset)].into();
let stash =
take::take(input_aggr_array, &take_arr, None).map_err(DataFusionError::ArrowError)?;
Ok(stash)
Expand Down
4 changes: 2 additions & 2 deletions iox_query/src/exec/gapfill/exec_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ struct TestCase {
impl TestCase {
fn run(self) -> Result<Vec<RecordBatch>> {
block_on(async {
let session_ctx = SessionContext::with_config(
let session_ctx = SessionContext::new_with_config(
SessionConfig::default().with_batch_size(self.output_batch_size),
)
.into();
Expand All @@ -1093,7 +1093,7 @@ impl TestCase {

fn run_with_memory_limit(self, limit: usize) -> Result<Vec<RecordBatch>> {
block_on(async {
let session_ctx = SessionContext::with_config_rt(
let session_ctx = SessionContext::new_with_config_rt(
SessionConfig::default().with_batch_size(self.output_batch_size),
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(limit, 1.0))?.into(),
)
Expand Down
6 changes: 3 additions & 3 deletions iox_query/src/exec/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ fn negate(v: &ColumnarValue) -> Result<ColumnarValue> {
} else {
let msg = format!(
"Expected boolean literal, but got type {:?}",
val.get_datatype()
val.data_type()
);
Err(DataFusionError::Internal(msg))
}
Expand All @@ -515,8 +515,8 @@ fn and(left: &ColumnarValue, right: &ColumnarValue) -> Result<ColumnarValue> {
} else {
let msg = format!(
"Expected two boolean literals, but got type {:?} and type {:?}",
val_left.get_datatype(),
val_right.get_datatype()
val_left.data_type(),
val_right.data_type()
);
Err(DataFusionError::Internal(msg))
}
Expand Down
9 changes: 7 additions & 2 deletions iox_query/src/logical_optimizer/handle_gapfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ fn build_gapfill_node(
let time_column =
col(new_aggr_plan.schema().fields()[date_bin_gapfill_index].qualified_column());

let aggr = Aggregate::try_from_plan(&new_aggr_plan)?;
let aggr = match &new_aggr_plan {
LogicalPlan::Aggregate(it) => Ok(it),
_ => Err(DataFusionError::Plan(
"Could not coerce into Aggregate!".to_string(),
)),
}?;
let mut new_group_expr: Vec<_> = aggr
.schema
.fields()
Expand Down Expand Up @@ -363,7 +368,7 @@ fn handle_projection(proj: &Projection) -> Result<Option<LogicalPlan>> {
}) else {
// If this is not a projection that is a parent to a GapFill node,
// then there is nothing to do.
return Ok(None)
return Ok(None);
};

let fill_cols: Vec<(&Expr, FillStrategy)> = proj_exprs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use datafusion::{
common::{tree_node::TreeNodeRewriter, DFSchema},
error::DataFusionError,
logical_expr::{
expr::ScalarUDF, expr_rewriter::rewrite_preserving_name, utils::from_plan, LogicalPlan,
Operator,
expr::ScalarUDF, expr_rewriter::rewrite_preserving_name, LogicalPlan, Operator,
},
optimizer::{OptimizerConfig, OptimizerRule},
prelude::{binary_expr, lit, Expr},
Expand Down Expand Up @@ -67,7 +66,7 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
.map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter))
.collect::<Result<Vec<_>, DataFusionError>>()?;

from_plan(plan, new_exprs.as_slice(), new_inputs.as_slice())
plan.with_new_exprs(new_exprs, &new_inputs)
}

impl TreeNodeRewriter for InfluxRegexToDataFusionRegex {
Expand Down
24 changes: 17 additions & 7 deletions iox_query_influxql/src/plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use datafusion::logical_expr::logical_plan::Analyze;
use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs};
use datafusion::logical_expr::{
binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now,
window_function, Aggregate, AggregateFunction, AggregateUDF, Between, BinaryExpr,
BuiltInWindowFunction, BuiltinScalarFunction, EmptyRelation, Explain, Expr, ExprSchemable,
Extension, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource,
ToStringifiedPlan, WindowFrame, WindowFrameBound, WindowFrameUnits,
window_function, AggregateFunction, AggregateUDF, Between, BinaryExpr, BuiltInWindowFunction,
BuiltinScalarFunction, EmptyRelation, Explain, Expr, ExprSchemable, Extension, LogicalPlan,
LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource, ToStringifiedPlan, WindowFrame,
WindowFrameBound, WindowFrameUnits,
};
use datafusion_util::{lit_dict, AsExpr};
use generated_types::influxdata::iox::querier::v1::InfluxQlMetadata;
Expand Down Expand Up @@ -561,7 +561,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}

let Some(time_column_index) = find_time_column_index(fields) else {
return Err(DataFusionError::Internal("unable to find time column".to_owned()))
return Err(DataFusionError::Internal(
"unable to find time column".to_owned(),
));
};

// Find a list of unique aggregate expressions from the projection.
Expand Down Expand Up @@ -1153,7 +1155,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
MeasurementSelection::Subquery(_) => Err(DataFusionError::NotImplemented(
"subquery in FROM clause".into(),
)),
}? else { continue };
}?
else {
continue;
};
table_projs.push_back(table_proj);
}
Ok(table_projs)
Expand Down Expand Up @@ -1230,7 +1235,12 @@ fn build_gap_fill_node(
)));
};

let aggr = Aggregate::try_from_plan(&input)?;
let aggr = match &input {
LogicalPlan::Aggregate(ref it) => Ok(it),
_ => Err(DataFusionError::Plan(
"Could not coerce into Aggregate!".to_string(),
)),
}?;
let mut new_group_expr: Vec<_> = aggr
.schema
.fields()
Expand Down
4 changes: 2 additions & 2 deletions iox_query_influxql/src/plan/planner_time_range_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ fn reduce_expr(expr: &Expr, tz: Option<chrono_tz::Tz>) -> ExprResult {
Literal::Float(v) => Ok(lit(*v)),
Literal::String(v) => Ok(lit(v.clone())),
Literal::Timestamp(v) => Ok(lit(ScalarValue::TimestampNanosecond(
Some(v.timestamp_nanos()),
v.timestamp_nanos_opt(),
None,
))),
Literal::Duration(v) => Ok(lit(ScalarValue::new_interval_mdn(0, 0, **v))),
Expand Down Expand Up @@ -415,7 +415,7 @@ fn reduce_binary_lhs_string_df_expr(

fn parse_timestamp_nanos(s: &str, tz: Option<chrono_tz::Tz>) -> Result<i64> {
parse_timestamp(s, tz)
.map(|ts| ts.timestamp_nanos())
.map(|ts| ts.timestamp_nanos_opt().unwrap())
.map_err(|_| DataFusionError::Plan(format!("'{s}' is not a valid timestamp")))
}

Expand Down
3 changes: 1 addition & 2 deletions iox_query_influxql/src/plan/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Offset, TimeZone};
use datafusion::common::{DataFusionError, Result};

/// Parse the timestamp string and return a DateTime in UTC.
fn parse_timestamp_utc(s: &str) -> Result<DateTime<FixedOffset>> {
// 1a. Try a date time format string with nanosecond precision and then without
Expand All @@ -18,7 +17,7 @@ fn parse_timestamp_utc(s: &str) -> Result<DateTime<FixedOffset>> {
NaiveDate::parse_from_str(s, "%Y-%m-%d")
.map(|nd| nd.and_time(NaiveTime::default())),
)
.map(|ts| DateTime::from_utc(ts, chrono::Utc.fix()))
.map(|ts| DateTime::from_naive_utc_and_offset(ts, chrono::Utc.fix()))
.map_err(|_| DataFusionError::Plan("invalid timestamp string".into()))
}

Expand Down
4 changes: 2 additions & 2 deletions iox_query_influxql/src/plan/util_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ where
})),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?),
key.clone(),
field.clone(),
)))
}
Expr::GroupingSet(set) => match set {
Expand Down
4 changes: 2 additions & 2 deletions query_functions/src/selectors/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ impl ToState<String> for &str {

fn make_scalar_struct(data_fields: Vec<ScalarValue>) -> ScalarValue {
let fields = vec![
Field::new("value", data_fields[0].get_datatype(), true),
Field::new("time", data_fields[1].get_datatype(), true),
Field::new("value", data_fields[0].data_type(), true),
Field::new("time", data_fields[1].data_type(), true),
];

ScalarValue::Struct(Some(data_fields), Fields::from(fields))
Expand Down
8 changes: 4 additions & 4 deletions query_functions/src/window/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ fn timestamp_to_datetime(ts: i64) -> DateTime<Utc> {
// Note that nsec as u32 is safe here because modulo on a negative ts value
// still produces a positive remainder.
let datetime = NaiveDateTime::from_timestamp_opt(secs, nsec as u32).expect("ts in range");
DateTime::from_utc(datetime, Utc)
DateTime::from_naive_utc_and_offset(datetime, Utc)
}

/// Original: <https://github.com/influxdata/flux/blob/1e9bfd49f21c0e679b42acf6fc515ce05c6dec2b/values/time.go#L491>
Expand Down Expand Up @@ -198,8 +198,8 @@ fn to_timestamp_nanos_utc(
NaiveTime::from_hms_nano_opt(hour, min, sec, nano).expect("hour-min-sec-nano in range");
let ndatetime = NaiveDateTime::new(ndate, ntime);

let datetime = DateTime::<Utc>::from_utc(ndatetime, Utc);
datetime.timestamp_nanos()
let datetime = DateTime::<Utc>::from_naive_utc_and_offset(ndatetime, Utc);
datetime.timestamp_nanos_opt().unwrap()
}

impl Add<Duration> for i64 {
Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {
/// t: mustParseTime("1970-02-01T00:00:00Z"),
fn must_parse_time(s: &str) -> i64 {
let datetime = DateTime::parse_from_rfc3339(s).unwrap();
datetime.timestamp_nanos()
datetime.timestamp_nanos_opt().unwrap()
}

/// TestWindow_GetEarliestBounds
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly-2023-02-02
nightly-2023-08-28
2 changes: 1 addition & 1 deletion test_helpers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ tempfile = "3.4.0"
tracing-log = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
observability_deps = { path = "../observability_deps" }
async-trait = { version = "0.1.66", optional = true }
async-trait = { version = "0.1.73", optional = true }
tokio = { version = "1.26.0", optional = true, default_features = false, features = ["time"] }

[features]
Expand Down

0 comments on commit b4dad3c

Please sign in to comment.