Optimize `and` filters by moving equal conditions ahead
BIGO authored and BIGO committed Jan 13, 2025
1 parent fdc7fa4 commit e2046fd
Showing 5 changed files with 143 additions and 80 deletions.
@@ -118,7 +118,7 @@ object CHRuleApi {
injector.injectPostTransform(c => InsertTransitions.create(c.outputsColumnar, CHBatch))
injector.injectPostTransform(c => RemoveDuplicatedColumns.apply(c.session))
injector.injectPostTransform(c => AddPreProjectionForHashJoin.apply(c.session))
injector.injectPostTransform(_ => MoveEqualConditionsAheadForAnd)
injector.injectPostTransform(c => MoveAndFilterEqualConditionsAhead.apply(c.session))

// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(
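The rule swapped in above reorders conjunctive filter conditions so that cheap `attribute = literal` comparisons run before more expensive predicates. A minimal sketch of the before/after condition shape in Catalyst terms, using the example from the new rule's doc comment later in this diff (the construction below is illustrative, not code from the commit):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType}

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", FloatType)()
// Before the rewrite: cast(b as string) != '' and a = 1
val before = And(Not(EqualTo(Cast(b, StringType), Literal(""))), EqualTo(a, Literal(1)))
// After the rewrite, the equal condition leads: a = 1 and cast(b as string) != ''
val after = And(EqualTo(a, Literal(1)), Not(EqualTo(Cast(b, StringType), Literal(""))))
```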
@@ -17,12 +17,13 @@
package org.apache.gluten.execution.hive

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseWholeStageTransformerSuite, ProjectExecTransformer, TransformSupport}
import org.apache.gluten.execution.{FileSourceScanExecTransformer, FilterExecTransformer, GlutenClickHouseWholeStageTransformerSuite, ProjectExecTransformer, TransformSupport}
import org.apache.gluten.test.AllDataTypesWithComplexType
import org.apache.gluten.utils.UTSystemParameters

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EqualNullSafe, EqualTo, Literal}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -1648,4 +1649,38 @@ class GlutenClickHouseHiveTableSuite
}
}

test("GLUTEN-8516: Optimize and filter by move equal conditions ahead") {

def checkConditionsMoveAhead(x: DataFrame): Boolean = {
var ruleEffected = false
val plan = x.queryExecution.sparkPlan
plan.children.foreach {
case f: FilterExecTransformer if f.condition.isInstanceOf[And] =>
val cond = f.condition.asInstanceOf[And]
cond.left match {
case e: EqualTo if (e.left.isInstanceOf[Attribute] && e.right.isInstanceOf[Literal]) =>
ruleEffected = true
case en: EqualNullSafe
if (en.left.isInstanceOf[Attribute] && en.right.isInstanceOf[Literal]) =>
ruleEffected = true
case _ =>
}
case _ =>
}
ruleEffected
}
val create_table_sql = "create table test_tbl_8516(a int, b float) using parquet"
val insert_data_sql = "insert into test_tbl_8516 values(1, 2), (2, 3), (3, 4)"
val query_sql_1 = "select count(1) from test_tbl_8516 where cast(b as string) != '' and a = 1"
val query_sql_2 =
"select count(1) from test_tbl_8516 where cast(b as string) != '' and a is null"
spark.sql(create_table_sql)
spark.sql(insert_data_sql)
withSQLConf(("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled", "true")) {
runQueryAndCompare(query_sql_1)(x => checkConditionsMoveAhead(x))
runQueryAndCompare(query_sql_2)(x => checkConditionsMoveAhead(x))
}
spark.sql("drop table test_tbl_8516")
}

}
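To see the reordering this test looks for, a hedged interactive sketch, assuming a Gluten-enabled session `spark` and using the same table and query as the test above (the exact printed condition shape is an assumption about how Spark renders the plan):

```scala
spark.sql("create table test_tbl_8516(a int, b float) using parquet")
spark.sql("insert into test_tbl_8516 values(1, 2), (2, 3), (3, 4)")
spark.conf.set("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled", "true")

val df = spark.sql(
  "select count(1) from test_tbl_8516 where cast(b as string) != '' and a = 1")
// With the rule applied, the filter condition should print with the
// equality first, roughly: (a = 1) AND NOT (cast(b as string) = '')
println(df.queryExecution.sparkPlan.treeString)

spark.sql("drop table test_tbl_8516")
```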
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension.columnar

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.FilterExecTransformerBase

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, EqualNullSafe, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

/**
 * Rewrite `and` filter conditions by moving the equal conditions ahead, e.g.
 * `cast(b as string) != '' and a = 1` => `a = 1 and cast(b as string) != ''`.
*/
case class MoveAndFilterEqualConditionsAhead(spark: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
if (GlutenConfig.get.enableMoveAndFilterEqualConditionsAhead) {
optimize(plan)
} else {
plan
}
}

private def optimize(plan: SparkPlan): SparkPlan = {
plan match {
case f: FilterExecTransformerBase if f.cond.isInstanceOf[And] =>
val newCond = optimize(f.cond)
BackendsApiManager.getSparkPlanExecApiInstance.genFilterExecTransformer(newCond, f.child)
case _ =>
val newChildren = plan.children.map(p => optimize(p))
plan.withNewChildren(newChildren)
}
}

private def optimize(expr: Expression): Expression = expr match {
case a: And =>
if (isEqualCondition(a.right)) {
val newLeft = optimize(a.left)
val newRight = optimize(a.right)
val newExpr = And(newRight, newLeft)
newExpr
} else {
val newLeft = optimize(a.left)
val newRight = optimize(a.right)
val newExpr = And(newLeft, newRight)
newExpr
}
case _ => expr
}

private def isEqualCondition(expr: Expression): Boolean = {
var leftExpr = null.asInstanceOf[Expression]
var rightExpr = null.asInstanceOf[Expression]
expr match {
case e: EqualTo => leftExpr = e.left; rightExpr = e.right
case e: EqualNullSafe => leftExpr = e.left; rightExpr = e.right
case _ =>
return false
}
if (leftExpr == null || rightExpr == null) {
return false
}
def f(left: Expression, right: Expression): Boolean = {
left match {
case _: Attribute =>
right match {
case _: Literal =>
return true
case c: Cast if c.child.isInstanceOf[Literal] =>
return true
case _ =>
}
case _ =>
}
false
}
f(leftExpr, rightExpr) || f(rightExpr, leftExpr)
}
}
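The heart of the new file is the recursive `optimize(expr)`: whenever the right side of an `And` is an equal condition between an attribute and a literal (or a literal wrapped in a cast), the two sides are swapped, and both sides are optimized recursively. A self-contained re-statement of that expression-level logic, for illustration only, not the shipped code (the shipped rule additionally rewraps the filter via `genFilterExecTransformer` and only fires on `FilterExecTransformerBase` nodes):

```scala
import org.apache.spark.sql.catalyst.expressions._

// True when one side is an attribute and the other a literal
// (possibly wrapped in a cast), in either order.
def isAttrLiteralPair(l: Expression, r: Expression): Boolean = {
  def matches(x: Expression, y: Expression): Boolean = x match {
    case _: Attribute =>
      y match {
        case _: Literal => true
        case c: Cast => c.child.isInstanceOf[Literal]
        case _ => false
      }
    case _ => false
  }
  matches(l, r) || matches(r, l)
}

def isEqual(e: Expression): Boolean = e match {
  case EqualTo(l, r) => isAttrLiteralPair(l, r)
  case EqualNullSafe(l, r) => isAttrLiteralPair(l, r)
  case _ => false
}

def moveEqualsAhead(e: Expression): Expression = e match {
  // Right side is the cheap equality: hoist it in front of the left side.
  case And(l, r) if isEqual(r) => And(moveEqualsAhead(r), moveEqualsAhead(l))
  case And(l, r) => And(moveEqualsAhead(l), moveEqualsAhead(r))
  case other => other
}
```

Note that only an equal condition sitting on the right of an `And` is hoisted; everything else keeps its relative order.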

This file was deleted.

@@ -115,6 +115,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableCollapseNestedGetJsonObject: Boolean =
getConf(ENABLE_COLLAPSE_GET_JSON_OBJECT)

def enableMoveAndFilterEqualConditionsAhead: Boolean =
getConf(ENABLE_MOVE_AND_FILTER_EQUAL_CONDITIONS_AHEAD)

def enableCHRewriteDateConversion: Boolean =
getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)

@@ -1986,6 +1989,13 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)

val ENABLE_MOVE_AND_FILTER_EQUAL_CONDITIONS_AHEAD =
buildConf("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled")
.internal()
.doc("Move and filter equal conditions ahead for optimization.")
.booleanConf
.createWithDefault(true)

val ENABLE_CH_REWRITE_DATE_CONVERSION =
buildConf("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion")
.internal()
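The new flag defaults to `true`, so the rewrite is active unless explicitly disabled. A hedged usage sketch (the config key is copied verbatim from the diff; the `spark` session is assumed):

```scala
// Disable the rewrite for the current session:
spark.conf.set("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled", "false")
// Or scope it in a test, as the suite above does:
// withSQLConf(("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled", "true")) { ... }
```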
