Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: snapshots generated in multi txn have the same prev_snapshot_id #16044

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions src/query/service/tests/it/storages/fuse/conflict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_storages_fuse::operations::SnapshotChanges;
use databend_common_storages_fuse::operations::SnapshotGenerator;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_txn::TxnManager;

#[test]
/// base snapshot contains segments 1, 2, 3,
Expand Down Expand Up @@ -62,6 +63,8 @@ fn test_unresolvable_delete_conflict() {
None,
Some(Arc::new(latest_snapshot)),
None,
TxnManager::init(),
0,
);
assert!(result.is_err());
}
Expand Down Expand Up @@ -151,6 +154,8 @@ fn test_resolvable_delete_conflict() {
None,
Some(Arc::new(latest_snapshot)),
None,
TxnManager::init(),
0,
);
let snapshot = result.unwrap();
let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)];
Expand Down Expand Up @@ -255,6 +260,8 @@ fn test_resolvable_replace_conflict() {
None,
Some(Arc::new(latest_snapshot)),
None,
TxnManager::init(),
0,
);
let snapshot = result.unwrap();
let expected = vec![
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ databend-storages-common-index = { workspace = true }
databend-storages-common-io = { workspace = true }
databend-storages-common-pruner = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
databend-storages-common-txn = { workspace = true }
enum-as-inner = "0.5"
futures = { workspace = true }
futures-util = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ impl SnapshotGenerator for AppendGenerator {
Ok(())
}

fn generate_new_snapshot(
fn do_generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?;
Expand All @@ -133,7 +133,7 @@ impl SnapshotGenerator for AppendGenerator {
let mut new_segments = snapshot_merged.merged_segments.clone();
let mut new_summary = snapshot_merged.merged_statistics.clone();

if let Some(snapshot) = &previous {
if let Some(snapshot) = previous {
prev_timestamp = snapshot.timestamp;
prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version));
table_statistics_location = snapshot.table_statistics_location.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,33 @@ impl SnapshotGenerator for MutationGenerator {
self.conflict_resolve_ctx = ctx;
}

fn generate_new_snapshot(
fn do_generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0);

let previous = previous.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(
schema.clone(),
prev_table_seq,
))
});
let empty_snapshot;
let previous = match previous {
Some(prev) => prev,
None => {
empty_snapshot = Arc::new(TableSnapshot::new_empty_snapshot(
schema.clone(),
prev_table_seq,
));
&empty_snapshot
}
};

