Skip to content

Commit

Permalink
feat(meta): expose internal table catalog during backfill phase to batch engine (#20460)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Feb 18, 2025
1 parent e298fe2 commit 7335fa6
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 35 deletions.
4 changes: 2 additions & 2 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ impl Binder {
{
return Ok(self
.resolve_source_relation(&source_catalog.clone(), as_of));
} else if let Some(table_catalog) =
schema.get_created_table_by_name(table_name)
} else if let Some(table_catalog) = schema
.get_created_table_or_any_internal_table_by_name(table_name)
{
return self.resolve_table_relation(
table_catalog.clone(),
Expand Down
23 changes: 20 additions & 3 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,15 @@ impl SchemaCatalog {
}

/// Remove the table with `id` from this schema catalog, keeping the
/// name index and the per-table index map consistent with `table_by_id`.
///
/// The table may legitimately be absent: the frontend catalog is updated
/// asynchronously via meta notifications, so a drop can be observed before
/// the corresponding create. In that case only a warning is logged instead
/// of panicking.
pub fn drop_table(&mut self, id: TableId) {
    if let Some(table_ref) = self.table_by_id.remove(&id) {
        // The secondary maps must contain the entry if the primary map did,
        // so a miss here is a genuine invariant violation — unwrap.
        self.table_by_name.remove(&table_ref.name).unwrap();
        self.indexes_by_table_id.remove(&table_ref.id);
    } else {
        tracing::warn!(
            id = ?id.table_id,
            "table not found when dropping, frontend might not be notified yet"
        );
    }
}

pub fn create_index(&mut self, prost: &PbIndex) {
Expand Down Expand Up @@ -643,6 +649,17 @@ impl SchemaCatalog {
.filter(|&table| table.stream_job_status == StreamJobStatus::Created)
}

/// Resolve `table_name` to a table in this schema, returning it only when it
/// is visible: either its streaming job has reached the `Created` state, or it
/// is an internal table (internal tables are exposed regardless of whether
/// their owning job has finished creating).
pub fn get_created_table_or_any_internal_table_by_name(
    &self,
    table_name: &str,
) -> Option<&Arc<TableCatalog>> {
    let candidate = self.table_by_name.get(table_name)?;
    if candidate.is_internal_table() || candidate.stream_job_status == StreamJobStatus::Created {
        Some(candidate)
    } else {
        None
    }
}

/// Look up a table in this schema by its id, returning `None` when no table
/// with that id exists here. Unlike the name-based lookups, this does not
/// filter on creation status.
pub fn get_table_by_id(&self, table_id: &TableId) -> Option<&Arc<TableCatalog>> {
self.table_by_id.get(table_id)
}
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,18 @@ impl CommandContext {
stream_job_fragments,
dispatchers,
init_split_assignment,
streaming_job,
..
} = info;
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_job_fragments(
.post_collect_job_fragments_inner(
stream_job_fragments.stream_job_id().table_id as _,
stream_job_fragments.actor_ids(),
dispatchers,
init_split_assignment,
streaming_job.is_materialized_view(),
)
.await?;

Expand Down
79 changes: 50 additions & 29 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ impl CatalogController {
}
}

let mut objects = vec![];

match streaming_job {
StreamingJob::MaterializedView(table) => {
let job_id = Self::create_streaming_job_obj(
Expand All @@ -198,10 +196,6 @@ impl CatalogController {
table.id = job_id as _;
let table_model: table::ActiveModel = table.clone().into();
Table::insert(table_model).exec(&txn).await?;

objects.push(PbObject {
object_info: Some(PbObjectInfo::Table(table.to_owned())),
});
}
StreamingJob::Sink(sink, _) => {
if let Some(target_table_id) = sink.target_table {
Expand Down Expand Up @@ -352,11 +346,6 @@ impl CatalogController {

txn.commit().await?;

if !objects.is_empty() {
self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
.await;
}

Ok(())
}

Expand Down Expand Up @@ -400,21 +389,6 @@ impl CatalogController {
}
txn.commit().await?;

if job.is_materialized_view() {
self.notify_frontend(
Operation::Add,
Info::ObjectGroup(PbObjectGroup {
objects: incomplete_internal_tables
.iter()
.map(|table| PbObject {
object_info: Some(PbObjectInfo::Table(table.clone())),
})
.collect(),
}),
)
.await;
}

Ok(table_id_map)
}

Expand All @@ -428,10 +402,13 @@ impl CatalogController {
streaming_job: &StreamingJob,
for_replace: bool,
) -> MetaResult<()> {
let is_materialized_view = streaming_job.is_materialized_view();
let fragment_actors =
Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
let all_tables = stream_job_fragments.all_tables();
let mut all_tables = stream_job_fragments.all_tables();
let inner = self.inner.write().await;

let mut objects = vec![];
let txn = inner.db.begin().await?;

let mut fragment_relations = BTreeMap::new();
Expand Down Expand Up @@ -477,16 +454,18 @@ impl CatalogController {
Fragment::insert(fragment).exec(&txn).await?;

// Fields including `fragment_id` and `vnode_count` were placeholder values before.
// After table fragments are created, update them for all internal tables.
// After table fragments are created, update them for all tables.
if !for_replace {
for state_table_id in state_table_ids {
// Table's vnode count is not always the fragment's vnode count, so we have to
// look up the table from `TableFragments`.
// See `ActorGraphBuilder::new`.
let table = all_tables
.get(&(state_table_id as u32))
.get_mut(&(state_table_id as u32))
.unwrap_or_else(|| panic!("table {} not found", state_table_id));
assert_eq!(table.id, state_table_id as u32);
assert_eq!(table.fragment_id, fragment_id as u32);
table.job_id = Some(streaming_job.id());
let vnode_count = table.vnode_count();

table::ActiveModel {
Expand All @@ -497,6 +476,12 @@ impl CatalogController {
}
.update(&txn)
.await?;

if is_materialized_view {
objects.push(PbObject {
object_info: Some(PbObjectInfo::Table(table.clone())),
});
}
}
}
}
Expand Down Expand Up @@ -537,6 +522,12 @@ impl CatalogController {

txn.commit().await?;

if !objects.is_empty() {
assert!(is_materialized_view);
self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
.await;
}

Ok(())
}

Expand Down Expand Up @@ -667,6 +658,27 @@ impl CatalogController {
HashMap<crate::model::ActorId, Vec<PbDispatcher>>,
>,
split_assignment: &SplitAssignment,
) -> MetaResult<()> {
self.post_collect_job_fragments_inner(
job_id,
actor_ids,
new_actor_dispatchers,
split_assignment,
false,
)
.await
}

pub async fn post_collect_job_fragments_inner(
&self,
job_id: ObjectId,
actor_ids: Vec<crate::model::ActorId>,
new_actor_dispatchers: &HashMap<
crate::model::FragmentId,
HashMap<crate::model::ActorId, Vec<PbDispatcher>>,
>,
split_assignment: &SplitAssignment,
is_mv: bool,
) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
Expand Down Expand Up @@ -755,8 +767,17 @@ impl CatalogController {
.update(&txn)
.await?;

let fragment_mapping = if is_mv {
get_fragment_mappings(&txn, job_id as _).await?
} else {
vec![]
};

txn.commit().await?;

self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
.await;

Ok(())
}

Expand Down

0 comments on commit 7335fa6

Please sign in to comment.