refactor: make actor graph builder fragment aware (#20385)
wenym1 authored Feb 7, 2025
1 parent ad80f2a commit 7c7e0ad
Showing 10 changed files with 346 additions and 243 deletions.
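
The hunks below all implement one idea (inferred from the visible diff): metadata that the barrier and DDL code paths used to key by actor — `merge_updates: Vec<MergeUpdate>` and `dispatchers: HashMap<ActorId, Vec<Dispatcher>>` — is now grouped by the owning fragment first, making the actor graph builder fragment aware. Call sites that still want the flat per-actor view recover it with `.values().flatten()`. A minimal runnable sketch of the shape change, using stand-in types rather than the real protobuf ones:

```rust
use std::collections::HashMap;

type FragmentId = u32;
type ActorId = u32;

struct Dispatcher; // stand-in for the protobuf PbDispatcher

// Old shape: HashMap<ActorId, Vec<Dispatcher>>.
// New shape: fragment id first, then actor id.
type Dispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>;

// Call sites that only care about actors flatten over fragments —
// exactly the `.values().flatten()` pattern repeated in the hunks below.
fn flat_view(dispatchers: &Dispatchers) -> impl Iterator<Item = (&ActorId, &Vec<Dispatcher>)> {
    dispatchers.values().flatten()
}

fn main() {
    let mut dispatchers = Dispatchers::new();
    dispatchers.entry(1).or_default().insert(10, vec![Dispatcher]);
    dispatchers.entry(2).or_default().insert(20, vec![Dispatcher, Dispatcher]);
    for (actor_id, ds) in flat_view(&dispatchers) {
        println!("actor {actor_id}: {} dispatcher(s)", ds.len());
    }
}
```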
23 changes: 13 additions & 10 deletions src/meta/src/barrier/command.rs
@@ -100,8 +100,8 @@ pub struct ReplaceStreamJobPlan {
pub new_fragments: StreamJobFragments,
/// Downstream jobs of the replaced job need to update their `Merge` node to
/// connect to the new fragment.
pub merge_updates: Vec<MergeUpdate>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
pub merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
pub dispatchers: HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
/// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
/// We need to reassign splits for it.
///
@@ -157,7 +157,7 @@ impl ReplaceStreamJobPlan {
/// `old_fragment_id` -> `new_fragment_id`
pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
let mut fragment_replacements = HashMap::new();
for merge_update in &self.merge_updates {
for merge_update in self.merge_updates.values().flatten() {
if let Some(new_upstream_fragment_id) = merge_update.new_upstream_fragment_id {
let r = fragment_replacements
.insert(merge_update.upstream_fragment_id, new_upstream_fragment_id);
@@ -174,7 +174,8 @@

pub fn dropped_actors(&self) -> HashSet<ActorId> {
self.merge_updates
.iter()
.values()
.flatten()
.flat_map(|merge_update| merge_update.removed_upstream_actor_id.clone())
.collect()
}
@@ -187,7 +188,7 @@ pub struct CreateStreamingJobCommandInfo {
pub stream_job_fragments: StreamJobFragments,
/// Refer to the doc on [`crate::manager::MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
pub dispatchers: HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
pub init_split_assignment: SplitAssignment,
pub definition: String,
pub job_type: StreamingJobType,
Expand Down Expand Up @@ -689,7 +690,8 @@ impl Command {
job_type,
} => {
let actor_dispatchers = dispatchers
.iter()
.values()
.flatten()
.map(|(&actor_id, dispatchers)| {
(
actor_id,
Expand Down Expand Up @@ -992,14 +994,15 @@ impl Command {

fn generate_update_mutation_for_replace_table(
old_fragments: &StreamJobFragments,
merge_updates: &[MergeUpdate],
dispatchers: &HashMap<ActorId, Vec<Dispatcher>>,
merge_updates: &HashMap<FragmentId, Vec<MergeUpdate>>,
dispatchers: &HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
init_split_assignment: &SplitAssignment,
) -> Option<Mutation> {
let dropped_actors = old_fragments.actor_ids();

let actor_new_dispatchers = dispatchers
.iter()
.values()
.flatten()
.map(|(&actor_id, dispatchers)| {
(
actor_id,
@@ -1017,7 +1020,7 @@ impl Command {

Some(Mutation::Update(UpdateMutation {
actor_new_dispatchers,
merge_update: merge_updates.to_owned(),
merge_update: merge_updates.values().flatten().cloned().collect(),
dropped_actors,
actor_splits,
..Default::default()
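
Net effect of the command.rs hunks: the fragment-keyed maps are flattened back into the actor-keyed barrier mutation at the last moment. A hedged sketch with simplified stand-in types (the real code assembles protobuf `UpdateMutation` messages):

```rust
use std::collections::{HashMap, HashSet};

type FragmentId = u32;
type ActorId = u32;

#[derive(Clone)]
struct Dispatcher;

#[derive(Clone)]
struct MergeUpdate {
    removed_upstream_actor_id: Vec<ActorId>,
}

// Flatten the FragmentId-keyed inputs into the actor-keyed fields the update
// mutation needs (simplified mirror of
// generate_update_mutation_for_replace_table / dropped_actors).
fn build_update(
    dispatchers: &HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
    merge_updates: &HashMap<FragmentId, Vec<MergeUpdate>>,
) -> (
    HashMap<ActorId, Vec<Dispatcher>>,
    Vec<MergeUpdate>,
    HashSet<ActorId>,
) {
    let actor_new_dispatchers = dispatchers
        .values()
        .flatten()
        .map(|(&actor_id, ds)| (actor_id, ds.clone()))
        .collect();
    let merge_update: Vec<MergeUpdate> = merge_updates.values().flatten().cloned().collect();
    let dropped_actors = merge_updates
        .values()
        .flatten()
        .flat_map(|u| u.removed_upstream_actor_id.iter().copied())
        .collect();
    (actor_new_dispatchers, merge_update, dropped_actors)
}

fn main() {
    let (ds, mu, dropped) = build_update(&HashMap::new(), &HashMap::new());
    assert!(ds.is_empty() && mu.is_empty() && dropped.is_empty());
}
```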
10 changes: 5 additions & 5 deletions src/meta/src/barrier/context/context_impl.rs
@@ -199,7 +199,7 @@ impl CommandContext {
.post_collect_job_fragments(
new_fragments.stream_job_id().table_id as _,
new_fragments.actor_ids(),
dispatchers.clone(),
dispatchers,
init_split_assignment,
)
.await?;
@@ -209,7 +209,7 @@
old_fragments,
new_fragments.stream_source_fragments(),
init_split_assignment.clone(),
replace_plan.fragment_replacements(),
replace_plan,
)
.await;
}
@@ -251,7 +251,7 @@ impl CommandContext {
.post_collect_job_fragments(
stream_job_fragments.stream_job_id().table_id as _,
stream_job_fragments.actor_ids(),
dispatchers.clone(),
dispatchers,
init_split_assignment,
)
.await?;
@@ -295,7 +295,7 @@ impl CommandContext {
.post_collect_job_fragments(
new_fragments.stream_job_id().table_id as _,
new_fragments.actor_ids(),
dispatchers.clone(),
dispatchers,
init_split_assignment,
)
.await?;
@@ -307,7 +307,7 @@
old_fragments,
new_fragments.stream_source_fragments(),
init_split_assignment.clone(),
replace_plan.fragment_replacements(),
replace_plan,
)
.await;
}
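
The visible change in this file is ownership: `post_collect_job_fragments` now borrows the dispatcher map (see its `&HashMap<...>` parameter in the streaming_job.rs hunk below), so callers drop the `dispatchers.clone()`, and the source manager receives the whole `replace_plan` instead of a pre-computed replacement map. A tiny sketch of the borrow, with assumed simplified signatures:

```rust
use std::collections::HashMap;

type FragmentId = u32;
type ActorId = u32;

struct Dispatcher;

// Borrowing avoids deep-cloning every dispatcher on each command
// (signature simplified from CatalogController::post_collect_job_fragments).
fn post_collect_job_fragments(
    dispatchers: &HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
) {
    let _ = dispatchers; // persist per-fragment dispatchers ...
}

fn main() {
    let dispatchers = HashMap::new();
    // Before: post_collect_job_fragments(dispatchers.clone());
    // After: pass a shared reference.
    post_collect_job_fragments(&dispatchers);
}
```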
3 changes: 2 additions & 1 deletion src/meta/src/barrier/progress.rs
@@ -535,7 +535,8 @@ impl CreateMviewProgressTracker {
for (table_id, actors) in upstream_root_actors {
assert!(!actors.is_empty());
let dispatch_count: usize = dispatchers
.iter()
.values()
.flatten()
.filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
.map(|(_, v)| v.len())
.sum();
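
Same flattening in the progress tracker: the per-upstream-job dispatch count now iterates fragments first. A runnable sketch under simplified types:

```rust
use std::collections::HashMap;

type FragmentId = u32;
type ActorId = u32;

struct Dispatcher;

// Sum dispatcher counts across fragments, keeping only dispatchers whose
// upstream actor belongs to the given upstream root actors.
fn dispatch_count(
    dispatchers: &HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
    actors: &[ActorId],
) -> usize {
    dispatchers
        .values()
        .flatten()
        .filter(|&(upstream_actor_id, _)| actors.contains(upstream_actor_id))
        .map(|(_, v)| v.len())
        .sum()
}

fn main() {
    let mut dispatchers: HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>> = HashMap::new();
    dispatchers.entry(1).or_default().insert(10, vec![Dispatcher]);
    assert_eq!(dispatch_count(&dispatchers, &[10]), 1);
}
```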
57 changes: 27 additions & 30 deletions src/meta/src/controller/streaming_job.rs
@@ -49,7 +49,7 @@ use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate;
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::{
PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, PbStreamNode,
};
Expand Down Expand Up @@ -626,7 +626,10 @@ impl CatalogController {
&self,
job_id: ObjectId,
actor_ids: Vec<crate::model::ActorId>,
new_actor_dispatchers: HashMap<crate::model::ActorId, Vec<PbDispatcher>>,
new_actor_dispatchers: &HashMap<
crate::model::FragmentId,
HashMap<crate::model::ActorId, Vec<PbDispatcher>>,
>,
split_assignment: &SplitAssignment,
) -> MetaResult<()> {
let inner = self.inner.write().await;
@@ -659,10 +662,11 @@
}

let mut actor_dispatchers = vec![];
for (actor_id, dispatchers) in new_actor_dispatchers {
for (actor_id, dispatchers) in new_actor_dispatchers.values().flatten() {
for dispatcher in dispatchers {
let mut actor_dispatcher =
actor_dispatcher::Model::from((actor_id, dispatcher)).into_active_model();
actor_dispatcher::Model::from((*actor_id, dispatcher.clone()))
.into_active_model();
actor_dispatcher.id = NotSet;
actor_dispatchers.push(actor_dispatcher);
}
@@ -973,7 +977,7 @@ impl CatalogController {
&self,
tmp_id: ObjectId,
streaming_job: StreamingJob,
merge_updates: Vec<PbMergeUpdate>,
merge_updates: HashMap<crate::model::FragmentId, Vec<MergeUpdate>>,
col_index_mapping: Option<ColIndexMapping>,
sink_into_table_context: SinkIntoTableContext,
) -> MetaResult<NotificationVersion> {
@@ -1011,7 +1015,7 @@

pub async fn finish_replace_streaming_job_inner(
tmp_id: ObjectId,
merge_updates: Vec<PbMergeUpdate>,
merge_updates: HashMap<crate::model::FragmentId, Vec<MergeUpdate>>,
col_index_mapping: Option<ColIndexMapping>,
SinkIntoTableContext {
creating_sink_id,
@@ -1112,31 +1116,15 @@
.await?;

// 2. update merges.
let fragment_replace_map: HashMap<_, _> = merge_updates
.iter()
.map(|update| {
(
update.upstream_fragment_id,
update.new_upstream_fragment_id.unwrap(),
)
})
.collect();

// TODO: remove cache upstream fragment/actor ids and derive them from `actor_dispatcher` table.
let mut to_update_fragment_ids = HashSet::new();
// 2.1 update downstream actor's upstream_actor_ids
for merge_update in merge_updates {
for merge_update in merge_updates.values().flatten() {
assert!(merge_update.removed_upstream_actor_id.is_empty());
assert!(merge_update.new_upstream_fragment_id.is_some());
let (actor_id, fragment_id, mut upstream_actors) =
let (actor_id, mut upstream_actors) =
Actor::find_by_id(merge_update.actor_id as ActorId)
.select_only()
.columns([
actor::Column::ActorId,
actor::Column::FragmentId,
actor::Column::UpstreamActorIds,
])
.into_tuple::<(ActorId, FragmentId, ActorUpstreamActors)>()
.columns([actor::Column::ActorId, actor::Column::UpstreamActorIds])
.into_tuple::<(ActorId, ActorUpstreamActors)>()
.one(txn)
.await?
.ok_or_else(|| {
@@ -1162,13 +1150,12 @@
}
.update(txn)
.await?;

to_update_fragment_ids.insert(fragment_id);
}

// 2.2 update downstream fragment's Merge node, and upstream_fragment_id
for fragment_id in to_update_fragment_ids {
for (fragment_id, merge_updates) in merge_updates {
let (fragment_id, mut stream_node, mut upstream_fragment_id) =
Fragment::find_by_id(fragment_id)
Fragment::find_by_id(fragment_id as FragmentId)
.select_only()
.columns([
fragment::Column::FragmentId,
@@ -1180,6 +1167,16 @@
.await?
.map(|(id, node, upstream)| (id, node.to_protobuf(), upstream))
.ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
let fragment_replace_map: HashMap<_, _> = merge_updates
.iter()
.map(|update| {
(
update.upstream_fragment_id,
update.new_upstream_fragment_id.unwrap(),
)
})
.collect();

visit_stream_node_mut(&mut stream_node, |body| {
if let PbNodeBody::Merge(m) = body
&& let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
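
Because merge updates now arrive grouped by downstream fragment, `finish_replace_streaming_job_inner` no longer needs the `to_update_fragment_ids` bookkeeping or the extra `FragmentId` column on the actor lookup: each fragment's own updates drive both the actor rewrite and the `Merge`-node rewrite, with the `old upstream -> new upstream` map built per fragment. A simplified sketch of that rewrite step (stand-ins for the catalog rows and the stream-node visitor):

```rust
use std::collections::HashMap;

type FragmentId = u32;

struct MergeUpdate {
    upstream_fragment_id: FragmentId,
    new_upstream_fragment_id: Option<FragmentId>,
}

// Stand-in for a Merge node inside a fragment's stream-node tree.
struct MergeNode {
    upstream_fragment_id: FragmentId,
}

fn rewrite_merges(
    merge_updates: &HashMap<FragmentId, Vec<MergeUpdate>>,
    fragments: &mut HashMap<FragmentId, Vec<MergeNode>>,
) {
    for (fragment_id, updates) in merge_updates {
        // Build the old -> new upstream map from this fragment's updates only.
        let replace: HashMap<_, _> = updates
            .iter()
            .map(|u| (u.upstream_fragment_id, u.new_upstream_fragment_id.unwrap()))
            .collect();
        // Visit the fragment's Merge nodes (the real code walks the protobuf
        // stream node with visit_stream_node_mut).
        for merge in fragments.get_mut(fragment_id).into_iter().flatten() {
            if let Some(new_id) = replace.get(&merge.upstream_fragment_id) {
                merge.upstream_fragment_id = *new_id;
            }
        }
    }
}

fn main() {
    let merge_updates = HashMap::from([(
        7,
        vec![MergeUpdate { upstream_fragment_id: 1, new_upstream_fragment_id: Some(2) }],
    )]);
    let mut fragments = HashMap::from([(7, vec![MergeNode { upstream_fragment_id: 1 }])]);
    rewrite_merges(&merge_updates, &mut fragments);
    assert_eq!(fragments[&7][0].upstream_fragment_id, 2);
}
```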
33 changes: 19 additions & 14 deletions src/meta/src/rpc/ddl_controller.rs
@@ -50,7 +50,6 @@ use risingwave_pb::ddl_service::{
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::PbFragment;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate;
use risingwave_pb::stream_plan::{
Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, PbStreamFragmentGraph,
StreamFragmentGraph as StreamFragmentGraphProto,
@@ -830,19 +829,25 @@ impl DdlController {
};

let upstream_actors = sink_fragment.get_actors();
let sink_fragment_dispatchers = replace_table_ctx
.dispatchers
.entry(sink_fragment.fragment_id)
.or_default();

for actor in upstream_actors {
replace_table_ctx.dispatchers.insert(
actor.actor_id,
vec![Dispatcher {
r#type: DispatcherType::Hash as _,
dist_key_indices: dist_key_indices.clone(),
output_indices: output_indices.clone(),
hash_mapping: mapping.as_ref().map(|m| m.to_protobuf()),
dispatcher_id: union_fragment.fragment_id as _,
downstream_actor_id: downstream_actor_ids.clone(),
}],
);
sink_fragment_dispatchers
.try_insert(
actor.actor_id,
vec![Dispatcher {
r#type: DispatcherType::Hash as _,
dist_key_indices: dist_key_indices.clone(),
output_indices: output_indices.clone(),
hash_mapping: mapping.as_ref().map(|m| m.to_protobuf()),
dispatcher_id: union_fragment.fragment_id as _,
downstream_actor_id: downstream_actor_ids.clone(),
}],
)
.expect("non-duplicate");
}

let upstream_fragment_id = sink_fragment.fragment_id;
@@ -1199,7 +1204,7 @@ impl DdlController {
)
.await?;

let result: MetaResult<Vec<PbMergeUpdate>> = try {
let result: MetaResult<_> = try {
let merge_updates = ctx.merge_updates.clone();

self.metadata_manager
@@ -1356,7 +1361,7 @@ impl DdlController {
tracing::debug!(id = job_id, "building replace streaming job");
let mut updated_sink_catalogs = vec![];

let result: MetaResult<Vec<PbMergeUpdate>> = try {
let result: MetaResult<_> = try {
let (mut ctx, mut stream_job_fragments) = self
.build_replace_job(
ctx,
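
In the sink-into-table path, each upstream actor's dispatcher used to go straight into a flat map; it is now grouped under the sink fragment's entry, with a duplicate actor treated as a bug via `HashMap::try_insert` (a nightly std API, available to this repo's toolchain). A stable-Rust sketch of the same grouping, with a stand-in `Dispatcher`:

```rust
use std::collections::HashMap;

type FragmentId = u32;
type ActorId = u32;

#[derive(Clone)]
struct Dispatcher {
    dispatcher_id: u64, // stand-in for the protobuf dispatcher fields
}

// Group the new dispatchers under the sink fragment id; inserting the same
// actor twice is a logic error, mirroring `.try_insert(..).expect("non-duplicate")`.
fn add_sink_dispatchers(
    all: &mut HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
    sink_fragment_id: FragmentId,
    upstream_actors: &[ActorId],
    union_fragment_id: FragmentId,
) {
    let per_actor = all.entry(sink_fragment_id).or_default();
    for &actor_id in upstream_actors {
        let dispatcher = Dispatcher { dispatcher_id: union_fragment_id as u64 };
        let prev = per_actor.insert(actor_id, vec![dispatcher]);
        assert!(prev.is_none(), "non-duplicate");
    }
}

fn main() {
    let mut all = HashMap::new();
    add_sink_dispatchers(&mut all, 3, &[10, 11], 9);
    assert_eq!(all[&3].len(), 2);
}
```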
6 changes: 3 additions & 3 deletions src/meta/src/stream/source_manager.rs
@@ -45,7 +45,7 @@ use tokio::{select, time};
pub use worker::create_source_worker;
use worker::{create_source_worker_async, ConnectorSourceWorkerHandle};

use crate::barrier::{BarrierScheduler, Command};
use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
use crate::manager::MetadataManager;
use crate::model::{ActorId, FragmentId, StreamJobFragments};
use crate::rpc::metrics::MetaMetrics;
@@ -369,7 +369,7 @@ impl SourceManager {
dropped_job_fragments: &StreamJobFragments,
added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
split_assignment: SplitAssignment,
fragment_replacements: HashMap<FragmentId, FragmentId>,
replace_plan: &ReplaceStreamJobPlan,
) {
// Extract the fragments that include source operators.
let dropped_source_fragments = dropped_job_fragments.stream_source_fragments().clone();
@@ -388,7 +388,7 @@
dropped_actors,
added_source_fragments,
split_assignment,
fragment_replacements,
fragment_replacements: replace_plan.fragment_replacements(),
})
.await;
}
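
`apply_source_change` now borrows the whole `ReplaceStreamJobPlan` and derives the fragment replacements itself via `fragment_replacements()` (the method shown in the command.rs hunk), instead of every caller pre-computing the map. A self-contained sketch of that derivation under simplified types (the real method also asserts that duplicate replacements agree):

```rust
use std::collections::HashMap;

type FragmentId = u32;

struct MergeUpdate {
    upstream_fragment_id: FragmentId,
    new_upstream_fragment_id: Option<FragmentId>,
}

struct ReplaceStreamJobPlan {
    merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
}

impl ReplaceStreamJobPlan {
    /// `old_fragment_id` -> `new_fragment_id`, derived on demand.
    fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
        self.merge_updates
            .values()
            .flatten()
            .filter_map(|u| {
                u.new_upstream_fragment_id
                    .map(|new_id| (u.upstream_fragment_id, new_id))
            })
            .collect()
    }
}

// The source manager borrows the plan and derives only what it needs.
fn apply_source_change(plan: &ReplaceStreamJobPlan) {
    let fragment_replacements = plan.fragment_replacements();
    println!("replacing {} fragment(s)", fragment_replacements.len());
}

fn main() {
    let plan = ReplaceStreamJobPlan {
        merge_updates: HashMap::from([(
            7,
            vec![MergeUpdate { upstream_fragment_id: 1, new_upstream_fragment_id: Some(2) }],
        )]),
    };
    apply_source_change(&plan);
}
```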
(Diffs for the remaining 4 changed files were not loaded in this view.)
