Skip to content

Commit

Permalink
feat(iceberg): optimize iceberg source count(*) query (#20151)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Feb 18, 2025
1 parent 7aabcad commit 3afbf25
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 34 deletions.
1 change: 1 addition & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ message IcebergScanNode {
ICEBERG_SCAN_TYPE_DATA_SCAN = 1;
ICEBERG_SCAN_TYPE_EQUALITY_DELETE_SCAN = 2;
ICEBERG_SCAN_TYPE_POSITION_DELETE_SCAN = 3;
ICEBERG_SCAN_TYPE_COUNT_STAR = 4;
}
repeated plan_common.ColumnCatalog columns = 1;
map<string, string> with_properties = 2;
Expand Down
18 changes: 17 additions & 1 deletion src/batch/executors/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::catalog::{
Field, Schema, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
};
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergProperties, IcebergSplit};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_expr::expr::LiteralExpression;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::error::BatchError;
use crate::executor::Executor;
use crate::monitor::BatchMetrics;
use crate::ValuesExecutor;

pub struct IcebergScanExecutor {
iceberg_config: IcebergProperties,
Expand Down Expand Up @@ -100,6 +102,9 @@ impl IcebergScanExecutor {
Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
position_delete_file_scan_tasks
}
Some(IcebergFileScanTask::CountStar(_)) => {
bail!("iceberg scan executor does not support count star")
}
None => {
bail!("file_scan_tasks must be Some")
}
Expand Down Expand Up @@ -209,6 +214,17 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
if let ConnectorProperties::Iceberg(iceberg_properties) = config
&& let SplitImpl::Iceberg(split) = &split_list[0]
{
if let IcebergFileScanTask::CountStar(count) = split.task {
return Ok(Box::new(ValuesExecutor::new(
vec![vec![Box::new(LiteralExpression::new(
DataType::Int64,
Some(ScalarImpl::Int64(count as i64)),
))]],
schema,
source.plan_node().get_identity().clone(),
source.context().get_config().developer.chunk_size,
)));
}
let iceberg_properties: IcebergProperties = *iceberg_properties;
let split: IcebergSplit = split.clone();
let need_seq_num = schema
Expand Down
1 change: 0 additions & 1 deletion src/batch/executors/src/executor/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub struct ValuesExecutor {
}

impl ValuesExecutor {
#[cfg(test)]
pub(crate) fn new(
rows: Vec<Vec<BoxedExpression>>,
schema: Schema,
Expand Down
121 changes: 90 additions & 31 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use async_trait::async_trait;
use futures_async_stream::for_await;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::{DataContentType, ManifestList};
use iceberg::table::Table;
use iceberg::Catalog;
use itertools::Itertools;
Expand All @@ -35,6 +36,7 @@ use risingwave_common::types::JsonbVal;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;

use crate::connector_common::IcebergCommon;
use crate::error::{ConnectorError, ConnectorResult};
Expand Down Expand Up @@ -118,9 +120,15 @@ pub enum IcebergFileScanTask {
Data(Vec<FileScanTask>),
EqualityDelete(Vec<FileScanTask>),
PositionDelete(Vec<FileScanTask>),
CountStar(u64),
}

impl IcebergFileScanTask {
pub fn new_with_scan_type(
pub fn new_count_star(count_sum: u64) -> Self {
IcebergFileScanTask::CountStar(count_sum)
}

pub fn new_scan_with_scan_type(
iceberg_scan_type: IcebergScanType,
data_files: Vec<FileScanTask>,
equality_delete_files: Vec<FileScanTask>,
Expand All @@ -134,25 +142,8 @@ impl IcebergFileScanTask {
IcebergScanType::PositionDeleteScan => {
IcebergFileScanTask::PositionDelete(position_delete_files)
}
IcebergScanType::Unspecified => unreachable!("Unspecified iceberg scan type"),
}
}

pub fn add_files(
&mut self,
data_file: FileScanTask,
equality_delete_file: FileScanTask,
position_delete_file: FileScanTask,
) {
match self {
IcebergFileScanTask::Data(data_files) => {
data_files.push(data_file);
}
IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
equality_delete_files.push(equality_delete_file);
}
IcebergFileScanTask::PositionDelete(position_delete_files) => {
position_delete_files.push(position_delete_file);
IcebergScanType::Unspecified | IcebergScanType::CountStar => {
unreachable!("Unspecified iceberg scan type")
}
}
}
Expand All @@ -166,6 +157,7 @@ impl IcebergFileScanTask {
IcebergFileScanTask::PositionDelete(position_delete_files) => {
position_delete_files.is_empty()
}
IcebergFileScanTask::CountStar(_) => false,
}
}
}
Expand All @@ -179,15 +171,23 @@ pub struct IcebergSplit {

impl IcebergSplit {
pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
Self {
split_id: 0,
snapshot_id: 0,
task: IcebergFileScanTask::new_with_scan_type(
iceberg_scan_type,
vec![],
vec![],
vec![],
),
if let IcebergScanType::CountStar = iceberg_scan_type {
Self {
split_id: 0,
snapshot_id: 0,
task: IcebergFileScanTask::new_count_star(0),
}
} else {
Self {
split_id: 0,
snapshot_id: 0,
task: IcebergFileScanTask::new_scan_with_scan_type(
iceberg_scan_type,
vec![],
vec![],
vec![],
),
}
}
}
}
Expand Down Expand Up @@ -304,8 +304,31 @@ impl IcebergSplitEnumerator {
// If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
}
let snapshot_id = snapshot_id.unwrap();
if let IcebergScanType::CountStar = iceberg_scan_type {
self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
.await
} else {
self.list_splits_batch_scan(
&table,
snapshot_id.unwrap(),
schema,
batch_parallelism,
iceberg_scan_type,
predicate,
)
.await
}
}

async fn list_splits_batch_scan(
&self,
table: &Table,
snapshot_id: i64,
schema: Schema,
batch_parallelism: usize,
iceberg_scan_type: IcebergScanType,
predicate: IcebergPredicate,
) -> ConnectorResult<Vec<IcebergSplit>> {
let schema_names = schema.names();
let require_names = schema_names
.iter()
Expand Down Expand Up @@ -363,7 +386,7 @@ impl IcebergSplitEnumerator {
|(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
split_id: index as i64,
snapshot_id,
task: IcebergFileScanTask::new_with_scan_type(
task: IcebergFileScanTask::new_scan_with_scan_type(
iceberg_scan_type,
data_file,
equality_delete_file,
Expand All @@ -380,6 +403,42 @@ impl IcebergSplitEnumerator {
Ok(splits)
}

pub async fn list_splits_batch_count_star(
&self,
table: &Table,
snapshot_id: i64,
) -> ConnectorResult<Vec<IcebergSplit>> {
let mut record_counts = 0;
let manifest_list: ManifestList = table
.metadata()
.snapshot_by_id(snapshot_id)
.unwrap()
.load_manifest_list(table.file_io(), table.metadata())
.await
.map_err(|e| anyhow!(e))?;

for entry in manifest_list.entries() {
let manifest = entry
.load_manifest(table.file_io())
.await
.map_err(|e| anyhow!(e))?;
let mut manifest_entries_stream =
futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));

while let Some(manifest_entry) = manifest_entries_stream.next().await {
let file = manifest_entry.data_file();
assert_eq!(file.content_type(), DataContentType::Data);
record_counts += file.record_count();
}
}
let split = IcebergSplit {
split_id: 0,
snapshot_id,
task: IcebergFileScanTask::new_count_star(record_counts),
};
Ok(vec![split])
}

pub async fn all_delete_parameters(
table: &Table,
snapshot_id: i64,
Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,12 @@ impl PlanRoot {
ApplyOrder::BottomUp,
))?;

let plan = plan.optimize_by_rules(&OptimizationStage::new(
"Iceberg Count Star",
vec![BatchIcebergCountStar::create()],
ApplyOrder::TopDown,
))?;

// For iceberg scan, we do iceberg predicate pushdown
// BatchFilter -> BatchIcebergScan
let plan = plan.optimize_by_rules(&OptimizationStage::new(
Expand Down Expand Up @@ -465,6 +471,12 @@ impl PlanRoot {
ApplyOrder::BottomUp,
))?;

let plan = plan.optimize_by_rules(&OptimizationStage::new(
"Iceberg Count Star",
vec![BatchIcebergCountStar::create()],
ApplyOrder::TopDown,
))?;

assert_eq!(plan.convention(), Convention::Batch);
Ok(plan)
}
Expand Down
22 changes: 22 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::rc::Rc;

use iceberg::expr::Predicate as IcebergPredicate;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::IcebergScanNode;
Expand Down Expand Up @@ -82,6 +84,26 @@ impl BatchIcebergScan {
}
}

pub fn new_count_star_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self {
let mut core = batch_iceberg_scan.core.clone();
core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named(
"count",
ColumnId::first_user_column(),
DataType::Int64,
))];
let base = PlanBase::new_batch_with_core(
&core,
batch_iceberg_scan.base.distribution().clone(),
batch_iceberg_scan.base.order().clone(),
);
Self {
base,
core,
iceberg_scan_type: IcebergScanType::CountStar,
predicate: IcebergPredicate::AlwaysTrue,
}
}

pub fn iceberg_scan_type(&self) -> IcebergScanType {
self.iceberg_scan_type
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSimpleAgg {
pub base: PlanBase<Batch>,
core: generic::Agg<PlanRef>,
pub core: generic::Agg<PlanRef>,
}

impl BatchSimpleAgg {
Expand Down
46 changes: 46 additions & 0 deletions src/frontend/src/optimizer/rule/batch/batch_iceberg_count_star.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

use crate::optimizer::plan_node::{BatchIcebergScan, PlanAggCall};
use crate::optimizer::rule::{BoxedRule, Rule};
use crate::PlanRef;

pub struct BatchIcebergCountStar {}

impl Rule for BatchIcebergCountStar {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let agg = plan.as_batch_simple_agg()?;
if agg.core.group_key.is_empty()
&& agg.agg_calls().len() == 1
&& agg.agg_calls()[0].eq(&PlanAggCall::count_star())
{
let batch_iceberg = agg.core.input.as_batch_iceberg_scan()?;
return Some(
BatchIcebergScan::new_count_star_with_batch_iceberg_scan(batch_iceberg).into(),
);
}
None
}
}

impl BatchIcebergCountStar {
pub fn create() -> BoxedRule {
Box::new(BatchIcebergCountStar {})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/rule/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod batch_iceberg_count_star;
pub mod batch_iceberg_predicate_pushdown;
pub(crate) mod batch_project_merge_rule;
pub mod batch_push_limit_to_scan_rule;
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ mod table_function_to_postgres_query_rule;
mod values_extract_project_rule;

pub use add_logstore_rule::*;
pub use batch::batch_iceberg_count_star::*;
pub use batch::batch_iceberg_predicate_pushdown::*;
pub use batch::batch_push_limit_to_scan_rule::*;
pub use pull_up_correlated_predicate_agg_rule::*;
Expand Down Expand Up @@ -335,6 +336,7 @@ macro_rules! for_all_rules {
, { ValuesExtractProjectRule }
, { BatchPushLimitToScanRule }
, { BatchIcebergPredicatePushDownRule }
, { BatchIcebergCountStar }
, { PullUpCorrelatedPredicateAggRule }
, { SourceToKafkaScanRule }
, { SourceToIcebergScanRule }
Expand Down

0 comments on commit 3afbf25

Please sign in to comment.