Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add some comments and minor refactor #8

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/query/storages/common/blocks/src/parquet_rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub fn blocks_to_parquet(
assert!(!blocks.is_empty());
let props = WriterProperties::builder()
.set_compression(compression.into())
// use `usize::MAX` to effectively limit the number of row groups to 1
.set_max_row_group_size(usize::MAX)
.set_encoding(Encoding::PLAIN)
.set_dictionary_enabled(false)
Expand Down
43 changes: 31 additions & 12 deletions src/query/storages/fuse/src/io/read/block/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ impl BlockReader {
)?;
let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
let name_paths = column_name_paths(&self.projection, &self.original_schema);

let array_cache = if self.put_cache {
CacheManager::instance().get_table_data_array_cache()
} else {
None
};

for ((i, field), column_node) in self
.projected_schema
.fields
Expand All @@ -67,23 +74,29 @@ impl BlockReader {
.zip(self.project_column_nodes.iter())
{
let data_type = field.data_type().into();
if column_node.is_nested
&& column_node
.leaf_column_ids
.iter()
.any(|id| matches!(column_chunks.get(id), Some(DataItem::ColumnArray(_))))
{
return Err(ErrorCode::StorageOther(
"unexpected nested field: nested leaf field hits cached",
));
}

// NOTE, there is something tricky here:
// - `column_chunks` always contains data of leaf columns
// - here we may processing a nested type field
// - But, even if the field being processed is a filed with multiple leaf columns
// `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
// even if we are getting data from `column_chunks` using a non-leaf
// `column_id` of `projected_schema.fields`
//
// [^1]: Except in the current block, there is no data stored for the
// corresponding field, and a default value has been declared for
// the corresponding field.
//
// Yes, it is too obscure, we need to polish it later.

let value = match column_chunks.get(&field.column_id) {
Some(DataItem::RawData(data)) => {
// get the deserialized arrow array, which may be a nested array
let arrow_array = column_by_name(&record_batch, &name_paths[i]);
let arrow2_array: Box<dyn databend_common_arrow::arrow::array::Array> =
arrow_array.into();
if self.put_cache && !column_node.is_nested {
if let Some(cache) = CacheManager::instance().get_table_data_array_cache() {
if !column_node.is_nested {
if let Some(cache) = &array_cache {
let meta = column_metas.get(&field.column_id).unwrap();
let (offset, len) = meta.offset_length();
let key =
Expand All @@ -94,6 +107,12 @@ impl BlockReader {
Value::Column(Column::from_arrow(arrow2_array.as_ref(), &data_type)?)
}
Some(DataItem::ColumnArray(cached)) => {
if column_node.is_nested {
// a defensive check, should never happen
return Err(ErrorCode::StorageOther(
"unexpected nested field: nested leaf field hits cached",
));
}
Value::Column(Column::from_arrow(cached.0.as_ref(), &data_type)?)
}
None => Value::Scalar(self.default_vals[i].clone()),
Expand Down
Loading