Skip to content

Commit

Permalink
feat(source): update split state after yield barrier on split change …
Browse files Browse the repository at this point in the history
…for source executor (#20506)
  • Loading branch information
wenym1 authored Feb 20, 2025
1 parent 5d2265b commit 9160155
Show file tree
Hide file tree
Showing 8 changed files with 408 additions and 199 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ where
Ok(())
}

async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
self.store
.try_wait_epoch(
HummockReadEpoch::Committed(prev_epoch),
Expand Down
11 changes: 5 additions & 6 deletions src/stream/src/executor/source/fs_fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
rate_limit_rps: Option<u32>,
) -> StreamExecutorResult<()> {
let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
'vnodes: for vnode in state_store_handler.state_table.vnodes().iter_vnodes() {
let table_iter = state_store_handler
.state_table
let state_table = state_store_handler.state_table();
'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
let table_iter = state_table
.iter_with_vnode(
vnode,
&(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
Expand Down Expand Up @@ -273,8 +273,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
}

let post_commit = state_store_handler
.state_table
.commit(barrier.epoch)
.commit_may_update_vnode_bitmap(barrier.epoch)
.await?;

let update_vnode_bitmap =
Expand Down Expand Up @@ -359,7 +358,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
.collect()
};
state_store_handler.set_states(file_assignment).await?;
state_store_handler.state_table.try_flush().await?;
state_store_handler.try_flush().await?;
}
Message::Watermark(_) => unreachable!(),
}
Expand Down
80 changes: 50 additions & 30 deletions src/stream/src/executor/source/legacy_fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#![deprecated = "will be replaced by new fs source (list + fetch)"]
#![expect(deprecated)]

use std::collections::{HashMap, HashSet};
use std::collections::HashSet;

use anyhow::anyhow;
use either::Either;
Expand Down Expand Up @@ -139,14 +139,15 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {
Ok(())
}

async fn apply_split_change<const BIASED: bool>(
async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
&mut self,
barrier_epoch: EpochPair,
source_desc: &LegacyFsSourceDesc,
stream: &mut StreamReaderWithPause<BIASED, StreamChunk>,
mapping: &HashMap<ActorId, Vec<SplitImpl>>,
target_splits: Vec<SplitImpl>,
) -> StreamExecutorResult<()> {
if let Some(target_splits) = mapping.get(&self.actor_ctx.id).cloned() {
if let Some(target_state) = self.get_diff(target_splits).await? {
{
if let Some(target_state) = self.get_diff(barrier_epoch, target_splits).await? {
tracing::info!(
actor_id = self.actor_ctx.id,
state = ?target_state,
Expand All @@ -163,14 +164,19 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {

// Note: get_diff will modify the state_cache
// rhs can not be None because we do not support split number reduction
async fn get_diff(&mut self, rhs: Vec<SplitImpl>) -> StreamExecutorResult<ConnectorState> {
async fn get_diff(
&mut self,
epoch: EpochPair,
rhs: Vec<SplitImpl>,
) -> StreamExecutorResult<ConnectorState> {
let core = &mut self.stream_source_core;
let all_completed: HashSet<SplitId> = core.split_state_store.get_all_completed().await?;

tracing::debug!(actor = self.actor_ctx.id, all_completed = ?all_completed , "get diff");

let mut target_state: Vec<SplitImpl> = Vec::new();
let mut no_change_flag = true;
let committed_reader = core.split_state_store.new_committed_reader(epoch).await?;
for sc in rhs {
if let Some(s) = core.updated_splits_in_epoch.get(&sc.id()) {
let fs = s
Expand All @@ -186,10 +192,8 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {
} else {
no_change_flag = false;
// write new assigned split to state cache. snapshot is base on cache.
let state = if let Some(recover_state) = core
.split_state_store
.try_recover_from_state_store(&sc)
.await?
let state = if let Some(recover_state) =
committed_reader.try_recover_from_state_store(&sc).await?
{
recover_state
} else {
Expand Down Expand Up @@ -271,10 +275,7 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {
core.split_state_store.set_all_complete(completed).await?
}
// commit anyway, even if no message saved
core.split_state_store
.state_table
.commit_assert_no_update_vnode_bitmap(epoch)
.await?;
core.split_state_store.commit(epoch).await?;

core.updated_splits_in_epoch.clear();
Ok(())
Expand All @@ -283,7 +284,7 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {
async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
let core = &mut self.stream_source_core;

core.split_state_store.state_table.try_flush().await?;
core.split_state_store.try_flush().await?;

Ok(())
}
Expand Down Expand Up @@ -342,15 +343,19 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {
.filter(|split| !all_completed.contains(&split.id()))
.collect_vec();

// restore the newest split info
for ele in &mut boot_state {
if let Some(recover_state) = self
{
let committed_reader = self
.stream_source_core
.split_state_store
.try_recover_from_state_store(ele)
.await?
{
*ele = recover_state;
.new_committed_reader(first_epoch)
.await?;
// restore the newest split info
for ele in &mut boot_state {
if let Some(recover_state) =
committed_reader.try_recover_from_state_store(ele).await?
{
*ele = recover_state;
}
}
}

Expand Down Expand Up @@ -407,12 +412,17 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {
self_paused = false;
}
let epoch = barrier.epoch;
let mut split_change = None;

if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::SourceChangeSplit(actor_splits) => {
self.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?
split_change = actor_splits
.get(&self.actor_ctx.id)
.cloned()
.map(|target_splits| {
(&source_desc, &mut stream, target_splits)
});
}
Mutation::Pause => {
command_paused = true;
Expand All @@ -423,12 +433,12 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {
stream.resume_stream()
}
Mutation::Update(UpdateMutation { actor_splits, .. }) => {
self.apply_split_change(
&source_desc,
&mut stream,
actor_splits,
)
.await?;
split_change = actor_splits
.get(&self.actor_ctx.id)
.cloned()
.map(|target_splits| {
(&source_desc, &mut stream, target_splits)
});
}
Mutation::Throttle(actor_to_apply) => {
if let Some(new_rate_limit) =
Expand All @@ -446,6 +456,16 @@ impl<S: StateStore> LegacyFsSourceExecutor<S> {
self.take_snapshot_and_clear_cache(epoch).await?;

yield msg;

if let Some((source_desc, stream, target_splits)) = split_change {
self.apply_split_change_after_yield_barrier(
epoch,
source_desc,
stream,
target_splits,
)
.await?;
}
}
_ => {
// For the source executor, the message we receive from this arm should
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ mod fs_fetch_executor;
pub use fs_fetch_executor::*;

mod source_backfill_state_table;
pub use source_backfill_state_table::BackfillStateTableHandler;
pub(crate) use source_backfill_state_table::BackfillStateTableHandler;

pub mod state_table_handler;
use futures_async_stream::try_stream;
Expand Down
Loading

0 comments on commit 9160155

Please sign in to comment.