Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: lock retention period & set ctx status #13

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ use uuid::Version;
#[async_backtrace::framed]
pub async fn do_vacuum2(fuse_table: &FuseTable, ctx: Arc<dyn TableContext>) -> Result<Vec<String>> {
let start = std::time::Instant::now();
let Some(lvt) = set_lvt(fuse_table, ctx.as_ref()).await? else {
let retention_period_in_days = ctx.get_settings().get_data_retention_time_in_days()?;
let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period_in_days).await? else {
return Ok(vec![]);
};
info!(

ctx.set_status_info(&format!(
"set lvt for table {} takes {:?}, lvt: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
lvt
);
));

let start = std::time::Instant::now();
let snapshots_before_lvt = list_until_timestamp(
Expand All @@ -58,29 +60,30 @@ pub async fn do_vacuum2(fuse_table: &FuseTable, ctx: Arc<dyn TableContext>) -> R
true,
)
.await?;
info!(
let elapsed = start.elapsed();
ctx.set_status_info(&format!(
"list snapshots before lvt for table {} takes {:?}, snapshots_dir: {:?}, lvt: {:?}, snapshots: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
elapsed,
fuse_table.meta_location_generator().snapshot_dir(),
lvt,
slice_summary(&snapshots_before_lvt)
);
));

let start = std::time::Instant::now();
let is_vacuum_all = ctx.get_settings().get_data_retention_time_in_days()? == 0;
let is_vacuum_all = retention_period_in_days == 0;
let Some((gc_root, snapshots_to_gc)) =
select_gc_root(fuse_table, &snapshots_before_lvt, is_vacuum_all).await?
else {
return Ok(vec![]);
};
info!(
ctx.set_status_info(&format!(
"select gc_root for table {} takes {:?}, gc_root: {:?}, snapshots_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
gc_root,
slice_summary(snapshots_to_gc)
);
));

let start = std::time::Instant::now();
let least_visible_timestamp = gc_root.least_visible_timestamp.unwrap();
Expand All @@ -96,26 +99,26 @@ pub async fn do_vacuum2(fuse_table: &FuseTable, ctx: Arc<dyn TableContext>) -> R
false,
)
.await?;
info!(
ctx.set_status_info(&format!(
"list segments before gc_root for table {} takes {:?}, segment_dir: {:?}, least_visible_timestamp: {:?}, segments: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
fuse_table.meta_location_generator().segment_dir(),
least_visible_timestamp,
slice_summary(&segments_before_gc_root)
);
));

let start = std::time::Instant::now();
let segments_to_gc: Vec<String> = segments_before_gc_root
.into_iter()
.filter(|s| !gc_root_segments.contains(s))
.collect();
info!(
ctx.set_status_info(&format!(
"Filter segments to gc for table {} takes {:?}, segments_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
slice_summary(&segments_to_gc)
);
));

let start = std::time::Instant::now();
let segments_io =
Expand All @@ -127,11 +130,11 @@ pub async fn do_vacuum2(fuse_table: &FuseTable, ctx: Arc<dyn TableContext>) -> R
for segment in segments {
gc_root_blocks.extend(segment?.blocks.iter().map(|b| b.location.0.clone()));
}
info!(
ctx.set_status_info(&format!(
"read segments for table {} takes {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
);
));

let start = std::time::Instant::now();
let blocks_before_gc_root = list_until_timestamp(
Expand All @@ -141,26 +144,26 @@ pub async fn do_vacuum2(fuse_table: &FuseTable, ctx: Arc<dyn TableContext>) -> R
false,
)
.await?;
info!(
ctx.set_status_info(&format!(
"list blocks before gc_root for table {} takes {:?}, block_dir: {:?}, least_visible_timestamp: {:?}, blocks: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
fuse_table.meta_location_generator().block_dir(),
least_visible_timestamp,
slice_summary(&blocks_before_gc_root)
);
));

let start = std::time::Instant::now();
let blocks_to_gc: Vec<String> = blocks_before_gc_root
.into_iter()
.filter(|b| !gc_root_blocks.contains(b))
.collect();
info!(
ctx.set_status_info(&format!(
"Filter blocks to gc for table {} takes {:?}, blocks_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
slice_summary(&blocks_to_gc)
);
));

let start = std::time::Instant::now();
let catalog = ctx.get_default_catalog()?;
Expand Down Expand Up @@ -195,19 +198,19 @@ pub async fn do_vacuum2(fuse_table: &FuseTable, ctx: Arc<dyn TableContext>) -> R
.push(TableMetaLocationGenerator::gen_bloom_index_location_from_block_location(loc));
}

info!(
ctx.set_status_info(&format!(
"collect indexes to gc for table {} takes {:?}, indexes_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
slice_summary(&indexes_to_gc)
);
));

let start = std::time::Instant::now();
let subject_files_to_gc: Vec<_> = segments_to_gc
.into_iter()
.chain(blocks_to_gc.into_iter())
.collect();
let op = Files::create(ctx, fuse_table.get_operator());
let op = Files::create(ctx.clone(), fuse_table.get_operator());

// order is important
// indexes should be removed before blocks, because index locations to gc are generated from block locations
Expand All @@ -221,19 +224,23 @@ pub async fn do_vacuum2(fuse_table: &FuseTable, ctx: Arc<dyn TableContext>) -> R
.chain(snapshots_to_gc.iter().cloned())
.chain(indexes_to_gc.into_iter())
.collect();
info!(
ctx.set_status_info(&format!(
"remove files for table {} takes {:?}, files_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
slice_summary(&files_to_gc)
);
));
Ok(files_to_gc)
}

/// Try to set the lvt as min(latest_snapshot.timestamp, now - retention_time).
///
/// Returning `None` means we stop vacuuming, but don't want to report an error to the user.
async fn set_lvt(fuse_table: &FuseTable, ctx: &dyn TableContext) -> Result<Option<DateTime<Utc>>> {
async fn set_lvt(
fuse_table: &FuseTable,
ctx: &dyn TableContext,
retention: u64,
) -> Result<Option<DateTime<Utc>>> {
let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
info!(
"Table {} has no snapshot, stop vacuuming",
Expand All @@ -249,7 +256,6 @@ async fn set_lvt(fuse_table: &FuseTable, ctx: &dyn TableContext) -> Result<Optio
return Ok(None);
}
let cat = ctx.get_default_catalog()?;
let retention = ctx.get_settings().get_data_retention_time_in_days()?;
// safe to unwrap, as we have checked the version is v5
let latest_ts = latest_snapshot.timestamp.unwrap();
let lvt_point_candidate = if retention == 0 {
Expand Down
Loading