Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: snapshots generated in multi txn has same prev_snapshot_id #16044

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions src/query/service/tests/it/storages/fuse/conflict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_storages_fuse::operations::SnapshotChanges;
use databend_common_storages_fuse::operations::SnapshotGenerator;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_txn::TxnManager;

#[test]
/// base snapshot contains segments 1, 2, 3,
Expand Down Expand Up @@ -62,6 +63,8 @@ fn test_unresolvable_delete_conflict() {
None,
Some(Arc::new(latest_snapshot)),
None,
TxnManager::init(),
0,
);
assert!(result.is_err());
}
Expand Down Expand Up @@ -151,6 +154,8 @@ fn test_resolvable_delete_conflict() {
None,
Some(Arc::new(latest_snapshot)),
None,
TxnManager::init(),
0,
);
let snapshot = result.unwrap();
let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)];
Expand Down Expand Up @@ -255,6 +260,8 @@ fn test_resolvable_replace_conflict() {
None,
Some(Arc::new(latest_snapshot)),
None,
TxnManager::init(),
0,
);
let snapshot = result.unwrap();
let expected = vec![
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ databend-storages-common-index = { workspace = true }
databend-storages-common-io = { workspace = true }
databend-storages-common-pruner = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
databend-storages-common-txn = { workspace = true }
enum-as-inner = "0.5"
futures = { workspace = true }
futures-util = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl SnapshotGenerator for AppendGenerator {
Ok(())
}

fn generate_new_snapshot(
fn do_generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl SnapshotGenerator for MutationGenerator {
self.conflict_resolve_ctx = ctx;
}

fn generate_new_snapshot(
fn do_generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_exception::Result;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::ClusterKey;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_txn::TxnManagerRef;

use crate::operations::common::ConflictResolveContext;

Expand All @@ -43,5 +44,25 @@ pub trait SnapshotGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
txn_mgr: TxnManagerRef,
table_id: u64,
) -> Result<TableSnapshot> {
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
let mut snapshot =
self.do_generate_new_snapshot(schema, cluster_key_meta, previous, prev_table_seq)?;
let guard = txn_mgr.lock();
// If a table is updated multi times in a transaction, the previous snapshot is always the snapshot before the transaction.
if guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some() {
snapshot.prev_snapshot_id = previous_of_previous;
}
Ok(snapshot)
}

fn do_generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl SnapshotGenerator for TruncateGenerator {
self
}

fn generate_new_snapshot(
fn do_generate_new_snapshot(
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_meta_types::MatchSeq;
use databend_common_pipeline_sinks::AsyncSink;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::Versioned;
use databend_storages_common_txn::TxnManagerRef;
use log::debug;
use log::error;
use log::info;
Expand Down Expand Up @@ -90,7 +91,12 @@ impl AsyncSink for CommitMultiTableInsert {
snapshot_generator.set_conflict_resolve_context(commit_meta.conflict_resolve_context);
let table = self.tables.get(&table_id).unwrap();
update_table_metas.push((
build_update_table_meta_req(table.as_ref(), &snapshot_generator).await?,
build_update_table_meta_req(
table.as_ref(),
&snapshot_generator,
self.ctx.txn_mgr(),
)
.await?,
table.get_table_info().clone(),
));
snapshot_generators.insert(table_id, snapshot_generator);
Expand Down Expand Up @@ -173,6 +179,7 @@ impl AsyncSink for CommitMultiTableInsert {
*req = build_update_table_meta_req(
table.as_ref(),
snapshot_generators.get(&tid).unwrap(),
self.ctx.txn_mgr(),
)
.await?;
break;
Expand Down Expand Up @@ -227,6 +234,7 @@ impl AsyncSink for CommitMultiTableInsert {
async fn build_update_table_meta_req(
table: &dyn Table,
snapshot_generator: &AppendGenerator,
txn_mgr: TxnManagerRef,
) -> Result<UpdateTableMetaReq> {
let fuse_table = FuseTable::try_from_table(table)?;
let previous = fuse_table.read_table_snapshot().await?;
Expand All @@ -235,6 +243,8 @@ async fn build_update_table_meta_req(
fuse_table.cluster_key_meta.clone(),
previous,
Some(fuse_table.table_info.ident.seq),
txn_mgr,
table.get_id(),
)?;

// write snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ where F: SnapshotGenerator + Send + 'static
cluster_key_meta,
previous,
Some(table_info.ident.seq),
self.ctx.txn_mgr(),
table_info.ident.table_id,
) {
Ok(snapshot) => {
self.state = State::TryCommit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ begin transaction;
statement ok
insert into s values(3,4),(1,2),(5,6);

query I
select count(*) from fuse_snapshot('default', 's');
----
1

query II
INSERT FIRST
WHEN c3 = 5 THEN
Expand All @@ -335,6 +340,16 @@ SELECT * from s;
----
1 2

query I
select count(*) from fuse_snapshot('default', 't1');
----
1

query I
select count(*) from fuse_snapshot('default', 't2');
----
1

query II
select * from t1 order by c1;
----
Expand All @@ -349,6 +364,22 @@ select * from t2;
statement ok
rollback;

query I
select count(*) from fuse_snapshot('default', 's');
----
0

query I
select count(*) from fuse_snapshot('default', 't1');
----
0

query I
select count(*) from fuse_snapshot('default', 't2');
----
0


query II
select * from t1 order by c1;
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,21 @@ INSERT INTO employees VALUES
(3, 'Charlie', 'Finance'),
(4, 'David', 'HR');

query I
select count(*) from fuse_snapshot('test_txn_merge_into','employees');
----
1

statement ok
INSERT INTO salaries VALUES
(1, 50000.00),
(2, 60000.00);

query I
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
----
1

statement ok
MERGE INTO salaries
USING (SELECT * FROM employees) AS employees
Expand All @@ -49,6 +59,11 @@ MERGE INTO salaries
INSERT (employee_id, salary)
VALUES (employees.employee_id, 55000.00);

query I
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
----
1

query IF
SELECT employee_id, salary FROM salaries order by employee_id;
----
Expand All @@ -60,6 +75,16 @@ SELECT employee_id, salary FROM salaries order by employee_id;
statement ok
COMMIT;

query I
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
----
1

query I
select count(*) from fuse_snapshot('test_txn_merge_into','employees');
----
1

query IF
SELECT employee_id, salary FROM salaries order by employee_id;
----
Expand Down Expand Up @@ -145,4 +170,4 @@ SELECT employee_id, salary FROM salaries order by employee_id;


statement ok
drop database test_txn_merge_into;
drop database test_txn_merge_into;
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,90 @@ select a from s_append_only;
2
3

query I
select count(*) from fuse_snapshot('test_txn_stream','t_append_only');
----
1

statement ok
BEGIN;

statement ok
INSERT INTO t_append_only VALUES(4), (5);

query I
select count(*) from fuse_snapshot('test_txn_stream','t_append_only');
----
2

statement ok
INSERT INTO t_consume_append_only_1 SELECT a FROM s_append_only;

query I
select count(*) from fuse_snapshot('test_txn_stream','t_consume_append_only_1');
----
1

statement ok
INSERT INTO t_consume_append_only_2 SELECT a FROM s_append_only;

query I
select count(*) from fuse_snapshot('test_txn_stream','t_consume_append_only_2');
----
1

statement ok
INSERT INTO t_append_only VALUES(6), (7);

query I
select count(*) from fuse_snapshot('test_txn_stream','t_append_only');
----
2

statement ok
INSERT INTO t_consume_append_only_3 SELECT a FROM s_append_only;

query I
select count(*) from fuse_snapshot('test_txn_stream','t_consume_append_only_3');
----
1

statement ok
INSERT INTO t_consume_append_only_4 SELECT a FROM s_append_only_1;

query I
select count(*) from fuse_snapshot('test_txn_stream','t_consume_append_only_4');
----
1

statement ok
COMMIT;

query I
select count(*) from fuse_snapshot('test_txn_stream','t_append_only');
----
2

query I
select count(*) from fuse_snapshot('test_txn_stream','t_consume_append_only_1');
----
1

query I
select count(*) from fuse_snapshot('test_txn_stream','t_consume_append_only_2');
----
1

query I
select count(*) from fuse_snapshot('test_txn_stream','t_consume_append_only_3');
----
1

query I
select count(*) from fuse_snapshot('test_txn_stream','t_consume_append_only_4');
----
1

statement ok
INSERT INTO t_consume_append_only_5 SELECT a FROM s_append_only;

Expand Down Expand Up @@ -473,4 +533,3 @@ select str from target_2 order by str;

statement ok
drop database test_txn_stream;

Loading