From 8a9dab5bcc73cbdb25c7150ecb86556e51ff6995 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 5 Feb 2025 13:14:44 +0800 Subject: [PATCH] address comment --- proto/stream_plan.proto | 3 +++ proto/stream_service.proto | 2 +- src/meta/src/barrier/rpc.rs | 2 +- src/meta/src/stream/stream_graph/actor.rs | 2 +- src/stream/src/task/barrier_manager/managed_state.rs | 2 +- 5 files changed, 7 insertions(+), 4 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 91197c34fbb8d..c9247c62de6c4 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -550,6 +550,8 @@ message MergeNode { // // `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used. // See `compose_fragment`. + // The field is deprecated because the upstream actor info is provided separately instead of + // injected here in the node. repeated uint32 upstream_actor_id = 1 [deprecated = true]; uint32 upstream_fragment_id = 2; // Type of the upstream dispatcher. If there's always one upstream according to this @@ -972,6 +974,7 @@ message StreamActor { // Note that upstream actor ids are also stored in the proto of merge nodes. // It is painstaking to traverse through the node tree and get upstream actor id from the root StreamNode. // We duplicate the information here to ease the parsing logic in stream manager. + // The field is deprecated because it's no longer used. repeated uint32 upstream_actor_id = 6 [deprecated = true]; // Vnodes that the executors in this actor own. // If the fragment is a singleton, this field will not be set and leave a `None`. diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 949e0be84e23e..9c362fe72ffc4 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -23,7 +23,7 @@ message InjectBarrierRequest { } stream_plan.StreamActor actor = 1; - map upstreams = 2; + map fragment_upstreams = 2; } repeated common.ActorInfo broadcast_info = 8; diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 94a9ffecc922f..454cfc377863d 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -543,7 +543,7 @@ impl ControlStreamManager { .flatten() .map(|(actor, upstreams)| BuildActorInfo { actor: Some(actor), - upstreams: upstreams + fragment_upstreams: upstreams .into_iter() .map(|(fragment_id, upstreams)| { ( diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index abf5413b5a90b..693f213e18899 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -127,7 +127,7 @@ impl ActorBuilder { /// During this process, the following things will be done: /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the /// compute nodes. - /// 2. Fill the upstream mview info of the `Merge` node under the other "leaf" nodes. + /// 2. Collect the upstream actor ids of each actor for the `Merge` node to create upstream connection. fn rewrite(&self) -> MetaResult<(StreamNode, ActorUpstreams)> { let mut actor_upstreams = ActorUpstreams::new(); let node = self.rewrite_inner(&self.nodes, &mut actor_upstreams, 0)?; diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index e1c78dcc597e4..7a58572bd9516 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -762,7 +762,7 @@ impl DatabaseManagedBarrierState { let subscriptions = LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone())); for actor in request.actors_to_build { - let upstream = actor.upstreams; + let upstream = actor.fragment_upstreams; let actor = actor.actor.unwrap(); let actor_id = actor.actor_id; assert!(!is_stop_actor(actor_id));