feat(frontend): support logstore plan node (#20376)
kwannoel authored Feb 17, 2025
1 parent 6650af0 commit 88bb6a7
Showing 11 changed files with 225 additions and 2 deletions.
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -63,6 +63,7 @@ user streaming_enable_delta_join
user streaming_max_parallelism
user streaming_over_window_cache_policy
user streaming_parallelism
user streaming_unaligned_join
user streaming_use_arrangement_backfill
user streaming_use_shared_source
user streaming_use_snapshot_backfill
5 changes: 5 additions & 0 deletions proto/stream_plan.proto
@@ -853,6 +853,10 @@ message RowMergeNode {
catalog.ColIndexMapping rhs_mapping = 2;
}

message SyncLogStoreNode {
catalog.Table log_store_table = 1;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
@@ -902,6 +906,7 @@ message StreamNode {
GlobalApproxPercentileNode global_approx_percentile = 145;
RowMergeNode row_merge = 146;
AsOfJoinNode as_of_join = 147;
SyncLogStoreNode sync_log_store = 148;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
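For orientation, here is a minimal sketch of consuming the new oneof variant from the prost-generated Rust types. Because `src/prost/build.rs` boxes this variant (see that change below), the payload arrives as a `Box<SyncLogStoreNode>`. This is an illustrative sketch, not code from the commit:

```rust
// Sketch: matching the new node body in the generated prost types.
// The variant is boxed via the `.boxed(...)` directive in src/prost/build.rs.
use risingwave_pb::stream_plan::stream_node::NodeBody;

fn is_sync_log_store(body: &NodeBody) -> bool {
    match body {
        // `log_store_table` carries the internal table catalog backing the log store.
        NodeBody::SyncLogStore(node) => node.log_store_table.is_some(),
        _ => false,
    }
}
```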
4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
@@ -342,6 +342,10 @@ pub struct SessionConfig {
/// Format: iceberg_engine_connection = schema_name.connection_name.
#[parameter(default = "", check_hook = check_iceberg_engine_connection)]
iceberg_engine_connection: String,

/// Whether the streaming join should be unaligned or not.
#[parameter(default = false)]
streaming_unaligned_join: bool,
}

fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
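The new flag surfaces as a session parameter (see the pg_settings test above), so it can be toggled per session with `SET streaming_unaligned_join = true;`. A hedged sketch of how planner code might consult it; the getter name follows the `#[parameter]` derive convention and is an assumption here:

```rust
// Hedged sketch: branch on the session flag when planning a streaming join.
// `SessionConfig::streaming_unaligned_join()` is the accessor the derive
// macro is expected to generate; treat the name as an assumption.
use risingwave_common::session_config::SessionConfig;

fn should_use_unaligned_join(config: &SessionConfig) -> bool {
    // `true` opts streaming hash joins into the unaligned, log-store-backed path.
    config.streaming_unaligned_join()
}
```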
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
@@ -981,6 +981,7 @@ mod stream_sort;
mod stream_source;
mod stream_source_scan;
mod stream_stateless_simple_agg;
mod stream_sync_log_store;
mod stream_table_scan;
mod stream_topn;
mod stream_values;
@@ -1106,6 +1107,7 @@ pub use stream_sort::StreamEowcSort;
pub use stream_source::StreamSource;
pub use stream_source_scan::StreamSourceScan;
pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg;
pub use stream_sync_log_store::StreamSyncLogStore;
pub use stream_table_scan::StreamTableScan;
pub use stream_temporal_join::StreamTemporalJoin;
pub use stream_topn::StreamTopN;
@@ -1245,6 +1247,7 @@ macro_rules! for_all_plan_nodes {
, { Stream, LocalApproxPercentile }
, { Stream, RowMerge }
, { Stream, AsOfJoin }
, { Stream, SyncLogStore }
}
};
}
@@ -1379,6 +1382,7 @@ macro_rules! for_stream_plan_nodes {
, { Stream, LocalApproxPercentile }
, { Stream, RowMerge }
, { Stream, AsOfJoin }
, { Stream, SyncLogStore }
}
};
}
85 changes: 85 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_sync_log_store.rs
@@ -0,0 +1,85 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::SyncLogStoreNode;

use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{
childless_record, infer_synced_kv_log_store_table_catalog_inner, Distill,
};
use crate::optimizer::plan_node::{
ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSyncLogStore {
pub base: PlanBase<Stream>,
pub input: PlanRef,
}

impl StreamSyncLogStore {
pub fn new(input: PlanRef) -> Self {
let base = PlanBase::new_stream(
input.ctx().clone(),
input.schema().clone(),
input.stream_key().map(|keys| keys.to_vec()),
input.functional_dependency().clone(),
input.distribution().clone(),
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
Self { base, input }
}
}

impl Distill for StreamSyncLogStore {
fn distill<'a>(&self) -> XmlNode<'a> {
childless_record("StreamSyncLogStore", vec![])
}
}

impl PlanTreeNodeUnary for StreamSyncLogStore {
fn input(&self) -> PlanRef {
self.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(input)
}
}

impl_plan_tree_node_for_unary! { StreamSyncLogStore }

impl StreamNode for StreamSyncLogStore {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
let columns = self.input.schema().fields();
let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost()
.into();
NodeBody::SyncLogStore(Box::new(SyncLogStoreNode { log_store_table }))
}
}

impl ExprRewritable for StreamSyncLogStore {}

impl ExprVisitable for StreamSyncLogStore {}
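A short usage sketch: because `new` copies schema, stream key, distribution, append-only flag, and monotonicity straight from the input, wrapping a plan with this node is transparent to downstream plan nodes. The conversion to `PlanRef` via `.into()` follows the crate's usual plan-node convention; treat this as an illustration, not code from the commit:

```rust
// Sketch: wrapping an existing stream plan with the log-store node.
use crate::optimizer::plan_node::StreamSyncLogStore;
use crate::PlanRef;

fn wrap_with_sync_log_store(input: PlanRef) -> PlanRef {
    // Derived properties (schema, distribution, ...) are inherited from `input`.
    StreamSyncLogStore::new(input).into()
}
```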
45 changes: 45 additions & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
@@ -400,6 +400,51 @@ pub fn infer_kv_log_store_table_catalog_inner(
table_catalog_builder.build(dist_key, read_prefix_len_hint)
}

pub fn infer_synced_kv_log_store_table_catalog_inner(
input: &PlanRef,
columns: &[Field],
) -> TableCatalog {
let mut table_catalog_builder = TableCatalogBuilder::default();

let mut value_indices =
Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());

for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
value_indices.push(indice);
}

table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);

for (i, ordering) in PK_ORDERING.iter().enumerate() {
table_catalog_builder.add_order_column(i, *ordering);
}

let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();

let payload_indices = {
let mut payload_indices = Vec::with_capacity(columns.len());
for column in columns {
let payload_index = table_catalog_builder.add_column(column);
payload_indices.push(payload_index);
}
payload_indices
};

value_indices.extend(payload_indices);
table_catalog_builder.set_value_indices(value_indices);

// Modify distribution key indices based on the pre-defined columns.
let dist_key = input
.distribution()
.dist_column_indices()
.iter()
.map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
.collect_vec();

table_catalog_builder.build(dist_key, read_prefix_len_hint)
}

/// Check that all leaf nodes must be stream table scan,
/// since that plan node maps to `backfill` executor, which supports recovery.
/// Some other leaf nodes like `StreamValues` do not support recovery, and they
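The distribution-key adjustment at the end deserves a note: payload columns are appended after the predefined log-store columns, so every upstream dist-key index shifts right by the number of predefined columns. A standalone sketch of that arithmetic (the column count of 4 is illustrative, not taken from `KV_LOG_STORE_PREDEFINED_COLUMNS`):

```rust
// Standalone sketch of the dist-key shift performed in
// `infer_synced_kv_log_store_table_catalog_inner` above.
fn shift_dist_key(dist_key: &[usize], num_predefined: usize) -> Vec<usize> {
    // Payload columns start after the predefined log-store columns,
    // so each upstream index moves right by `num_predefined`.
    dist_key.iter().map(|idx| idx + num_predefined).collect()
}

fn main() {
    // E.g., with 4 predefined columns, upstream dist key [0, 2] maps to [4, 6].
    assert_eq!(shift_dist_key(&[0, 2], 4), vec![4, 6]);
}
```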
1 change: 1 addition & 0 deletions src/prost/build.rs
@@ -159,6 +159,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.boxed(".stream_plan.StreamNode.node_body.global_approx_percentile")
.boxed(".stream_plan.StreamNode.node_body.row_merge")
.boxed(".stream_plan.StreamNode.node_body.as_of_join")
.boxed(".stream_plan.StreamNode.node_body.sync_log_store")
// `Udf` is 248 bytes, while 2nd largest field is 32 bytes.
.boxed(".expr.ExprNode.rex_node.udf")
// Eq + Hash are for plan nodes to do common sub-plan detection.
1 change: 1 addition & 0 deletions src/stream/src/executor/mod.rs
@@ -155,6 +155,7 @@ pub use simple_agg::SimpleAggExecutor;
pub use sink::SinkExecutor;
pub use sort::*;
pub use stateless_simple_agg::StatelessSimpleAggExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
3 changes: 1 addition & 2 deletions src/stream/src/executor/sync_kv_log_store.rs
@@ -99,7 +99,7 @@ use crate::executor::{

type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, u64)>>;

struct SyncedKvLogStoreExecutor<S: StateStore> {
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
actor_context: ActorContextRef,
table_id: TableId,
metrics: KvLogStoreMetrics,
@@ -114,7 +114,6 @@ struct SyncedKvLogStoreExecutor<S: StateStore> {
}
// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
#[cfg_attr(not(test), expect(dead_code))]
pub(crate) fn new(
actor_context: ActorContextRef,
table_id: u32,
4 changes: 4 additions & 0 deletions src/stream/src/from_proto/mod.rs
@@ -58,6 +58,8 @@ mod row_merge;

mod approx_percentile;

mod sync_log_store;

// import for submodules
use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::NodeBody;
@@ -99,6 +101,7 @@ use self::source_backfill::*;
use self::stateless_simple_agg::*;
use self::stream_cdc_scan::*;
use self::stream_scan::*;
use self::sync_log_store::*;
use self::temporal_join::*;
use self::top_n::*;
use self::union::*;
@@ -188,5 +191,6 @@ pub async fn create_executor(
NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
NodeBody::RowMerge => RowMergeExecutorBuilder,
NodeBody::AsOfJoin => AsOfJoinExecutorBuilder,
NodeBody::SyncLogStore => SyncLogStoreExecutorBuilder,
}
}
74 changes: 74 additions & 0 deletions src/stream/src/from_proto/sync_log_store.rs
@@ -0,0 +1,74 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::stream_plan::SyncLogStoreNode;
use risingwave_storage::StateStore;

use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
use crate::common::log_store_impl::kv_log_store::{KvLogStoreMetrics, KV_LOG_STORE_V2_INFO};
use crate::error::StreamResult;
use crate::executor::{Executor, SyncedKvLogStoreExecutor};
use crate::from_proto::ExecutorBuilder;
use crate::task::ExecutorParams;

pub struct SyncLogStoreExecutorBuilder;

impl ExecutorBuilder for SyncLogStoreExecutorBuilder {
type Node = SyncLogStoreNode;

async fn new_boxed_executor(
params: ExecutorParams,
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<Executor> {
let actor_context = params.actor_context.clone();
let table = node.log_store_table.as_ref().unwrap().clone();
let table_id = table.id;

let metrics = {
let streaming_metrics = actor_context.streaming_metrics.as_ref();
let actor_id = actor_context.id;
let join_fragment_id = 0;
let name = "sync_log_store";
let target = "unaligned_hash_join";
KvLogStoreMetrics::new_inner(
streaming_metrics,
actor_id,
join_fragment_id,
name,
target,
)
};

let serde = LogStoreRowSerde::new(
&table,
params.vnode_bitmap.map(|b| b.into()),
&KV_LOG_STORE_V2_INFO,
);
// FIXME(kwannoel): Make configurable
let buffer_max_size = 1000;
let [upstream] = params.input.try_into().unwrap();

let executor = SyncedKvLogStoreExecutor::new(
actor_context,
table_id,
metrics,
serde,
store,
buffer_max_size,
upstream,
);
Ok((params.info, executor).into())
}
}

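On the FIXME above: the buffer cap is currently hard-coded to 1000. A hedged, standalone sketch of the intended direction, using a hypothetical override parameter (no such config key exists in this commit):

```rust
// Hypothetical sketch for the FIXME: accept an optional configured override
// and fall back to the current hard-coded default.
fn resolve_buffer_max_size(configured: Option<usize>) -> usize {
    // 1000 matches the constant used in SyncLogStoreExecutorBuilder above.
    configured.unwrap_or(1000)
}
```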