build(deps): datafusion 41 #2917

Status: Open. Wants to merge 8 commits into base: main.
18 changes: 9 additions & 9 deletions Cargo.toml
@@ -95,18 +95,18 @@ criterion = { version = "0.5", features = [
     "html_reports",
 ] }
 crossbeam-queue = "0.3"
-datafusion = { version = "40.0", default-features = false, features = [
-    "array_expressions",
+datafusion = { version = "41.0", default-features = false, features = [
+    "nested_expressions",
     "regex_expressions",
     "unicode_expressions",
 ] }
-datafusion-common = "40.0"
-datafusion-functions = { version = "40.0", features = ["regex_expressions"] }
-datafusion-sql = "40.0"
-datafusion-expr = "40.0"
-datafusion-execution = "40.0"
-datafusion-optimizer = "40.0"
-datafusion-physical-expr = { version = "40.0", features = [
+datafusion-common = "41.0"
+datafusion-functions = { version = "41.0", features = ["regex_expressions"] }
+datafusion-sql = "41.0"
+datafusion-expr = "41.0"
+datafusion-execution = "41.0"
+datafusion-optimizer = "41.0"
+datafusion-physical-expr = { version = "41.0", features = [
     "regex_expressions",
 ] }
 deepsize = "0.2.0"
2 changes: 1 addition & 1 deletion rust/lance-datafusion/Cargo.toml
@@ -21,7 +21,7 @@ datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-functions.workspace = true
 datafusion-physical-expr.workspace = true
-datafusion-substrait = { version = "40.0", optional = true }
+datafusion-substrait = { version = "41.0", optional = true }
 futures.workspace = true
 lance-arrow.workspace = true
 lance-core = { workspace = true, features = ["datafusion"] }
37 changes: 28 additions & 9 deletions rust/lance-datafusion/src/planner.rs
@@ -23,8 +23,9 @@ use datafusion::error::Result as DFResult;
 use datafusion::execution::config::SessionConfig;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::execution::FunctionRegistry;
+use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::logical_expr::expr::ScalarFunction;
+use datafusion::logical_expr::planner::ExprPlanner;
 use datafusion::logical_expr::{
     AggregateUDF, ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility, WindowUDF,
 };
@@ -153,17 +153,29 @@ impl ScalarUDFImpl for CastListF16Udf {
 struct LanceContextProvider {
     options: datafusion::config::ConfigOptions,
     state: SessionState,
+    expr_planners: Vec<Arc<dyn ExprPlanner>>,
 }
 
 impl Default for LanceContextProvider {
     fn default() -> Self {
         let config = SessionConfig::new();
         let runtime_config = RuntimeConfig::new();
         let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
-        let state = SessionState::new_with_config_rt(config, runtime);
+        let mut state_builder = SessionStateBuilder::new()
+            .with_config(config)
+            .with_runtime_env(runtime)
+            .with_default_features();
+
+        // SessionState does not expose expr_planners, so we need to get the default ones from
+        // the builder and store them to return from get_expr_planners
+
+        // unwrap safe because with_default_features sets expr_planners
+        let expr_planners = state_builder.expr_planners().as_ref().unwrap().clone();
+
         Self {
             options: ConfigOptions::default(),
-            state,
+            state: state_builder.build(),
+            expr_planners,
         }
     }
 }
@@ -216,6 +229,10 @@ impl ContextProvider for LanceContextProvider {
     fn udwf_names(&self) -> Vec<String> {
         self.state.window_functions().keys().cloned().collect()
     }
+
+    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
+        &self.expr_planners
+    }
 }
 
 pub struct Planner {
@@ -381,19 +398,15 @@ impl Planner {
             }
         }
         let context_provider = LanceContextProvider::default();
-        let mut sql_to_rel = SqlToRel::new_with_options(
+        let sql_to_rel = SqlToRel::new_with_options(
             &context_provider,
             ParserOptions {
                 parse_float_as_decimal: false,
                 enable_ident_normalization: false,
                 support_varchar_with_length: false,
+                enable_options_value_normalization: false,
             },
         );
-        // These planners are not automatically propagated.
-        // See: https://github.com/apache/datafusion/issues/11477
-        for planner in context_provider.state.expr_planners() {
-            sql_to_rel = sql_to_rel.with_user_defined_planner(planner.clone());
-        }
 
         let mut planner_context = PlannerContext::default();
         let schema = DFSchema::try_from(self.schema.as_ref().clone())?;
@@ -1374,4 +1387,10 @@ mod tests {
             Expr::Literal(ScalarValue::Binary(Some(vec![b'a', b'b', b'c'])))
         );
     }
+
+    #[test]
+    fn test_lance_context_provider_expr_planners() {
+        let ctx_provider = LanceContextProvider::default();
+        assert!(!ctx_provider.get_expr_planners().is_empty());
+    }
 }
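
Review note: with DataFusion 41, `SessionState` construction goes through `SessionStateBuilder`, and `ContextProvider` gained a `get_expr_planners()` hook that `SqlToRel` consults, so the manual `with_user_defined_planner` loop (the workaround for apache/datafusion#11477) can be dropped. Below is a minimal sketch of the builder pattern used above, relying only on the DataFusion 41 APIs already visible in this diff; the free function is illustrative.

```rust
use std::sync::Arc;

use datafusion::execution::config::SessionConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::logical_expr::planner::ExprPlanner;

/// Build a default SessionState configuration and capture the default ExprPlanners
/// so a custom ContextProvider can return them from get_expr_planners().
fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
    let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());
    let mut builder = SessionStateBuilder::new()
        .with_config(SessionConfig::new())
        .with_runtime_env(runtime)
        .with_default_features();
    // with_default_features() populates the builder's expr_planners, so the Option
    // is Some here; clone the planners out before build() consumes the builder.
    builder.expr_planners().as_ref().unwrap().clone()
}
```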
1 change: 1 addition & 0 deletions rust/lance-encoding-datafusion/Cargo.toml
@@ -22,6 +22,7 @@ arrow-array.workspace = true
 arrow-buffer.workspace = true
 arrow-schema.workspace = true
 bytes.workspace = true
+datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
 datafusion-functions.workspace = true
2 changes: 1 addition & 1 deletion rust/lance-encoding-datafusion/src/zone.rs
@@ -6,6 +6,7 @@ use std::{collections::VecDeque, ops::Range, sync::Arc};
 use arrow_array::{cast::AsArray, types::UInt32Type, ArrayRef, RecordBatch, UInt32Array};
 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
 use bytes::Bytes;
+use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_common::{arrow::datatypes::DataType, DFSchemaRef, ScalarValue};
 use datafusion_expr::{
     col,
@@ -16,7 +17,6 @@ use datafusion_expr::{
 };
 use datafusion_functions::core::expr_ext::FieldAccessor;
 use datafusion_optimizer::simplify_expressions::ExprSimplifier;
-use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
 use futures::{future::BoxFuture, FutureExt};
 use lance_encoding::{
     buffer::LanceBuffer,
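
Review note: DataFusion 41 moves `MinAccumulator`/`MaxAccumulator` from `datafusion_physical_expr::expressions` to `datafusion::functions_aggregate::min_max` as part of the upstream migration of min/max to UDAFs. Only the import path changes here and in `btree.rs` below; the `Accumulator` API is driven the same way. A hedged sketch (the helper function is illustrative, not code from this PR):

```rust
use arrow_array::ArrayRef;
use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;

/// Compute the min and max of a single column using the relocated accumulators.
fn column_min_max(values: &ArrayRef) -> Result<(ScalarValue, ScalarValue)> {
    let mut min = MinAccumulator::try_new(values.data_type())?;
    let mut max = MaxAccumulator::try_new(values.data_type())?;
    min.update_batch(std::slice::from_ref(values))?;
    max.update_batch(std::slice::from_ref(values))?;
    Ok((min.evaluate()?, max.evaluate()?))
}
```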
14 changes: 7 additions & 7 deletions rust/lance-index/src/scalar/btree.rs
@@ -13,16 +13,16 @@ use std::{
 use arrow_array::{Array, RecordBatch, UInt32Array};
 use arrow_schema::{DataType, Field, Schema, SortOptions};
 use async_trait::async_trait;
-use datafusion::physical_plan::{
-    sorts::sort_preserving_merge::SortPreservingMergeExec, stream::RecordBatchStreamAdapter,
-    union::UnionExec, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
+use datafusion::{
+    functions_aggregate::min_max::{MaxAccumulator, MinAccumulator},
+    physical_plan::{
+        sorts::sort_preserving_merge::SortPreservingMergeExec, stream::RecordBatchStreamAdapter,
+        union::UnionExec, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
+    },
 };
 use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_expr::Accumulator;
-use datafusion_physical_expr::{
-    expressions::{Column, MaxAccumulator, MinAccumulator},
-    PhysicalSortExpr,
-};
+use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
 use deepsize::DeepSizeOf;
 use futures::{
     future::BoxFuture,
8 changes: 3 additions & 5 deletions rust/lance/src/datafusion/dataframe.rs
@@ -9,13 +9,11 @@ use std::{
 use arrow_schema::{Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion::{
+    catalog::Session,
     dataframe::DataFrame,
     datasource::{streaming::StreamingTable, TableProvider},
     error::DataFusionError,
-    execution::{
-        context::{SessionContext, SessionState},
-        TaskContext,
-    },
+    execution::{context::SessionContext, TaskContext},
     logical_expr::{Expr, TableProviderFilterPushDown, TableType},
     physical_plan::{streaming::PartitionStream, ExecutionPlan, SendableRecordBatchStream},
 };
@@ -69,7 +67,7 @@ impl TableProvider for LanceTableProvider {
 
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
4 changes: 2 additions & 2 deletions rust/lance/src/datafusion/logical_plan.rs
@@ -6,9 +6,9 @@ use std::{any::Any, sync::Arc};
 use arrow_schema::Schema as ArrowSchema;
 use async_trait::async_trait;
 use datafusion::{
+    catalog::Session,
     datasource::TableProvider,
     error::Result as DatafusionResult,
-    execution::context::SessionState,
     logical_expr::{LogicalPlan, TableType},
     physical_plan::ExecutionPlan,
     prelude::Expr,
@@ -40,7 +40,7 @@ impl TableProvider for Dataset {
 
     async fn scan(
         &self,
-        _: &SessionState,
+        _: &dyn Session,
         projection: Option<&Vec<usize>>,
         _: &[Expr],
         limit: Option<usize>,
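
Review note: in DataFusion 41, `TableProvider::scan` accepts `&dyn Session` (exposed at `datafusion::catalog::Session`) rather than the concrete `&SessionState`; since `SessionState` implements `Session`, existing call sites keep working. A hedged sketch of the updated trait shape with a stub provider (`EmptyScanProvider` and its body are illustrative, not part of this PR):

```rust
use std::any::Any;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

/// Minimal provider that returns an empty plan; it only exists to show the
/// DataFusion 41 scan() signature taking &dyn Session.
struct EmptyScanProvider {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for EmptyScanProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session, // DataFusion 41: &dyn Session instead of &SessionState
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(EmptyExec::new(self.schema.clone())))
    }
}
```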
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/scanner.rs
@@ -778,7 +778,7 @@ impl Scanner {
             &[],
             &[],
             &plan.schema(),
-            "",
+            None,
             false,
             false,
         )?;
6 changes: 5 additions & 1 deletion rust/lance/src/io/exec/rowids.rs
@@ -11,6 +11,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+use datafusion_physical_expr::EquivalenceProperties;
 use futures::StreamExt;
 use lance_core::{ROW_ADDR_FIELD, ROW_ID};
 use lance_table::rowids::RowIdIndex;
@@ -91,7 +92,10 @@ impl AddRowAddrExec {
 
         // Is just a simple projections, so it inherits the partitioning and
         // execution mode from parent.
-        let properties = input.properties().clone();
+        let properties = input
+            .properties()
+            .clone()
+            .with_eq_properties(EquivalenceProperties::new(output_schema.clone()));
 
         Ok(Self {
             input,
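
Review note: cloning the child's `PlanProperties` wholesale would keep an `EquivalenceProperties` tied to the child's schema, but `AddRowAddrExec` emits an extra column, so the equivalences are rebuilt from the operator's own output schema while the partitioning and execution mode are still inherited from the child. A compact sketch of that pattern, using only the APIs visible in the diff (`derive_properties` is an illustrative name):

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
use datafusion_physical_expr::EquivalenceProperties;

/// Inherit partitioning and execution mode from the child, but rebase the
/// equivalence properties onto this operator's output schema.
fn derive_properties(child: &Arc<dyn ExecutionPlan>, output_schema: SchemaRef) -> PlanProperties {
    child
        .properties()
        .clone()
        .with_eq_properties(EquivalenceProperties::new(output_schema))
}
```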