diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 21ae342a2263..1c70e3b7d405 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -118,6 +118,7 @@ object CHRuleApi {
     injector.injectPostTransform(c => InsertTransitions.create(c.outputsColumnar, CHBatch))
     injector.injectPostTransform(c => RemoveDuplicatedColumns.apply(c.session))
     injector.injectPostTransform(c => AddPreProjectionForHashJoin.apply(c.session))
+    injector.injectPostTransform(c => MoveAndFilterEqualConditionsAhead.apply(c.session))
 
     // Gluten columnar: Fallback policies.
     injector.injectFallbackPolicy(
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index 21289f35d430..e0f9fa7512e0 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -17,12 +17,13 @@
 package org.apache.gluten.execution.hive
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseWholeStageTransformerSuite, ProjectExecTransformer, TransformSupport}
+import org.apache.gluten.execution.{FileSourceScanExecTransformer, FilterExecTransformer, GlutenClickHouseWholeStageTransformerSuite, ProjectExecTransformer, TransformSupport}
 import org.apache.gluten.test.AllDataTypesWithComplexType
 import org.apache.gluten.utils.UTSystemParameters
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EqualNullSafe, EqualTo, Literal}
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -1648,4 +1649,44 @@ class GlutenClickHouseHiveTableSuite
     }
   }
 
+  test("GLUTEN-8516: Optimize And filter by moving equal conditions ahead") {
+
+    import org.apache.spark.sql.execution.SparkPlan
+
+    // Returns true if some FilterExecTransformer in the plan has an And condition
+    // whose left (i.e. first-evaluated) operand is an `attribute = literal` check.
+    def checkConditionsMoveAhead(plan: SparkPlan): Boolean = {
+      plan match {
+        case f: FilterExecTransformer if f.condition.isInstanceOf[And] =>
+          f.condition.asInstanceOf[And].left match {
+            case e: EqualTo =>
+              e.left.isInstanceOf[Attribute] && e.right.isInstanceOf[Literal]
+            case e: EqualNullSafe =>
+              e.left.isInstanceOf[Attribute] && e.right.isInstanceOf[Literal]
+            case _ => false
+          }
+        case p => p.children.exists(c => checkConditionsMoveAhead(c))
+      }
+    }
+
+    val create_table_sql = "create table test_tbl_8516(a int, b float) using parquet"
+    val insert_data_sql = "insert into test_tbl_8516 values(1, 2), (2, 3), (3, 4)"
+    val query_sql_1 =
+      "select count(1) from test_tbl_8516 where cast(b as string) != '' and a = 1"
+    val query_sql_2 =
+      "select count(1) from test_tbl_8516 where cast(b as string) != '' and a <=> 1"
+    spark.sql(create_table_sql)
+    spark.sql(insert_data_sql)
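+    // The rule defaults to enabled; set the conf explicitly so the assertion does
+    // not depend on the default value.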
+    withSQLConf(("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled", "true")) {
+      runQueryAndCompare(query_sql_1)(
+        x => assert(checkConditionsMoveAhead(x.queryExecution.executedPlan)))
+      runQueryAndCompare(query_sql_2)(
+        x => assert(checkConditionsMoveAhead(x.queryExecution.executedPlan)))
+    }
+    spark.sql("drop table test_tbl_8516")
+  }
+
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MoveAndFilterEqualConditionsAhead.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MoveAndFilterEqualConditionsAhead.scala
new file mode 100644
index 000000000000..75f784520a01
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MoveAndFilterEqualConditionsAhead.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.FilterExecTransformerBase
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, EqualNullSafe, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * Rewrites `And` filter conditions to move equal conditions ahead, e.g.
+ * `cast(b as string) != '' and a = 1` => `a = 1 and cast(b as string) != ''`.
+ */
+case class MoveAndFilterEqualConditionsAhead(spark: SparkSession) extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (GlutenConfig.get.enableMoveAndFilterEqualConditionsAhead) {
+      optimize(plan)
+    } else {
+      plan
+    }
+  }
+
+  private def optimize(plan: SparkPlan): SparkPlan = {
+    plan match {
+      case f: FilterExecTransformerBase if f.cond.isInstanceOf[And] =>
+        val newCond = optimize(f.cond)
+        // Rebuild the filter with the reordered condition and keep optimizing the subtree.
+        BackendsApiManager.getSparkPlanExecApiInstance
+          .genFilterExecTransformer(newCond, optimize(f.child))
+      case _ =>
+        plan.withNewChildren(plan.children.map(p => optimize(p)))
+    }
+  }
+
+  private def optimize(expr: Expression): Expression = expr match {
+    // Swap the operands when the right side is an equal condition, so the cheap
+    // equality check is evaluated before the more expensive left side.
+    case a: And if isEqualCondition(a.right) => And(optimize(a.right), optimize(a.left))
+    case a: And => And(optimize(a.left), optimize(a.right))
+    case _ => expr
+  }
+
+  private def isEqualCondition(expr: Expression): Boolean = {
+    // An equal condition compares an attribute with a literal (possibly wrapped in a cast).
+    def isAttrEqualLiteral(left: Expression, right: Expression): Boolean =
+      (left, right) match {
+        case (_: Attribute, _: Literal) => true
+        case (_: Attribute, c: Cast) => c.child.isInstanceOf[Literal]
+        case _ => false
+      }
+    expr match {
+      case e: EqualTo =>
+        isAttrEqualLiteral(e.left, e.right) || isAttrEqualLiteral(e.right, e.left)
+      case e: EqualNullSafe =>
+        isAttrEqualLiteral(e.left, e.right) || isAttrEqualLiteral(e.right, e.left)
+      case _ => false
+    }
+  }
+}
diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index d4083d5896eb..d5c70b5763a4 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -115,6 +115,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def enableCollapseNestedGetJsonObject: Boolean = getConf(ENABLE_COLLAPSE_GET_JSON_OBJECT)
 
+  def enableMoveAndFilterEqualConditionsAhead: Boolean =
+    getConf(ENABLE_MOVE_AND_FILTER_EQUAL_CONDITIONS_AHEAD)
+
   def enableCHRewriteDateConversion: Boolean = getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
 
@@ -1986,6 +1989,13 @@
       .booleanConf
       .createWithDefault(false)
 
+  val ENABLE_MOVE_AND_FILTER_EQUAL_CONDITIONS_AHEAD =
+    buildConf("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled")
+      .internal()
+      .doc("Whether to move equal conditions ahead in And filter conditions for optimization.")
+      .booleanConf
+      .createWithDefault(true)
+
   val ENABLE_CH_REWRITE_DATE_CONVERSION =
     buildConf("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion")
       .internal()
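For context (not part of the patch): a minimal Scala sketch of the reordering the rule performs on a Catalyst predicate. The attribute names and types mirror the test table; the `isInstanceOf[EqualTo]` check is a simplified stand-in for `isEqualCondition`, and the object name is illustrative only.

    import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Cast, EqualTo, Literal, Not}
    import org.apache.spark.sql.types.{FloatType, IntegerType, StringType}

    object MoveEqualConditionsAheadSketch extends App {
      val a = AttributeReference("a", IntegerType)()
      val b = AttributeReference("b", FloatType)()

      // where cast(b as string) != '' and a = 1
      val before = And(Not(EqualTo(Cast(b, StringType), Literal(""))), EqualTo(a, Literal(1)))

      // Swap the And operands when the right side is an equal condition, so the
      // cheap `a = 1` short-circuits before the cast-and-compare runs.
      val after = before match {
        case And(l, r) if r.isInstanceOf[EqualTo] => And(r, l)
        case other => other
      }

      // Prints roughly: (a = 1) AND (NOT (CAST(b AS STRING) = ''))
      println(after.sql)
    }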