feat: drop connector for table with connector (#20023)
Signed-off-by: tabversion <[email protected]>
Co-authored-by: tabversion <[email protected]>
tabVersion and tabversion authored Feb 18, 2025
1 parent 7335fa6 commit d98b06e
Showing 24 changed files with 714 additions and 146 deletions.
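
In short, this commit adds ALTER TABLE ... DROP CONNECTOR, which detaches the connector from a table created with one: the table keeps its schema and existing rows but stops ingesting, and the WITH / FORMAT ... ENCODE clauses disappear from its definition. A minimal sketch of the lifecycle, assuming a local Kafka broker at localhost:9092 (the table and topic names here are illustrative, not taken from the commit):

-- Create a table backed by a Kafka topic.
CREATE TABLE events_t (id INT PRIMARY KEY, payload VARCHAR)
WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;

-- Detach the connector: existing rows and schema are kept, ingestion stops.
ALTER TABLE events_t DROP CONNECTOR;

-- The definition no longer carries the WITH / FORMAT ... ENCODE clauses.
SHOW CREATE TABLE events_t;

-- Running DROP CONNECTOR on a table that never had one fails with
-- "Protocol error: Table ... is not associated with a connector".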
68 changes: 68 additions & 0 deletions e2e_test/source_inline/cdc/mysql/mysql_alter_drop_connector.serial
@@ -0,0 +1,68 @@
control substitution on

system ok
mysql -e "
SET GLOBAL time_zone = '+01:00';
"

system ok
mysql -e "
DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1;
USE testdb1;
CREATE TABLE tt1 (v1 int primary key, v2 varchar(255));
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00'), (2, '2023-10-23 11:00:00');
"

system ok
mysql -e "
DROP USER IF EXISTS 'non-shared-cdc'@'%';
CREATE USER 'non-shared-cdc'@'%' IDENTIFIED BY '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'non-shared-cdc'@'%';
#
DROP USER IF EXISTS 'shared-cdc'@'%';
CREATE USER 'shared-cdc'@'%' IDENTIFIED BY 'abcdef';
GRANT SELECT, RELOAD, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'shared-cdc'@'%';
#
FLUSH PRIVILEGES;
"

statement ok
CREATE TABLE table_t (
v1 int primary key, v2 varchar
) with (
${RISEDEV_MYSQL_WITH_OPTIONS_COMMON},
username = 'shared-cdc',
password = 'abcdef',
database.name = 'testdb1',
table.name = 'tt1',
);

sleep 1s

query I
select count(*) from table_t;
----
2

statement ok
alter table table_t drop connector;

query TT
show create table table_t;
----
public.table_t CREATE TABLE table_t (v1 INT, v2 CHARACTER VARYING)

system ok
mysql -e "
INSERT INTO tt1 VALUES (3, '2023-10-23 10:00:00'), (4, '2023-10-23 11:00:00');
"

sleep 1s

query I
select count(*) from table_t;
----
2

statement ok
drop table table_t;
138 changes: 138 additions & 0 deletions e2e_test/source_inline/kafka/alter_table_drop_connector.slt
@@ -0,0 +1,138 @@
control substitution on

system ok
rpk topic create test_alter_table_drop_connector -p 1

system ok
cat <<EOF | rpk topic produce 'test_alter_table_drop_connector' -f "%k^%v\n"
{"ID": 1}^{"ID": 1, "firstName": "John", "lastName": "Doe", "age": 18, "height": 5.10, "weight": 150}
{"ID": 2}^{"ID": 2, "firstName": "Sarah", "lastName": "Smith", "age": 19, "height": 5.5, "weight": 120}
{"ID": 3}^{"ID": 3, "firstName": "Ben", "lastName": "Johnson", "age": 21, "height": 6.0, "weight": 175}
{"ID": 4}^{"ID": 4, "firstName": "Emma", "lastName": "Brown", "age": 20, "height": 5.3, "weight": 130}
{"ID": 5}^{"ID": 5, "firstName": "Michael", "lastName": "Williams", "age": 22, "height": 6.2, "weight": 190}
EOF

statement ok
CREATE TABLE plain_students (
"ID" INT,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL,
)
INCLUDE KEY
INCLUDE PARTITION
INCLUDE TIMESTAMP
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_alter_table_drop_connector')
FORMAT PLAIN ENCODE JSON;

statement ok
flush;

sleep 1s

query I
SELECT count(*) FROM plain_students;
----
5

statement ok
CREATE TABLE t_no_connector (a int, b varchar);

statement error Protocol error: Table t_no_connector is not associated with a connector
ALTER TABLE t_no_connector DROP CONNECTOR;

statement ok
ALTER TABLE plain_students DROP CONNECTOR;

query TT
show create table plain_students;
----
public.plain_students CREATE TABLE plain_students ("ID" INT, "firstName" CHARACTER VARYING, "lastName" CHARACTER VARYING, age INT, height REAL, weight REAL, _rw_kafka_timestamp TIMESTAMP WITH TIME ZONE, _rw_kafka_partition CHARACTER VARYING, _rw_kafka_key BYTEA)

system ok
cat <<EOF | rpk topic produce 'test_alter_table_drop_connector' -f "%k^%v\n"
{"ID": 6}^{"ID": 6, "firstName": "Leah", "lastName": "Davis", "age": 18, "height": 5.7, "weight": 140}
{"ID": 7}^{"ID": 7, "firstName": "Connor", "lastName": "Wilson", "age": 19, "height": 5.9, "weight": 160}
{"ID": 8}^{"ID": 8, "firstName": "Ava", "lastName": "Garcia", "age": 21, "height": 5.2, "weight": 115}
EOF

sleep 1s

# the streaming job no longer ingests new data
query I
SELECT count(*) FROM plain_students;
----
5

# ===== test with schema registry =====

system ok
rpk topic delete 'avro_drop_table_connector_test' || true; \
(rpk sr subject delete 'avro_drop_table_connector_test-value' && rpk sr subject delete 'avro_drop_table_connector_test-value' --permanent) || true;

system ok
rpk topic create 'avro_drop_table_connector_test'

system ok
sr_register avro_drop_table_connector_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}'

system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_drop_table_connector_test

statement ok
create table avro_drop_table_connector_test_table
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_drop_table_connector_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

sleep 1s

query ??
select foo, bar from avro_drop_table_connector_test_table
----
ABC 1

statement ok
alter table avro_drop_table_connector_test_table drop connector;

query TT
show create table avro_drop_table_connector_test_table;
----
public.avro_drop_table_connector_test_table CREATE TABLE avro_drop_table_connector_test_table (bar INT, foo CHARACTER VARYING)

query ??
select foo, bar from avro_drop_table_connector_test_table
----
ABC 1

# produce another message
system ok
echo '{"foo":"DEF", "bar":2}' | rpk topic produce --schema-id=topic avro_drop_table_connector_test

sleep 1s

# the new message is not ingested
query ??
select foo, bar from avro_drop_table_connector_test_table
----
ABC 1

# ===== clean up =====

statement ok
DROP TABLE plain_students;

statement ok
drop table avro_drop_table_connector_test_table;

system ok
rpk topic delete 'avro_drop_table_connector_test' || true; \
(rpk sr subject delete 'avro_drop_table_connector_test-value' && rpk sr subject delete 'avro_drop_table_connector_test-value' --permanent) || true;
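
Note that columns materialized via INCLUDE KEY / PARTITION / TIMESTAMP survive the drop as ordinary columns, as the SHOW CREATE TABLE output above confirms. A query one could add before the cleanup step to observe this (a sketch, not part of the commit):

SELECT "ID", _rw_kafka_partition, _rw_kafka_timestamp
FROM plain_students
ORDER BY "ID";
-- Still returns only the 5 originally ingested rows; the _rw_kafka_* columns
-- are now plain stored columns, no longer tied to a running source.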
15 changes: 15 additions & 0 deletions src/frontend/src/catalog/root_catalog.rs
@@ -620,6 +620,21 @@ impl Catalog {
))
}

pub fn get_source_by_id<'a>(
&self,
db_name: &'a str,
schema_path: SchemaPath<'a>,
source_id: &SourceId,
) -> CatalogResult<(&Arc<SourceCatalog>, &'a str)> {
schema_path
.try_find(|schema_name| {
Ok(self
.get_schema_by_name(db_name, schema_name)?
.get_source_by_id(source_id))
})?
.ok_or_else(|| CatalogError::NotFound("source", source_id.to_string()))
}

/// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
/// Retrieves all tables, created or creating.
pub fn get_any_table_by_name<'a>(
4 changes: 3 additions & 1 deletion src/frontend/src/handler/alter_source_column.rs
@@ -100,7 +100,9 @@ pub async fn handle_alter_source_column(
"column \"{new_column_name}\" of source \"{source_name}\" already exists"
)))?
}
let mut bound_column = bind_sql_columns(&[column_def])?.remove(0);

// The added column name comes from the user, so we still need to check for reserved column names.
let mut bound_column = bind_sql_columns(&[column_def], false)?.remove(0);
bound_column.column_desc.column_id = max_column_id(columns).next();
columns.push(bound_column);
// No need to update the definition here. It will be done by purification later.
41 changes: 2 additions & 39 deletions src/frontend/src/handler/alter_table_column.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ColumnCatalog, Engine};
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::{bail, bail_not_implemented};
@@ -30,7 +30,6 @@ use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, S

use super::create_source::SqlColumnStrategy;
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::purify::try_purify_table_source_create_sql_ast;
use crate::catalog::root_catalog::SchemaPath;
@@ -101,50 +100,14 @@ pub async fn get_replace_table_plan(
// Create handler args as if we're creating a new table with the altered definition.
let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
let Statement::CreateTable {
columns,
constraints,
source_watermarks,
append_only,
on_conflict,
with_version_column,
wildcard_idx,
cdc_table_info,
format_encode,
include_column_options,
engine,
..
} = new_definition
else {
panic!("unexpected statement type: {:?}", new_definition);
};

let format_encode = format_encode
.clone()
.map(|format_encode| format_encode.into_v2_with_warning());

let engine = match engine {
risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
};

let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
session,
table_name,
old_catalog,
format_encode,
handler_args.clone(),
new_definition,
col_id_gen,
columns.clone(),
wildcard_idx,
constraints,
source_watermarks,
append_only,
on_conflict,
with_version_column,
cdc_table_info,
include_column_options,
engine,
sql_column_strategy,
)
.await?;
