[UT] Add ignoreGluten method (#5553)
PHILO-HE authored Apr 28, 2024
1 parent 452ebde commit dba95de
Showing 19 changed files with 106 additions and 99 deletions.
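In short, the commit adds an ignoreGluten helper to the shared test traits (GlutenSQLTestsBaseTrait and GlutenTestsCommonTrait), migrates ignore(GLUTEN_TEST + ...) call sites to it, converts two commented-out cases in VeloxAdaptiveQueryExecSuite into ignored tests, swaps UnsupportedOperationException for GlutenNotSupportException at several validation sites, and spells two boolean Velox query configs as the literal "true". The call-site pattern, condensed from the hunks below (test bodies elided):

// Before: callers spell out the Gluten prefix themselves.
ignore(GLUTEN_TEST + "DPP should not be rewritten as an existential join") { /* ... */ }

// After: the helper prepends GLUTEN_TEST and delegates to ScalaTest's ignore.
ignoreGluten("DPP should not be rewritten as an existential join") { /* ... */ }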
cpp/velox/compute/WholeStageResultIterator.cc (4 changes: 2 additions & 2 deletions)
@@ -486,7 +486,7 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
configs[velox::core::QueryConfig::kSessionTimezone] =
veloxCfg_->get<std::string>(kSessionTimezone, defaultTimezone);
// Adjust timestamp according to the above configured session timezone.
configs[velox::core::QueryConfig::kAdjustTimestampToTimezone] = std::to_string(true);
configs[velox::core::QueryConfig::kAdjustTimestampToTimezone] = "true";
// Align Velox size function with Spark.
configs[velox::core::QueryConfig::kSparkLegacySizeOfNull] = std::to_string(veloxCfg_->get<bool>(kLegacySize, true));

@@ -505,7 +505,7 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
configs[velox::core::QueryConfig::kAbandonPartialAggregationMinRows] =
std::to_string(veloxCfg_->get<int32_t>(kAbandonPartialAggregationMinRows, 100000));
// Spark's collect_set ignore nulls.
configs[velox::core::QueryConfig::kPrestoArrayAggIgnoreNulls] = std::to_string(true);
configs[velox::core::QueryConfig::kPrestoArrayAggIgnoreNulls] = "true";
}
// Spill configs
if (spillStrategy_ == "none") {
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.substrait.expression;

import org.apache.gluten.exception.GlutenNotSupportException;
import org.apache.gluten.expression.ConverterUtils;
import org.apache.gluten.substrait.type.*;

@@ -204,7 +205,7 @@ public static LiteralNode makeLiteral(Object obj, TypeNode typeNode) {
if (typeNode instanceof StructNode) {
return makeStructLiteral((InternalRow) obj, typeNode);
}
throw new UnsupportedOperationException(
throw new GlutenNotSupportException(
String.format(
"Type not supported: %s, obj: %s, class: %s",
typeNode.toString(), obj.toString(), obj.getClass().toString()));
@@ -72,7 +72,7 @@ trait GlutenPlan extends SparkPlan with LogLevelUtil {
} catch {
case e @ (_: GlutenNotSupportException | _: UnsupportedOperationException) =>
if (!e.isInstanceOf[GlutenNotSupportException]) {
logDebug(s"This exception may need to be fixed: ${e.getMessage}")
logDebug(s"Just a warning. This exception perhaps needs to be fixed.", e)
}
// FIXME: Use a validation-specific method to catch validation failures
TestStats.addFallBackClassName(this.getClass.toString)
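Below, the literal builder and the Iceberg scan utilities switch from UnsupportedOperationException to GlutenNotSupportException. The catch above explains why: GlutenNotSupportException is treated as an expected validation fallback, while any other exception type is flagged for follow-up, now with the throwable passed to logDebug so the stack trace is preserved. A minimal sketch of that contract, with illustrative names that are not from this commit:

import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.spark.internal.Logging

object FallbackSketch extends Logging {
  // An expected, validation-style failure: should lead to a quiet fallback.
  def validateFormat(format: String): Unit =
    if (format != "parquet" && format != "orc") {
      throw new GlutenNotSupportException(s"Unsupported file format: $format")
    }

  def canOffload(format: String): Boolean =
    try { validateFormat(format); true }
    catch {
      case e @ (_: GlutenNotSupportException | _: UnsupportedOperationException) =>
        if (!e.isInstanceOf[GlutenNotSupportException]) {
          // Unexpected type: pass the throwable so the stack trace is logged too.
          logDebug("Just a warning. This exception perhaps needs to be fixed.", e)
        }
        false // fall back to vanilla Spark in both cases
    }
}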
@@ -536,7 +536,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
s"${e.getMessage}, original Spark plan is " +
s"${plan.getClass}(${plan.children.toList.map(_.getClass)})")
if (!e.isInstanceOf[GlutenNotSupportException]) {
logDebug("This exception may need to be fixed: " + e.getMessage)
logDebug("Just a warning. This exception perhaps needs to be fixed.", e)
}
}
}
@@ -16,6 +16,7 @@
*/
package org.apache.iceberg.spark.source

import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

@@ -88,9 +89,9 @@ object GlutenIcebergSourceUtil {
case _ =>
}
}
throw new UnsupportedOperationException("Iceberg Only support parquet and orc file format.")
throw new GlutenNotSupportException("Iceberg Only support parquet and orc file format.")
case _ =>
throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
throw new GlutenNotSupportException("Only support iceberg SparkBatchQueryScan.")
}

def getPartitionSchema(sparkScan: Scan): StructType = sparkScan match {
@@ -156,6 +157,6 @@ object GlutenIcebergSourceUtil {
case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
case FileFormat.ORC => ReadFileFormat.OrcReadFormat
case _ =>
throw new UnsupportedOperationException("Iceberg Only support parquet and orc file format.")
throw new GlutenNotSupportException("Iceberg Only support parquet and orc file format.")
}
}
@@ -35,6 +35,12 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession with GlutenTestsBaseTrait
pos: Position): Unit = {
test(GLUTEN_TEST + testName, testTag: _*)(testFun)
}

protected def ignoreGluten(testName: String, testTag: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.ignore(GLUTEN_TEST + testName, testTag: _*)(testFun)
}

override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
if (shouldRun(testName)) {
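ignoreGluten mirrors the testGluten wrapper above: both prepend GLUTEN_TEST, but ignoreGluten routes through super.ignore, so the case stays registered with ScalaTest and shows up as ignored rather than living in commented-out code. A hypothetical usage sketch (the suite name and body are illustrative, not part of this commit):

import org.apache.spark.sql.{QueryTest, Row}

class MyGlutenQuerySuite extends QueryTest with GlutenSQLTestsBaseTrait {
  ignoreGluten("case not yet supported by the native backend") {
    // Registered under the GLUTEN_TEST-prefixed name, reported as ignored, never run.
    checkAnswer(spark.sql("SELECT 1"), Row(1))
  }
}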
@@ -53,6 +53,12 @@ trait GlutenTestsCommonTrait
pos: Position): Unit = {
test(GLUTEN_TEST + testName, testTag: _*)(testFun)
}

protected def ignoreGluten(testName: String, testTag: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.ignore(GLUTEN_TEST + testName, testTag: _*)(testFun)
}

override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
if (shouldRun(testName)) {
@@ -21,7 +21,6 @@ import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExec
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode.{CODEGEN_ONLY, NO_CODEGEN}
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
@@ -57,7 +56,7 @@ abstract class GlutenDynamicPartitionPruningSuiteBase

// === Following cases override super class's cases ===

ignore(GLUTEN_TEST + "DPP should not be rewritten as an existential join") {
ignoreGluten("DPP should not be rewritten as an existential join") {
// ignored: BroadcastHashJoinExec is from Vanilla Spark
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
@@ -34,6 +34,8 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}

import org.apache.log4j.Level

class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLTestsTrait {
import testImplicits._

@@ -814,30 +816,30 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}

// testGluten("Logging plan changes for AQE") {
// val testAppender = new LogAppender("plan changes")
// withLogAppender(testAppender) {
// withSQLConf(
// // this test default level is WARN, so we should check warn level
// SQLConf.PLAN_CHANGE_LOG_LEVEL.key -> "WARN",
// SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
// SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80"
// ) {
// sql(
// "SELECT * FROM testData JOIN testData2 ON key = a " +
// "WHERE value = (SELECT max(a) FROM testData3)").collect()
// }
// Seq(
// "=== Result of Batch AQE Preparations ===",
// "=== Result of Batch AQE Post Stage Creation ===",
// "=== Result of Batch AQE Replanning ===",
// "=== Result of Batch AQE Query Stage Optimization ==="
// ).foreach {
// expectedMsg =>
// assert(testAppender.loggingEvents.exists(_.getRenderedMessage.contains(expectedMsg)))
// }
// }
// }
ignoreGluten("Logging plan changes for AQE") {
val testAppender = new LogAppender("plan changes")
withLogAppender(testAppender) {
withSQLConf(
// this test default level is WARN, so we should check warn level
SQLConf.PLAN_CHANGE_LOG_LEVEL.key -> "WARN",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80"
) {
sql(
"SELECT * FROM testData JOIN testData2 ON key = a " +
"WHERE value = (SELECT max(a) FROM testData3)").collect()
}
Seq(
"=== Result of Batch AQE Preparations ===",
"=== Result of Batch AQE Post Stage Creation ===",
"=== Result of Batch AQE Replanning ===",
"=== Result of Batch AQE Query Stage Optimization ==="
).foreach {
expectedMsg =>
assert(testAppender.loggingEvents.exists(_.getRenderedMessage.contains(expectedMsg)))
}
}
}

testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
@@ -1450,51 +1452,51 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}

// testGluten("test log level") {
// def verifyLog(expectedLevel: Level): Unit = {
// val logAppender = new LogAppender("adaptive execution")
// logAppender.setThreshold(expectedLevel)
// withLogAppender(
// logAppender,
// loggerNames = Seq(AdaptiveSparkPlanExec.getClass.getName.dropRight(1)),
// level = Some(Level.TRACE)) {
// withSQLConf(
// SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
// SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
// sql("SELECT * FROM testData join testData2 ON key = a where value = '1'").collect()
// }
// }
// Seq("Plan changed", "Final plan").foreach {
// msg =>
// assert(logAppender.loggingEvents.exists {
// event => event.getRenderedMessage.contains(msg) && event.getLevel == expectedLevel
// })
// }
// }
//
// // Verify default log level
// verifyLog(Level.DEBUG)
//
// // Verify custom log level
// val levels = Seq(
// "TRACE" -> Level.TRACE,
// "trace" -> Level.TRACE,
// "DEBUG" -> Level.DEBUG,
// "debug" -> Level.DEBUG,
// "INFO" -> Level.INFO,
// "info" -> Level.INFO,
// "WARN" -> Level.WARN,
// "warn" -> Level.WARN,
// "ERROR" -> Level.ERROR,
// "error" -> Level.ERROR,
// "deBUG" -> Level.DEBUG
// )
//
// levels.foreach {
// level =>
// withSQLConf(SQLConf.ADAPTIVE_EXECUTION_LOG_LEVEL.key -> level._1) {
// verifyLog(level._2)
// }
// }
// }
ignoreGluten("test log level") {
def verifyLog(expectedLevel: Level): Unit = {
val logAppender = new LogAppender("adaptive execution")
logAppender.setThreshold(expectedLevel)
withLogAppender(
logAppender,
loggerNames = Seq(AdaptiveSparkPlanExec.getClass.getName.dropRight(1)),
level = Some(Level.TRACE)) {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
sql("SELECT * FROM testData join testData2 ON key = a where value = '1'").collect()
}
}
Seq("Plan changed", "Final plan").foreach {
msg =>
assert(logAppender.loggingEvents.exists {
event => event.getRenderedMessage.contains(msg) && event.getLevel == expectedLevel
})
}
}

// Verify default log level
verifyLog(Level.DEBUG)

// Verify custom log level
val levels = Seq(
"TRACE" -> Level.TRACE,
"trace" -> Level.TRACE,
"DEBUG" -> Level.DEBUG,
"debug" -> Level.DEBUG,
"INFO" -> Level.INFO,
"info" -> Level.INFO,
"WARN" -> Level.WARN,
"warn" -> Level.WARN,
"ERROR" -> Level.ERROR,
"error" -> Level.ERROR,
"deBUG" -> Level.DEBUG
)

levels.foreach {
level =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_LOG_LEVEL.key -> level._1) {
verifyLog(level._2)
}
}
}
}
@@ -18,7 +18,6 @@ package org.apache.spark.sql

import org.apache.gluten.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.expressions.Aggregator
@@ -130,7 +129,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
// Row(new java.math.BigDecimal(2), new java.math.BigDecimal(6)) :: Nil)
}

ignore(GLUTEN_TEST + "SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
ignoreGluten("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
withTempView("view") {
Seq(
("mithunr", Float.NaN),
@@ -21,7 +21,6 @@ import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExec
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode.{CODEGEN_ONLY, NO_CODEGEN}
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
@@ -57,7 +56,7 @@ abstract class GlutenDynamicPartitionPruningSuiteBase

// === Following cases override super class's cases ===

ignore(GLUTEN_TEST + "DPP should not be rewritten as an existential join") {
ignoreGluten("DPP should not be rewritten as an existential join") {
// ignored: BroadcastHashJoinExec is from Vanilla Spark
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
@@ -18,7 +18,6 @@ package org.apache.spark.sql

import org.apache.gluten.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.expressions.Aggregator
@@ -130,7 +129,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
// Row(new java.math.BigDecimal(2), new java.math.BigDecimal(6)) :: Nil)
}

ignore(GLUTEN_TEST + "SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
ignoreGluten("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
withTempView("view") {
Seq(
("mithunr", Float.NaN),
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer, FilterExecTransformerBase}

import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode.{CODEGEN_ONLY, NO_CODEGEN}
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
@@ -58,7 +57,7 @@ abstract class GlutenDynamicPartitionPruningSuiteBase

// === Following cases override super class's cases ===

ignore(GLUTEN_TEST + "DPP should not be rewritten as an existential join") {
ignoreGluten("DPP should not be rewritten as an existential join") {
// ignored: BroadcastHashJoinExec is from Vanilla Spark
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
@@ -259,7 +259,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

// Velox doesn't support ParquetOutputFormat.PAGE_SIZE and ParquetOutputFormat.BLOCK_SIZE.
ignore(GlutenTestConstants.GLUTEN_TEST + "Support Parquet column index") {
ignoreGluten("Support Parquet column index") {
// block 1:
// null count min max
// page-0 0 0 99
@@ -23,7 +23,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.executor.OutputMetrics
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -487,7 +486,7 @@ class GlutenInsertSuite
}
}

ignore(GLUTEN_TEST + "SPARK-39557 INSERT INTO statements with tables with map defaults") {
ignoreGluten("SPARK-39557 INSERT INTO statements with tables with map defaults") {
withSQLConf("spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {

import testImplicits._
@@ -18,7 +18,6 @@ package org.apache.spark.sql

import org.apache.gluten.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.expressions.Aggregator
@@ -130,7 +129,7 @@ class GlutenDataFrameAggregateSuite extends DataFrameAggregateSuite with GlutenS
// Row(new java.math.BigDecimal(2), new java.math.BigDecimal(6)) :: Nil)
}

ignore(GLUTEN_TEST + "SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
ignoreGluten("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
withTempView("view") {
Seq(
("mithunr", Float.NaN),