Skip to content

Commit

Permalink
use hash map
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 7, 2025
1 parent 5c8106a commit 3db4b37
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 14 deletions.
6 changes: 3 additions & 3 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use risingwave_common::bitmap::Bitmap;
Expand Down Expand Up @@ -100,7 +100,7 @@ pub struct ReplaceStreamJobPlan {
pub new_fragments: StreamJobFragments,
/// Downstream jobs of the replaced job need to update their `Merge` node to
/// connect to the new fragment.
pub merge_updates: BTreeMap<FragmentId, Vec<MergeUpdate>>,
pub merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
pub dispatchers: HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
/// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
/// We need to reassign splits for it.
Expand Down Expand Up @@ -994,7 +994,7 @@ impl Command {

fn generate_update_mutation_for_replace_table(
old_fragments: &StreamJobFragments,
merge_updates: &BTreeMap<FragmentId, Vec<MergeUpdate>>,
merge_updates: &HashMap<FragmentId, Vec<MergeUpdate>>,
dispatchers: &HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
init_split_assignment: &SplitAssignment,
) -> Option<Mutation> {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ impl CatalogController {
&self,
tmp_id: ObjectId,
streaming_job: StreamingJob,
merge_updates: BTreeMap<crate::model::FragmentId, Vec<MergeUpdate>>,
merge_updates: HashMap<crate::model::FragmentId, Vec<MergeUpdate>>,
col_index_mapping: Option<ColIndexMapping>,
sink_into_table_context: SinkIntoTableContext,
) -> MetaResult<NotificationVersion> {
Expand Down Expand Up @@ -1015,7 +1015,7 @@ impl CatalogController {

pub async fn finish_replace_streaming_job_inner(
tmp_id: ObjectId,
merge_updates: BTreeMap<crate::model::FragmentId, Vec<MergeUpdate>>,
merge_updates: HashMap<crate::model::FragmentId, Vec<MergeUpdate>>,
col_index_mapping: Option<ColIndexMapping>,
SinkIntoTableContext {
creating_sink_id,
Expand Down
9 changes: 3 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -50,7 +50,6 @@ use risingwave_pb::ddl_service::{
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::PbFragment;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate;
use risingwave_pb::stream_plan::{
Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, PbStreamFragmentGraph,
StreamFragmentGraph as StreamFragmentGraphProto,
Expand All @@ -69,9 +68,7 @@ use crate::manager::{
LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
StreamingJobType, IGNORED_NOTIFICATION_VERSION,
};
use crate::model::{
FragmentActorUpstreams, FragmentId, StreamContext, StreamJobFragments, TableParallelism,
};
use crate::model::{FragmentActorUpstreams, StreamContext, StreamJobFragments, TableParallelism};
use crate::stream::{
create_source_worker, validate_sink, ActorGraphBuildResult, ActorGraphBuilder,
CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
Expand Down Expand Up @@ -1207,7 +1204,7 @@ impl DdlController {
)
.await?;

let result: MetaResult<BTreeMap<FragmentId, Vec<PbMergeUpdate>>> = try {
let result: MetaResult<_> = try {
let merge_updates = ctx.merge_updates.clone();

self.metadata_manager
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/source_manager/split_assignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl SourceManager {
pub async fn allocate_splits_for_replace_source(
&self,
job_id: &TableId,
merge_updates: &BTreeMap<FragmentId, Vec<MergeUpdate>>,
merge_updates: &HashMap<FragmentId, Vec<MergeUpdate>>,
) -> MetaResult<SplitAssignment> {
tracing::debug!(?merge_updates, "allocate_splits_for_replace_source");
if merge_updates.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ pub struct ActorGraphBuildResult {

/// The updates to be applied to the downstream chain actors. Used for schema change (replace
/// table plan).
pub merge_updates: BTreeMap<FragmentId, Vec<MergeUpdate>>,
pub merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
}

/// [`ActorGraphBuilder`] builds the actor graph for the given complete fragment graph, based on the
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub struct ReplaceStreamJobContext {
pub old_fragments: StreamJobFragments,

/// The updates to be applied to the downstream chain actors. Used for schema change.
pub merge_updates: BTreeMap<FragmentId, Vec<MergeUpdate>>,
pub merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,

/// New dispatchers to add from upstream actors to downstream actors.
pub dispatchers: HashMap<FragmentId, HashMap<ActorId, Vec<Dispatcher>>>,
Expand Down

0 comments on commit 3db4b37

Please sign in to comment.