Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 5, 2025
1 parent 9946e87 commit 8a9dab5
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 4 deletions.
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,8 @@ message MergeNode {
//
// `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used.
// See `compose_fragment`.
// The field is deprecated because the upstream actor info is provided separately instead of
// injected here in the node.
repeated uint32 upstream_actor_id = 1 [deprecated = true];
uint32 upstream_fragment_id = 2;
// Type of the upstream dispatcher. If there's always one upstream according to this
Expand Down Expand Up @@ -972,6 +974,7 @@ message StreamActor {
// Note that upstream actor ids are also stored in the proto of merge nodes.
// It is painstaking to traverse through the node tree and get upstream actor id from the root StreamNode.
// We duplicate the information here to ease the parsing logic in stream manager.
// The field is deprecated because it's no longer used.
repeated uint32 upstream_actor_id = 6 [deprecated = true];
// Vnodes that the executors in this actor own.
// If the fragment is a singleton, this field will not be set and leave a `None`.
Expand Down
2 changes: 1 addition & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ message InjectBarrierRequest {
}

stream_plan.StreamActor actor = 1;
map<uint32, UpstreamActors> upstreams = 2;
map<uint32, UpstreamActors> fragment_upstreams = 2;
}

repeated common.ActorInfo broadcast_info = 8;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ impl ControlStreamManager {
.flatten()
.map(|(actor, upstreams)| BuildActorInfo {
actor: Some(actor),
upstreams: upstreams
fragment_upstreams: upstreams
.into_iter()
.map(|(fragment_id, upstreams)| {
(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl ActorBuilder {
/// During this process, the following things will be done:
/// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
/// compute nodes.
/// 2. Fill the upstream mview info of the `Merge` node under the other "leaf" nodes.
/// 2. Collect the upstream actor ids of each actor for the `Merge` node to create upstream connection.
fn rewrite(&self) -> MetaResult<(StreamNode, ActorUpstreams)> {
let mut actor_upstreams = ActorUpstreams::new();
let node = self.rewrite_inner(&self.nodes, &mut actor_upstreams, 0)?;
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ impl DatabaseManagedBarrierState {
let subscriptions =
LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone()));
for actor in request.actors_to_build {
let upstream = actor.upstreams;
let upstream = actor.fragment_upstreams;
let actor = actor.actor.unwrap();
let actor_id = actor.actor_id;
assert!(!is_stop_actor(actor_id));
Expand Down

0 comments on commit 8a9dab5

Please sign in to comment.