[GLUTEN-8516][CH] Optimize `and` filter by moving equal conditions ahead #8517

Open · wants to merge 4 commits into main
@@ -118,6 +118,7 @@ object CHRuleApi
injector.injectPostTransform(c => InsertTransitions.create(c.outputsColumnar, CHBatch))
injector.injectPostTransform(c => RemoveDuplicatedColumns.apply(c.session))
injector.injectPostTransform(c => AddPreProjectionForHashJoin.apply(c.session))
injector.injectPostTransform(c => MoveAndFilterEqualConditionsAhead.apply(c.session))

// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(
@@ -17,12 +17,13 @@
package org.apache.gluten.execution.hive

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseWholeStageTransformerSuite, ProjectExecTransformer, TransformSupport}
import org.apache.gluten.execution.{FileSourceScanExecTransformer, FilterExecTransformer, GlutenClickHouseWholeStageTransformerSuite, ProjectExecTransformer, TransformSupport}
import org.apache.gluten.test.AllDataTypesWithComplexType
import org.apache.gluten.utils.UTSystemParameters

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EqualNullSafe, EqualTo, Literal}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -1648,4 +1649,44 @@ class GlutenClickHouseHiveTableSuite
}
}

test("GLUTEN-8516: Optimize and filter by move equal conditions ahead") {

import org.apache.spark.sql.execution.SparkPlan

// Returns true iff some FilterExecTransformer in the plan has an `And` condition
// whose left side is an attribute-equals-literal comparison, i.e. the rule took effect.
def checkConditionsMoveAhead(plan: SparkPlan): Boolean = plan match {
  case f: FilterExecTransformer if f.condition.isInstanceOf[And] =>
    f.condition.asInstanceOf[And].left match {
      case EqualTo(_: Attribute, _: Literal) => true
      case EqualNullSafe(_: Attribute, _: Literal) => true
      case _ => false
    }
  case p => p.children.exists(checkConditionsMoveAhead)
}
val create_table_sql = "create table test_tbl_8516(a int, b float) using parquet"
val insert_data_sql = "insert into test_tbl_8516 values(1, 2), (2, 3), (3, 4)"
val query_sql_1 = "select count(1) from test_tbl_8516 where cast(b as string) != '' and a = 1"
val query_sql_2 =
"select count(1) from test_tbl_8516 where cast(b as string) != '' and a <=> 1"
spark.sql(create_table_sql)
spark.sql(insert_data_sql)
withSQLConf(("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled", "true")) {
runQueryAndCompare(query_sql_1)(
x => assert(checkConditionsMoveAhead(x.queryExecution.executedPlan)))
runQueryAndCompare(query_sql_2)(
x => assert(checkConditionsMoveAhead(x.queryExecution.executedPlan)))
}
spark.sql("drop table test_tbl_8516")
}

}
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension.columnar

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.FilterExecTransformerBase

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, EqualNullSafe, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

/**
 * Rewrites `And` filter conditions to move cheap equality conditions (attribute = literal)
 * ahead, e.g. `cast(b as string) != '' and a = 1` => `a = 1 and cast(b as string) != ''`.
 */
case class MoveAndFilterEqualConditionsAhead(spark: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
if (GlutenConfig.get.enableMoveAndFilterEqualConditionsAhead) {
optimize(plan)
} else {
plan
}
}

private def optimize(plan: SparkPlan): SparkPlan = {
  plan match {
    case f: FilterExecTransformerBase if f.cond.isInstanceOf[And] =>
      val newCond = optimize(f.cond)
      // Rebuild the filter with the reordered condition, and keep optimizing below it.
      BackendsApiManager.getSparkPlanExecApiInstance
        .genFilterExecTransformer(newCond, optimize(f.child))
    case _ =>
      val newChildren = plan.children.map(p => optimize(p))
      plan.withNewChildren(newChildren)
  }
}

private def optimize(expr: Expression): Expression = expr match {
  case a: And =>
    val newLeft = optimize(a.left)
    val newRight = optimize(a.right)
    // Swap the two sides when the right-hand side is a cheap equality condition.
    if (isEqualCondition(a.right)) And(newRight, newLeft) else And(newLeft, newRight)
  case _ => expr
}

// True when `expr` is an equality (or null-safe equality) between an attribute
// and a literal (possibly wrapped in a cast), in either order.
private def isEqualCondition(expr: Expression): Boolean = {
  def attrEqualsLiteral(left: Expression, right: Expression): Boolean = (left, right) match {
    case (_: Attribute, _: Literal) => true
    case (_: Attribute, c: Cast) if c.child.isInstanceOf[Literal] => true
    case _ => false
  }
  expr match {
    case e: EqualTo => attrEqualsLiteral(e.left, e.right) || attrEqualsLiteral(e.right, e.left)
    case e: EqualNullSafe =>
      attrEqualsLiteral(e.left, e.right) || attrEqualsLiteral(e.right, e.left)
    case _ => false
  }
}
}
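
For reference, the expression-level reordering above can be exercised on plain Catalyst expressions without a Gluten backend. The sketch below is illustrative only: `ReorderSketch`, `isCheapEquality`, and `reorder` are hypothetical names, and `isCheapEquality` is a simplified version of `isEqualCondition` (it omits the cast-around-literal case the rule also accepts).

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

object ReorderSketch {
  // Simplified check: attribute-vs-literal (null-safe) equality, in either order.
  private def isCheapEquality(e: Expression): Boolean = e match {
    case EqualTo(_: Attribute, _: Literal) | EqualTo(_: Literal, _: Attribute) => true
    case EqualNullSafe(_: Attribute, _: Literal) | EqualNullSafe(_: Literal, _: Attribute) => true
    case _ => false
  }

  // Same shape as the rule's optimize(Expression): within each And,
  // a cheap equality on the right is swapped to the left.
  def reorder(e: Expression): Expression = e match {
    case And(l, r) if isCheapEquality(r) => And(reorder(r), reorder(l))
    case And(l, r) => And(reorder(l), reorder(r))
    case other => other
  }
}

// Example: cast(b as string) != '' AND a = 1  ==>  a = 1 AND cast(b as string) != ''
// val a = AttributeReference("a", IntegerType)()
// val b = AttributeReference("b", FloatType)()
// val pred = And(Not(EqualTo(Cast(b, StringType), Literal(""))), EqualTo(a, Literal(1)))
// ReorderSketch.reorder(pred)  // And(EqualTo(a, 1), Not(EqualTo(Cast(b, string), "")))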
@@ -115,6 +115,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableCollapseNestedGetJsonObject: Boolean =
getConf(ENABLE_COLLAPSE_GET_JSON_OBJECT)

def enableMoveAndFilterEqualConditionsAhead: Boolean =
getConf(ENABLE_MOVE_AND_FILTER_EQUAL_CONDITIONS_AHEAD)

def enableCHRewriteDateConversion: Boolean =
getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)

@@ -1986,6 +1989,13 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)

val ENABLE_MOVE_AND_FILTER_EQUAL_CONDITIONS_AHEAD =
buildConf("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled")
.internal()
.doc("Move and filter equal conditions ahead for optimization.")
.booleanConf
.createWithDefault(true)

val ENABLE_CH_REWRITE_DATE_CONVERSION =
buildConf("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion")
.internal()
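As a usage note, the new flag is internal and defaults to true, so the more common override is to disable it per session. A minimal hedged sketch, assuming an existing SparkSession named `spark`:

// Disable the reordering for the current session (it defaults to true).
spark.conf.set("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled", "false")

// Or scope it to a block in a test, as the suite above does:
// withSQLConf(("spark.gluten.sql.moveAndFilterEqualConditionsAhead.enabled", "true")) { ... }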