match &self.conflict_resolve_ctx {
ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => {
if let Some((removed, replaced)) =
ConflictResolveContext::is_modified_segments_exists_in_latest(
&self.base_snapshot,
&previous,
previous,
&ctx.replaced_segments,
&ctx.removed_segment_indexes,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_exception::Result;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::ClusterKey;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_txn::TxnManagerRef;

use crate::operations::common::ConflictResolveContext;

Expand All @@ -43,5 +44,34 @@ pub trait SnapshotGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
txn_mgr: TxnManagerRef,
table_id: u64,
) -> Result<TableSnapshot> {
let mut snapshot =
self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?;

let has_pending_transactional_mutations = {
let guard = txn_mgr.lock();
// NOTE:
// When generating a new snapshot for a mutation of a table for the first time,
// there is no buffered table ID inside txn_mgr for this table.
guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some()
};

if has_pending_transactional_mutations {
// Adjust the `prev_snapshot_id` of the newly created snapshot to match the
// `prev_snapshot_id` of the table when it first appeared in the transaction.
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
snapshot.prev_snapshot_id = previous_of_previous;
}
Ok(snapshot)
}

/// Generator-specific hook that builds the new [`TableSnapshot`].
///
/// Called by `generate_new_snapshot`, which forwards `schema`,
/// `cluster_key_meta`, a borrow of `previous` and `prev_table_seq` unchanged.
/// `previous` is taken by reference because the caller still reads it after
/// this call returns: when the table already has buffered mutations in an
/// active transaction, the caller overwrites the returned snapshot's
/// `prev_snapshot_id` with `previous`'s own `prev_snapshot_id`.
///
/// Returns the newly generated snapshot, or an error if generation fails.
fn do_generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ impl SnapshotGenerator for TruncateGenerator {
self
}

fn generate_new_snapshot(
fn do_generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_meta_types::MatchSeq;
use databend_common_pipeline_sinks::AsyncSink;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::Versioned;
use databend_storages_common_txn::TxnManagerRef;
use log::debug;
use log::error;
use log::info;
Expand Down Expand Up @@ -90,7 +91,12 @@ impl AsyncSink for CommitMultiTableInsert {
snapshot_generator.set_conflict_resolve_context(commit_meta.conflict_resolve_context);
let table = self.tables.get(&table_id).unwrap();
update_table_metas.push((
build_update_table_meta_req(table.as_ref(), &snapshot_generator).await?,
build_update_table_meta_req(
table.as_ref(),
&snapshot_generator,
self.ctx.txn_mgr(),
)
.await?,
table.get_table_info().clone(),
));
snapshot_generators.insert(table_id, snapshot_generator);
Expand Down Expand Up @@ -173,6 +179,7 @@ impl AsyncSink for CommitMultiTableInsert {
*req = build_update_table_meta_req(
table.as_ref(),
snapshot_generators.get(&tid).unwrap(),
self.ctx.txn_mgr(),
)
.await?;
break;
Expand Down Expand Up @@ -227,6 +234,7 @@ impl AsyncSink for CommitMultiTableInsert {
async fn build_update_table_meta_req(
table: &dyn Table,
snapshot_generator: &AppendGenerator,
txn_mgr: TxnManagerRef,
) -> Result<UpdateTableMetaReq> {
let fuse_table = FuseTable::try_from_table(table)?;
let previous = fuse_table.read_table_snapshot().await?;
Expand All @@ -235,6 +243,8 @@ async fn build_update_table_meta_req(
fuse_table.cluster_key_meta.clone(),
previous,
Some(fuse_table.table_info.ident.seq),
txn_mgr,
table.get_id(),
)?;

// write snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ where F: SnapshotGenerator + Send + 'static
cluster_key_meta,
previous,
Some(table_info.ident.seq),
self.ctx.txn_mgr(),
table_info.ident.table_id,
) {
Ok(snapshot) => {
self.state = State::TryCommit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ begin transaction;
statement ok
insert into s values(3,4),(1,2),(5,6);

query I
select count(*) from fuse_snapshot('default', 's');
----
1

query II
INSERT FIRST
WHEN c3 = 5 THEN
Expand All @@ -335,6 +340,16 @@ SELECT * from s;
----
1 2

query I
select count(*) from fuse_snapshot('default', 't1');
----
1

query I
select count(*) from fuse_snapshot('default', 't2');
----
1

query II
select * from t1 order by c1;
----
Expand All @@ -349,6 +364,22 @@ select * from t2;
statement ok
rollback;

query I
select count(*) from fuse_snapshot('default', 's');
----
0

query I
select count(*) from fuse_snapshot('default', 't1');
----
0

query I
select count(*) from fuse_snapshot('default', 't2');
----
0


query II
select * from t1 order by c1;
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,21 @@ INSERT INTO employees VALUES
(3, 'Charlie', 'Finance'),
(4, 'David', 'HR');

query I
select count(*) from fuse_snapshot('test_txn_merge_into','employees');
----
1

statement ok
INSERT INTO salaries VALUES
(1, 50000.00),
(2, 60000.00);

query I
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
----
1

statement ok
MERGE INTO salaries
USING (SELECT * FROM employees) AS employees
Expand All @@ -49,6 +59,11 @@ MERGE INTO salaries
INSERT (employee_id, salary)
VALUES (employees.employee_id, 55000.00);

query I
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
----
1

query IF
SELECT employee_id, salary FROM salaries order by employee_id;
----
Expand All @@ -60,6 +75,16 @@ SELECT employee_id, salary FROM salaries order by employee_id;
statement ok
COMMIT;

query I
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
----
1

query I
select count(*) from fuse_snapshot('test_txn_merge_into','employees');
----
1

query IF
SELECT employee_id, salary FROM salaries order by employee_id;
----
Expand Down Expand Up @@ -145,4 +170,4 @@ SELECT employee_id, salary FROM salaries order by employee_id;


statement ok
drop database test_txn_merge_into;
drop database test_txn_merge_into;
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
statement ok
create or replace database test_txn_snapshots;

statement ok
use test_txn_snapshots;

statement ok
create or replace table t(c int);


###################################
# no snapshots left if tx aborted #
###################################

statement ok
begin;

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
rollback;

query I
select count() from fuse_snapshot('test_txn_snapshots', 't');
----
0


#####################################################
# one snapshot left if table mutated multiple times #
#####################################################



statement ok
begin;

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
commit;

query I
select count() from fuse_snapshot('test_txn_snapshots', 't');
----
1


Loading
Loading