Skip to content

optimize_projections fails after mark-join involved #20083

@yjerry-fortinet

Description

@yjerry-fortinet

Describe the bug

Optimization fails in the `optimize_projections` rule with a schema-mismatch error when the plan contains a filter of the form: Filter: EXISTS (<subquery>) OR EXISTS (<subquery>)

To Reproduce

use datafusion::logical_expr::expr::Exists;
use datafusion::logical_expr::utils::disjunction;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, TableSource};
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
use datafusion::prelude::{Expr, lit};
use datafusion::{
    arrow::datatypes::{DataType, Field, Schema},
    datasource::{DefaultTableSource, MemTable},
};
use datafusion_common::{Column, JoinType};
use std::sync::Arc;

/// Creates the table source shared by every scan in this reproduction.
///
/// The schema has two non-nullable `Int32` columns, `uid` and `val`, and the
/// backing `MemTable` is given a single empty inner vector of batches, so the
/// table carries no rows.
///
/// # Errors
///
/// Propagates any error returned by `MemTable::try_new`.
async fn get_table() -> datafusion_common::Result<Arc<dyn TableSource>> {
    let fields = vec![
        Field::new("uid", DataType::Int32, false),
        Field::new("val", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    let mem_table = MemTable::try_new(Arc::clone(&schema), vec![vec![]])?;
    let source = Arc::new(DefaultTableSource::new(Arc::new(mem_table)));

    Ok(source)
}

/// Builds an `EXISTS (<subquery>)` expression over the table named `table_name`.
///
/// The subquery scans `table_name` and projects the literal `1`. It declares no
/// outer-reference columns, and `false` is passed as the second argument of
/// `Exists::new` (the `negated` flag, i.e. `EXISTS` rather than `NOT EXISTS`).
///
/// # Errors
///
/// Propagates errors from `get_table` and from building the subquery plan.
async fn build_exists_expr(table_name: &str) -> datafusion_common::Result<Expr> {
    let source = get_table().await?;

    let subquery_plan = LogicalPlanBuilder::scan(table_name, Arc::clone(&source), None)?
        .project(vec![lit(1)])?
        .build()?;

    let exists = Exists::new(
        datafusion::logical_expr::Subquery {
            subquery: Arc::new(subquery_plan),
            outer_ref_columns: vec![],
            spans: Default::default(),
        },
        false,
    );

    Ok(Expr::Exists(exists))
}

/// Builds the failing plan and runs the default optimizer over it.
///
/// The plan is `a` filtered by `EXISTS (..) OR EXISTS (..)`, left-joined to `b`
/// on `uid`. The plan is printed after every optimizer rule, and the result is
/// also printed when optimization fails.
///
/// # Errors
///
/// Returns any error raised while building the plan, or the optimizer's error
/// if optimization fails.
async fn build_plan() -> datafusion_common::Result<LogicalPlan> {
    /// Prints the plan as it looks right after `rule` has been applied.
    fn observer(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
        println!(
            "\nAfter applying rule '{}':\n{}",
            rule.name(),
            plan.display_indent()
        )
    }

    // `EXISTS (a-subquery) OR EXISTS (b-subquery)`, built in that order.
    let predicate = disjunction(vec![
        build_exists_expr("a").await?,
        build_exists_expr("b").await?,
    ]);

    let mut left = LogicalPlanBuilder::scan("a", get_table().await?, None)?
        .filter(predicate.unwrap())?;

    // The test case passes if the following line is uncommented:
    // left = left.project(vec![Expr::Column(Column::from_name("uid"))])?;

    let right = LogicalPlanBuilder::scan("b", get_table().await?, None)?.build()?;

    let join_keys = (vec![String::from("uid")], vec![String::from("uid")]);
    let joined = left
        .join(right, JoinType::Left, join_keys, None)?
        .build()?;

    let rules = Optimizer::new();
    let context = OptimizerContext::new().with_max_passes(16);
    let outcome = rules.optimize(joined, &context, observer);
    if outcome.is_err() {
        println!("ret = {:#?}", outcome);
    }

    outcome
}


/// Regression test for this issue: building and optimizing the plan from
/// `build_plan` is expected to succeed, so the test fails while the bug is present.
#[tokio::test]
async fn test_build_plan() {
    let ret = build_plan().await;
    // The error details are printed inside `build_plan`; here only success is checked.
    assert!(ret.is_ok());
}

Expected behavior

This test case should pass: the optimizer should return `Ok` for this plan.

Actual Behavior

ret = Err(
    Context(
        "Optimizer rule 'optimize_projections' failed",
        Context(
            "Check optimizer-specific invariants after optimizer rule: optimize_projections",
            Internal(
                "Failed due to a difference in schemas: original schema: DFSchema { inner: Schema { fields: [Field { name: \"uid\", data_type: Int32 }, Field { name: \"val\", data_type: Int32 }, Field { name: \"uid\", data_type: Int32, nullable: true }, Field { name: \"val\", data_type: Int32, nullable: true }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"a\" }), Some(Bare { table: \"a\" }), Some(Bare { table: \"b\" }), Some(Bare { table: \"b\" })], functional_dependencies: FunctionalDependencies { deps: [] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"uid\", data_type: Int32 }, Field { name: \"val\", data_type: Int32 }, Field { name: \"mark\", data_type: Boolean }, Field { name: \"mark\", data_type: Boolean }, Field { name: \"uid\", data_type: Int32, nullable: true }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"a\" }), Some(Bare { table: \"a\" }), Some(Bare { table: \"__correlated_sq_1\" }), Some(Bare { table: \"__correlated_sq_2\" }), Some(Bare { table: \"b\" })], functional_dependencies: FunctionalDependencies { deps: [] } }",
            ),
        ),
    ),
)

thread 'test_build_plan' (2129566) panicked at src/main.rs:86:5:
assertion failed: ret.is_ok()
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    test_build_plan

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.01s


Additional context

This case does not fail if a projection is added to scan_a (see the commented-out line in the reproduction).

No response

Metadata

Metadata

Assignees

Labels

bug: Something isn't working

Type

No fields configured for Bug.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions