refactor(types): remove type_name and sub_fields from Field (#20496)
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Feb 20, 2025
1 parent 9160155 commit 2dbf6fb
Showing 19 changed files with 33 additions and 199 deletions.
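In practice, the struct layout is no longer duplicated on `Field`: it lives only in `DataType::Struct`, and sub-fields are derived on demand through the new `Field::sub_fields()` helper introduced in this commit. A rough sketch of the new call pattern follows (illustrative only, not part of this diff; the import paths are assumed):

// Hedged sketch: construct a struct-typed Field and derive its sub-fields.
use risingwave_common::catalog::Field;
use risingwave_common::types::{DataType, StructType};

fn main() {
    // Struct layout is carried by the data type itself...
    let field = Field::with_name(
        DataType::Struct(StructType::new(vec![
            ("a", DataType::Int32),
            ("b", DataType::Varchar),
        ])),
        "v1",
    );
    // ...and sub-fields are derived from it on demand instead of being stored.
    for sub in field.sub_fields() {
        println!("{}: {:?}", sub.name, sub.data_type);
    }
}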
8 changes: 2 additions & 6 deletions src/common/src/catalog/column.rs
@@ -296,12 +296,8 @@ impl ColumnDesc
data_type: field.data_type.clone(),
column_id: ColumnId::new(id),
name: field.name.clone(),
field_descs: field
.sub_fields
.iter()
.map(Self::from_field_without_column_id)
.collect_vec(),
type_name: field.type_name.clone(),
field_descs: Vec::new(), // TODO: deprecate this
type_name: String::new(), // TODO: deprecate this
description: None,
generated_or_default_column: None,
additional_column: AdditionalColumn { column_type: None },
55 changes: 12 additions & 43 deletions src/common/src/catalog/schema.rs
@@ -14,7 +14,6 @@

use std::ops::Index;

use itertools::Itertools;
use risingwave_pb::plan_common::{PbColumnDesc, PbField};

use super::ColumnDesc;
@@ -27,20 +26,13 @@ use crate::util::iter_util::ZipEqFast;
pub struct Field {
pub data_type: DataType,
pub name: String,
/// For STRUCT type.
pub sub_fields: Vec<Field>,
/// The user-defined type's name, when the type is created from a protobuf schema file,
/// this field will store the message name.
pub type_name: String,
}

impl Field {
pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
Self {
data_type,
name: name.into(),
sub_fields: vec![],
type_name: String::new(),
}
}
}
@@ -65,8 +57,6 @@ impl From<&ColumnDesc> for Field
Self {
data_type: desc.data_type.clone(),
name: desc.name.clone(),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}
}
@@ -76,12 +66,6 @@ impl From<ColumnDesc> for Field
Self {
data_type: column_desc.data_type,
name: column_desc.name,
sub_fields: column_desc
.field_descs
.into_iter()
.map(Into::into)
.collect(),
type_name: column_desc.type_name,
}
}
}
@@ -91,8 +75,6 @@ impl From<&PbColumnDesc> for Field
Self {
data_type: pb_column_desc.column_type.as_ref().unwrap().into(),
name: pb_column_desc.name.clone(),
sub_fields: pb_column_desc.field_descs.iter().map(Into::into).collect(),
type_name: pb_column_desc.type_name.clone(),
}
}
}
@@ -236,41 +218,21 @@ impl Schema
}

impl Field {
// TODO: rename to `new`
pub fn with_name<S>(data_type: DataType, name: S) -> Self
where
S: Into<String>,
{
Self {
data_type,
name: name.into(),
sub_fields: vec![],
type_name: String::new(),
}
}

pub fn with_struct<S>(
data_type: DataType,
name: S,
sub_fields: Vec<Field>,
type_name: S,
) -> Self
where
S: Into<String>,
{
Self {
data_type,
name: name.into(),
sub_fields,
type_name: type_name.into(),
}
}

pub fn unnamed(data_type: DataType) -> Self {
Self {
data_type,
name: String::new(),
sub_fields: vec![],
type_name: String::new(),
}
}

@@ -282,8 +244,17 @@ impl Field
Self {
data_type: desc.data_type.clone(),
name: format!("{}.{}", table_name, desc.name),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}

/// Get the sub fields if the data type is a struct, otherwise return an empty vector.
pub fn sub_fields(&self) -> Vec<Field> {
if let DataType::Struct(st) = &self.data_type {
st.iter()
.map(|(name, data_type)| Field::with_name(data_type.clone(), name))
.collect()
} else {
Vec::new()
}
}
}
@@ -293,8 +264,6 @@ impl From<&PbField> for Field
Self {
data_type: DataType::from(prost_field.get_data_type().expect("data type not found")),
name: prost_field.get_name().clone(),
sub_fields: vec![],
type_name: String::new(),
}
}
}
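
Because `sub_fields()` reads the layout off `DataType::Struct`, nested structs are recovered one level at a time; callers that care about deeper nesting (as the ClickHouse sink below does when rejecting it) simply call it again on the returned fields. A hedged sketch of that pattern, with the helper name and setup assumed:

// Sketch only: walk a struct-typed Field one nesting level at a time.
use risingwave_common::catalog::Field;

fn print_struct_tree(field: &Field, depth: usize) {
    println!("{}{}: {:?}", "  ".repeat(depth), field.name, field.data_type);
    // Each returned Field carries its own data type, so recursion descends naturally.
    for sub in field.sub_fields() {
        print_struct_tree(&sub, depth + 1);
    }
}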
7 changes: 4 additions & 3 deletions src/connector/src/sink/big_query.rs
@@ -376,10 +376,11 @@ impl BigQuerySink
)))
}
DataType::Struct(_) => {
let mut sub_fields = Vec::with_capacity(rw_field.sub_fields.len());
for rw_field in &rw_field.sub_fields {
let rw_sub_fields = rw_field.sub_fields();
let mut sub_fields = Vec::with_capacity(rw_sub_fields.len());
for rw_field in &rw_sub_fields {
let field = Self::map_field(rw_field)?;
sub_fields.push(field)
sub_fields.push(field);
}
TableFieldSchema::record(&rw_field.name, sub_fields)
}
2 changes: 1 addition & 1 deletion src/connector/src/sink/clickhouse.rs
@@ -1073,7 +1073,7 @@ pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String
let mut vec = vec![];
for field in schema.fields() {
if matches!(field.data_type, DataType::Struct(_)) {
for i in &field.sub_fields {
for i in &field.sub_fields() {
if matches!(i.data_type, DataType::Struct(_)) {
return Err(SinkError::ClickHouse(
"Only one level of nesting is supported for struct".to_owned(),
4 changes: 0 additions & 4 deletions src/connector/src/sink/deltalake.rs
@@ -616,14 +616,10 @@ mod test
Field {
data_type: DataType::Int32,
name: "id".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: DataType::Varchar,
name: "name".into(),
sub_fields: vec![],
type_name: "".into(),
},
]);

55 changes: 0 additions & 55 deletions src/connector/src/sink/encoder/json.rs
@@ -445,8 +445,6 @@ mod tests
let mock_field = Field {
data_type: DataType::Boolean,
name: Default::default(),
sub_fields: Default::default(),
type_name: Default::default(),
};

let config = JsonEncoderConfig {
@@ -621,7 +619,6 @@
&Field {
data_type: DataType::Decimal,
name: "aaa".to_owned(),
..mock_field.clone()
},
Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
&doris_config,
@@ -721,57 +718,42 @@

#[test]
fn test_generate_json_converter_schema() {
let mock_field = Field {
data_type: DataType::Boolean,
name: Default::default(),
sub_fields: Default::default(),
type_name: Default::default(),
};
let fields = vec![
Field {
data_type: DataType::Boolean,
name: "v1".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Int16,
name: "v2".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Int32,
name: "v3".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Float32,
name: "v4".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Decimal,
name: "v5".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Date,
name: "v6".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Varchar,
name: "v7".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Time,
name: "v8".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Interval,
name: "v9".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Struct(StructType::new(vec![
@@ -786,61 +768,24 @@
),
])),
name: "v10".into(),
sub_fields: vec![
Field {
data_type: DataType::Timestamp,
name: "a".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Timestamptz,
name: "b".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Struct(StructType::new(vec![
("aa", DataType::Int64),
("bb", DataType::Float64),
])),
name: "c".into(),
sub_fields: vec![
Field {
data_type: DataType::Int64,
name: "aa".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Float64,
name: "bb".into(),
..mock_field.clone()
},
],
..mock_field.clone()
},
],
..mock_field.clone()
},
Field {
data_type: DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
StructType::new(vec![("aa", DataType::Int64), ("bb", DataType::Float64)]),
))))),
name: "v11".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Jsonb,
name: "12".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Serial,
name: "13".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Int256,
name: "14".into(),
..mock_field.clone()
},
];
let schema = json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())
19 changes: 0 additions & 19 deletions src/connector/src/sink/formatter/debezium_json.rs
@@ -363,14 +363,10 @@ mod tests
Field {
data_type: DataType::Int32,
name: "v1".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: DataType::Float32,
name: "v2".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: StructType::new(vec![
@@ -379,21 +375,6 @@
])
.into(),
name: "v3".into(),
sub_fields: vec![
Field {
data_type: DataType::Int32,
name: "v4".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: DataType::Float32,
name: "v5".into(),
sub_fields: vec![],
type_name: "".into(),
},
],
type_name: "".into(),
},
]);

4 changes: 0 additions & 4 deletions src/connector/src/sink/kafka.rs
@@ -752,14 +752,10 @@ mod test
Field {
data_type: DataType::Int32,
name: "id".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: DataType::Varchar,
name: "v2".into(),
sub_fields: vec![],
type_name: "".into(),
},
]);
