diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs
index a3c019ebec40..0380afd7e086 100644
--- a/src/mito2/src/memtable/bulk/context.rs
+++ b/src/mito2/src/memtable/bulk/context.rs
@@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
 
-use crate::row_converter::DensePrimaryKeyCodec;
+use crate::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
 use crate::sst::parquet::file_range::RangeBase;
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::reader::SimpleFilterContext;
@@ -41,7 +41,7 @@ impl BulkIterContext {
         projection: &Option<&[ColumnId]>,
         predicate: Option<Predicate>,
     ) -> Self {
-        let codec = DensePrimaryKeyCodec::new(&region_metadata);
+        let codec = build_primary_key_codec(&region_metadata);
 
         let simple_filters = predicate
             .as_ref()
diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 6c132ce64458..07f5fda5295e 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -562,7 +562,7 @@ mod tests {
         let batch_values = batches
             .into_iter()
             .map(|b| {
-                let pk_values = pk_encoder.decode_dense(b.primary_key()).unwrap();
+                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                 let timestamps = b
                     .timestamps()
                     .as_any()
diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs
index 4bcd432d414a..78a8b7d847d9 100644
--- a/src/mito2/src/memtable/partition_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -31,9 +31,8 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use common_base::readable_size::ReadableSize;
-pub(crate) use primary_key_filter::DensePrimaryKeyFilter;
+pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
 use serde::{Deserialize, Serialize};
-use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, SequenceNumber};
 use table::predicate::Predicate;
@@ -48,7 +47,7 @@ use crate::memtable::{
     MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
 };
 use crate::region::options::MergeMode;
-use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};
+use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
 
 /// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
 pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
@@ -330,22 +329,14 @@ impl PartitionTreeMemtableBuilder {
 
 impl MemtableBuilder for PartitionTreeMemtableBuilder {
     fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
-        match metadata.primary_key_encoding {
-            PrimaryKeyEncoding::Dense => {
-                let codec = Arc::new(DensePrimaryKeyCodec::new(metadata));
-                Arc::new(PartitionTreeMemtable::new(
-                    id,
-                    codec,
-                    metadata.clone(),
-                    self.write_buffer_manager.clone(),
-                    &self.config,
-                ))
-            }
-            PrimaryKeyEncoding::Sparse => {
-                //TODO(weny): Implement sparse primary key encoding.
-                todo!()
-            }
-        }
+        let codec = build_primary_key_codec(metadata);
+        Arc::new(PartitionTreeMemtable::new(
+            id,
+            codec,
+            metadata.clone(),
+            self.write_buffer_manager.clone(),
+            &self.config,
+        ))
     }
 }
 
@@ -382,7 +373,7 @@ mod tests {
     use store_api::storage::RegionId;
 
     use super::*;
-    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
+    use crate::row_converter::DensePrimaryKeyCodec;
     use crate::test_util::memtable_util::{
         self, collect_iter_timestamps, region_metadata_to_row_schema,
     };
@@ -794,7 +785,7 @@ mod tests {
         let mut reader = new_memtable.iter(None, None, None).unwrap();
         let batch = reader.next().unwrap().unwrap();
-        let pk = codec.decode(batch.primary_key()).unwrap();
+        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
         if let Value::String(s) = &pk[2] {
             assert_eq!("10min", s.as_utf8());
         } else {
diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs
index d02b13ddb47a..4645ca7ab953 100644
--- a/src/mito2/src/memtable/partition_tree/tree.rs
+++ b/src/mito2/src/memtable/partition_tree/tree.rs
@@ -96,6 +96,21 @@ impl PartitionTree {
         }
     }
 
+    fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
+        // The sparse primary key codec does not have a fixed number of fields.
+        if let Some(expected_num_fields) = self.row_codec.num_fields() {
+            ensure!(
+                expected_num_fields == kv.num_primary_keys(),
+                PrimaryKeyLengthMismatchSnafu {
+                    expect: expected_num_fields,
+                    actual: kv.num_primary_keys(),
+                }
+            );
+        }
+        // TODO(weny): verify the primary key length for sparse primary key codec.
+        Ok(())
+    }
+
     // TODO(yingwen): The size computed from values is inaccurate.
     /// Write key-values into the tree.
     ///
@@ -110,13 +125,7 @@ impl PartitionTree {
         let has_pk = !self.metadata.primary_key.is_empty();
 
         for kv in kvs.iter() {
-            ensure!(
-                kv.num_primary_keys() == self.row_codec.num_fields(),
-                PrimaryKeyLengthMismatchSnafu {
-                    expect: self.row_codec.num_fields(),
-                    actual: kv.num_primary_keys(),
-                }
-            );
+            self.verify_primary_key_length(&kv)?;
             // Safety: timestamp of kv must be both present and a valid timestamp value.
             let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
             metrics.min_ts = metrics.min_ts.min(ts);
@@ -161,13 +170,7 @@ impl PartitionTree {
     ) -> Result<()> {
         let has_pk = !self.metadata.primary_key.is_empty();
 
-        ensure!(
-            kv.num_primary_keys() == self.row_codec.num_fields(),
-            PrimaryKeyLengthMismatchSnafu {
-                expect: self.row_codec.num_fields(),
-                actual: kv.num_primary_keys(),
-            }
-        );
+        self.verify_primary_key_length(&kv)?;
         // Safety: timestamp of kv must be both present and a valid timestamp value.
         let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
         metrics.min_ts = metrics.min_ts.min(ts);
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index a7c41648f39f..88fa058c6b59 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -51,7 +51,7 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
 use crate::read::dedup::LastNonNullIter;
 use crate::read::{Batch, BatchBuilder, BatchColumn};
 use crate::region::options::MergeMode;
-use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
+use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
 
 /// Initial vector builder capacity.
 const INITIAL_BUILDER_CAPACITY: usize = 0;
@@ -146,12 +146,13 @@ impl TimeSeriesMemtable {
     fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
         ensure!(
-            kv.num_primary_keys() == self.row_codec.num_fields(),
+            self.row_codec.num_fields() == kv.num_primary_keys(),
             PrimaryKeyLengthMismatchSnafu {
                 expect: self.row_codec.num_fields(),
-                actual: kv.num_primary_keys()
+                actual: kv.num_primary_keys(),
             }
         );
+
         let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
         let fields = kv.fields().collect::<Vec<_>>();
 
@@ -585,7 +586,7 @@ fn prune_primary_key(
     let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
         pk_values
     } else {
-        let pk_values = codec.decode(pk);
+        let pk_values = codec.decode_dense_without_column_id(pk);
         if let Err(e) = pk_values {
             error!(e; "Failed to decode primary key");
             return true;
         }
@@ -1176,7 +1177,12 @@ mod tests {
         let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
             schema
                 .primary_key_columns()
-                .map(|c| SortField::new(c.column_schema.data_type.clone()))
+                .map(|c| {
+                    (
+                        c.column_id,
+                        SortField::new(c.column_schema.data_type.clone()),
+                    )
+                })
                 .collect(),
         ));
         let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 6001d3062491..2f9bdff7b036 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -40,7 +40,7 @@ use datatypes::arrow::compute::SortOptions;
 use datatypes::arrow::row::{RowConverter, SortField};
 use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
 use datatypes::types::TimestampType;
-use datatypes::value::{Value, ValueRef};
+use datatypes::value::ValueRef;
 use datatypes::vectors::{
     BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
     TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
@@ -58,6 +58,7 @@ use crate::error::{
 use crate::memtable::BoxedBatchIterator;
 use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
 use crate::read::prune::PruneReader;
+use crate::row_converter::CompositeValues;
 
 /// Storage internal representation of a batch of rows for a primary key (time series).
 ///
@@ -68,7 +69,7 @@ pub struct Batch {
     /// Primary key encoded in a comparable form.
     primary_key: Vec<u8>,
     /// Possibly decoded `primary_key` values. Some places would decode it in advance.
-    pk_values: Option<Vec<Value>>,
+    pk_values: Option<CompositeValues>,
     /// Timestamps of rows, should be sorted and not null.
     timestamps: VectorRef,
     /// Sequences of rows
@@ -114,12 +115,12 @@ impl Batch {
     }
 
     /// Returns possibly decoded primary-key values.
-    pub fn pk_values(&self) -> Option<&[Value]> {
-        self.pk_values.as_deref()
+    pub fn pk_values(&self) -> Option<&CompositeValues> {
+        self.pk_values.as_ref()
     }
 
     /// Sets possibly decoded primary-key values.
-    pub fn set_pk_values(&mut self, pk_values: Vec<Value>) {
+    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
         self.pk_values = Some(pk_values);
     }
 
diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs
index 1de5d624210c..c103bbaa9c94 100644
--- a/src/mito2/src/read/compat.rs
+++ b/src/mito2/src/read/compat.rs
@@ -15,6 +15,7 @@
 
 //! Utilities to adapt readers with different schema.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 
 use datatypes::data_type::ConcreteDataType;
 use datatypes::value::Value;
@@ -26,7 +27,10 @@ use store_api::storage::ColumnId;
 use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
 use crate::read::projection::ProjectionMapper;
 use crate::read::{Batch, BatchColumn, BatchReader};
-use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
+use crate::row_converter::{
+    build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
+    SortField,
+};
 
 /// Reader to adapt schema of underlying reader to expected schema.
 pub struct CompatReader {
@@ -68,6 +72,8 @@ impl BatchReader for CompatReader {
 
 /// A helper struct to adapt schema of the batch to an expected schema.
 pub(crate) struct CompatBatch {
+    /// Optional primary key rewriter.
+    rewrite_pk: Option<RewritePrimaryKey>,
     /// Optional primary key adapter.
     compat_pk: Option<CompatPrimaryKey>,
     /// Optional fields adapter.
@@ -79,10 +85,12 @@ impl CompatBatch {
     /// - `mapper` is built from the metadata users expect to see.
     /// - `reader_meta` is the metadata of the input reader.
     pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
+        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
         let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
         let compat_fields = may_compat_fields(mapper, &reader_meta)?;
 
         Ok(Self {
+            rewrite_pk,
             compat_pk,
             compat_fields,
         })
@@ -90,6 +98,9 @@ impl CompatBatch {
 
     /// Adapts the `batch` to the expected schema.
     pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
+        if let Some(rewrite_pk) = &self.rewrite_pk {
+            batch = rewrite_pk.compat(batch)?;
+        }
         if let Some(compat_pk) = &self.compat_pk {
             batch = compat_pk.compat(batch)?;
         }
@@ -101,10 +112,15 @@ impl CompatBatch {
     }
 }
 
-/// Returns true if `left` and `right` have same columns to read.
-///
-/// It only consider column ids.
-pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) -> bool {
+/// Returns true if `left` and `right` have the same columns and primary key encoding.
+pub(crate) fn has_same_columns_and_pk_encoding(
+    left: &RegionMetadata,
+    right: &RegionMetadata,
+) -> bool {
+    if left.primary_key_encoding != right.primary_key_encoding {
+        return false;
+    }
+
     if left.column_metadatas.len() != right.column_metadatas.len() {
         return false;
     }
@@ -127,16 +143,17 @@
 #[derive(Debug)]
 struct CompatPrimaryKey {
     /// Row converter to append values to primary keys.
-    converter: DensePrimaryKeyCodec,
+    converter: Arc<dyn PrimaryKeyCodec>,
     /// Default values to append.
-    values: Vec<Value>,
+    values: Vec<(ColumnId, Value)>,
 }
 
 impl CompatPrimaryKey {
     /// Make primary key of the `batch` compatible.
     fn compat(&self, mut batch: Batch) -> Result<Batch> {
-        let mut buffer =
-            Vec::with_capacity(batch.primary_key().len() + self.converter.estimated_size());
+        let mut buffer = Vec::with_capacity(
+            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
+        );
         buffer.extend_from_slice(batch.primary_key());
         self.converter.encode_values(&self.values, &mut buffer)?;
 
@@ -144,9 +161,7 @@ impl CompatPrimaryKey {
 
         // update cache
         if let Some(pk_values) = &mut batch.pk_values {
-            for value in &self.values {
-                pk_values.push(value.clone());
-            }
+            pk_values.extend(&self.values);
         }
 
         Ok(batch)
@@ -211,6 +226,25 @@ impl CompatFields {
     }
 }
 
+fn may_rewrite_primary_key(
+    expect: &RegionMetadata,
+    actual: &RegionMetadata,
+) -> Option<RewritePrimaryKey> {
+    if expect.primary_key_encoding == actual.primary_key_encoding {
+        return None;
+    }
+
+    let fields = expect.primary_key.clone();
+    let original = build_primary_key_codec(actual);
+    let new = build_primary_key_codec(expect);
+
+    Some(RewritePrimaryKey {
+        original,
+        new,
+        fields,
+    })
+}
+
 /// Creates a [CompatPrimaryKey] if needed.
 fn may_compat_primary_key(
     expect: &RegionMetadata,
@@ -248,7 +282,10 @@ fn may_compat_primary_key(
     for column_id in to_add {
         // Safety: The id comes from expect region metadata.
         let column = expect.column_by_id(*column_id).unwrap();
-        fields.push(SortField::new(column.column_schema.data_type.clone()));
+        fields.push((
+            *column_id,
+            SortField::new(column.column_schema.data_type.clone()),
+        ));
         let default_value = column
             .column_schema
             .create_default()
@@ -263,9 +300,11 @@ fn may_compat_primary_key(
                     column.column_schema.name
                 ),
             })?;
-        values.push(default_value);
+        values.push((*column_id, default_value));
     }
-    let converter = DensePrimaryKeyCodec::with_fields(fields);
+    // Use the expected primary key encoding to build the converter.
+    let converter =
+        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());
 
     Ok(Some(CompatPrimaryKey { converter, values }))
 }
@@ -350,6 +389,53 @@ enum IndexOrDefault {
     },
 }
 
+/// Adapter to rewrite primary key.
+struct RewritePrimaryKey {
+    /// Original primary key codec.
+    original: Arc<dyn PrimaryKeyCodec>,
+    /// New primary key codec.
+    new: Arc<dyn PrimaryKeyCodec>,
+    /// Order of the fields in the new primary key.
+    fields: Vec<ColumnId>,
+}
+
+impl RewritePrimaryKey {
+    /// Make primary key of the `batch` compatible.
+    fn compat(&self, mut batch: Batch) -> Result<Batch> {
+        let values = if let Some(pk_values) = batch.pk_values() {
+            pk_values
+        } else {
+            let new_pk_values = self.original.decode(batch.primary_key())?;
+            batch.set_pk_values(new_pk_values);
+            // Safety: We ensure pk_values is not None.
+            batch.pk_values().as_ref().unwrap()
+        };
+
+        let mut buffer = Vec::with_capacity(
+            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
+        );
+        match values {
+            CompositeValues::Dense(values) => {
+                self.new.encode_values(values.as_slice(), &mut buffer)?;
+            }
+            CompositeValues::Sparse(values) => {
+                let values = self
+                    .fields
+                    .iter()
+                    .map(|id| {
+                        let value = values.get_or_null(*id);
+                        (*id, value.as_value_ref())
+                    })
+                    .collect::<Vec<_>>();
+                self.new.encode_value_refs(&values, &mut buffer)?;
+            }
+        }
+        batch.set_primary_key(buffer);
+
+        Ok(batch)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -359,11 +445,12 @@ mod tests {
     use datatypes::schema::ColumnSchema;
     use datatypes::value::ValueRef;
     use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
+    use store_api::codec::PrimaryKeyEncoding;
     use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
     use store_api::storage::RegionId;
 
     use super::*;
-    use crate::row_converter::PrimaryKeyCodecExt;
+    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec};
     use crate::test_util::{check_reader_result, VecBatchReader};
 
     /// Creates a new [RegionMetadata].
@@ -396,7 +483,7 @@ mod tests {
     /// Encode primary key.
     fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
         let fields = (0..keys.len())
-            .map(|_| SortField::new(ConcreteDataType::string_datatype()))
+            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
         let converter = DensePrimaryKeyCodec::with_fields(fields);
         let row = keys.iter().map(|str_opt| match str_opt {
@@ -407,6 +494,24 @@ mod tests {
         converter.encode(row).unwrap()
     }
 
+    /// Encode sparse primary key.
+    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
+        let fields = (0..keys.len())
+            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
+            .collect();
+        let converter = SparsePrimaryKeyCodec::with_fields(fields);
+        let row = keys
+            .iter()
+            .map(|(id, str_opt)| match str_opt {
+                Some(v) => (*id, ValueRef::String(v)),
+                None => (*id, ValueRef::Null),
+            })
+            .collect::<Vec<_>>();
+        let mut buffer = vec![];
+        converter.encode_value_refs(&row, &mut buffer).unwrap();
+        buffer
+    }
+
     /// Creates a batch for specific primary `key`.
     ///
     /// `fields`: [(column_id of the field, is null)]
@@ -526,6 +631,25 @@ mod tests {
             .is_none());
     }
 
+    #[test]
+    fn test_same_pk_encoding() {
+        let reader_meta = Arc::new(new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+            ],
+            &[1],
+        ));
+
+        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
+            .unwrap()
+            .is_none());
+    }
+
     #[test]
     fn test_same_fields() {
         let reader_meta = Arc::new(new_metadata(
@@ -747,4 +871,58 @@ mod tests {
         )
         .await;
     }
+
+    #[tokio::test]
+    async fn test_compat_reader_different_pk_encoding() {
+        let mut reader_meta = new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+            ],
+            &[1],
+        );
+        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
+        let reader_meta = Arc::new(reader_meta);
+        let mut expect_meta = new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
+            ],
+            &[1, 3],
+        );
+        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
+        let expect_meta = Arc::new(expect_meta);
+
+        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
+        let k1 = encode_key(&[Some("a")]);
+        let k2 = encode_key(&[Some("b")]);
+        let source_reader = VecBatchReader::new(&[
+            new_batch(&k1, &[(2, false)], 1000, 3),
+            new_batch(&k2, &[(2, false)], 1000, 3),
+        ]);
+
+        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
+        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
+        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
+        check_reader_result(
+            &mut compat_reader,
+            &[
+                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
+                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
+            ],
+        )
+        .await;
+    }
 }
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index 4ffc021e42cd..883f55406644 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -33,7 +33,7 @@ use store_api::storage::ColumnId;
 use crate::cache::CacheStrategy;
 use crate::error::{InvalidRequestSnafu, Result};
 use crate::read::Batch;
-use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};
+use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
 
 /// Only cache vector when its length `<=` this value.
 const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
@@ -47,7 +47,7 @@ pub struct ProjectionMapper {
     /// Output record batch contains tags.
     has_tags: bool,
     /// Decoder for primary key.
-    codec: DensePrimaryKeyCodec,
+    codec: Arc<dyn PrimaryKeyCodec>,
     /// Schema for converted [RecordBatch].
     output_schema: SchemaRef,
     /// Ids of columns to project. It keeps ids in the same order as the `projection`
@@ -92,8 +92,8 @@ impl ProjectionMapper {
             // Safety: idx is valid.
             column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
         }
-        let codec = DensePrimaryKeyCodec::new(metadata);
 
+        let codec = build_primary_key_codec(metadata);
         if is_empty_projection {
             // If projection is empty, we don't output any column.
             return Ok(ProjectionMapper {
@@ -134,7 +134,7 @@ impl ProjectionMapper {
                     has_tags = true;
                     // We always read all primary key so the column always exists and the tag
                     // index is always valid.
-                    BatchIndex::Tag(index)
+                    BatchIndex::Tag((index, column.column_id))
                 }
                 SemanticType::Timestamp => BatchIndex::Timestamp,
                 SemanticType::Field => {
@@ -213,15 +213,15 @@ impl ProjectionMapper {
         // Skips decoding pk if we don't need to output it.
         let pk_values = if self.has_tags {
             match batch.pk_values() {
-                Some(v) => v.to_vec(),
+                Some(v) => v.clone(),
                 None => self
                     .codec
-                    .decode_dense(batch.primary_key())
+                    .decode(batch.primary_key())
                     .map_err(BoxedError::new)
                     .context(ExternalSnafu)?,
             }
         } else {
-            Vec::new()
+            CompositeValues::Dense(vec![])
         };
 
         let mut columns = Vec::with_capacity(self.output_schema.num_columns());
@@ -232,8 +232,11 @@ impl ProjectionMapper {
             .zip(self.output_schema.column_schemas())
         {
             match index {
-                BatchIndex::Tag(idx) => {
-                    let value = &pk_values[*idx];
+                BatchIndex::Tag((idx, column_id)) => {
+                    let value = match &pk_values {
+                        CompositeValues::Dense(v) => &v[*idx].1,
+                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
+                    };
                     let vector = repeated_vector_with_cache(
                         &column_schema.data_type,
                         value,
@@ -259,7 +262,7 @@ impl ProjectionMapper {
 #[derive(Debug, Clone, Copy)]
 enum BatchIndex {
     /// Index in primary keys.
-    Tag(usize),
+    Tag((usize, ColumnId)),
     /// The time index column.
     Timestamp,
     /// Index in fields.
@@ -321,7 +324,7 @@ mod tests {
     use super::*;
     use crate::cache::CacheManager;
     use crate::read::BatchBuilder;
-    use crate::row_converter::{PrimaryKeyCodecExt, SortField};
+    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
     use crate::test_util::meta_util::TestRegionMetadataBuilder;
 
     fn new_batch(
@@ -332,7 +335,12 @@ mod tests {
     ) -> Batch {
         let converter = DensePrimaryKeyCodec::with_fields(
             (0..tags.len())
-                .map(|_| SortField::new(ConcreteDataType::int64_datatype()))
+                .map(|idx| {
+                    (
+                        idx as u32,
+                        SortField::new(ConcreteDataType::int64_datatype()),
+                    )
+                })
                 .collect(),
         );
         let primary_key = converter
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index b5c4eecd0c0d..193e3c3e1764 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -767,7 +767,7 @@ impl ScanInput {
                 }
             }
         };
-        if !compat::has_same_columns(
+        if !compat::has_same_columns_and_pk_encoding(
             self.mapper.metadata(),
             file_range_ctx.read_format().metadata(),
         ) {
diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index 75f015d4494f..4d0635d3cc45 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -13,10 +13,8 @@
 // limitations under the License.
 
 mod dense;
-// TODO(weny): remove it.
-#[allow(unused)]
 mod sparse;
-
+use std::fmt::Debug;
 use std::sync::Arc;
 
 use common_recordbatch::filter::SimpleFilterEvaluator;
@@ -24,7 +22,8 @@ use datatypes::value::{Value, ValueRef};
 pub use dense::{DensePrimaryKeyCodec, SortField};
 pub use sparse::{SparsePrimaryKeyCodec, SparseValues};
 use store_api::codec::PrimaryKeyEncoding;
-use store_api::metadata::RegionMetadataRef;
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::storage::ColumnId;
 
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
@@ -49,9 +48,6 @@ pub trait PrimaryKeyCodecExt {
     fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
     where
         I: Iterator<Item = ValueRef<'a>>;
-
-    /// Decode row values from bytes.
-    fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
 }
 
 pub trait PrimaryKeyFilter: Send + Sync {
@@ -59,15 +55,63 @@ pub trait PrimaryKeyFilter: Send + Sync {
     fn matches(&mut self, pk: &[u8]) -> bool;
 }
 
-pub trait PrimaryKeyCodec: Send + Sync {
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum CompositeValues {
+    Dense(Vec<(ColumnId, Value)>),
+    Sparse(SparseValues),
+}
+
+impl CompositeValues {
+    /// Extends the composite values with the given values.
+    pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
+        match self {
+            CompositeValues::Dense(dense_values) => {
+                for (column_id, value) in values {
+                    dense_values.push((*column_id, value.clone()));
+                }
+            }
+            CompositeValues::Sparse(sparse_values) => {
+                for (column_id, value) in values {
+                    sparse_values.insert(*column_id, value.clone());
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+impl CompositeValues {
+    pub fn into_sparse(self) -> SparseValues {
+        match self {
+            CompositeValues::Sparse(v) => v,
+            _ => panic!("CompositeValues is not sparse"),
+        }
+    }
+
+    pub fn into_dense(self) -> Vec<Value> {
+        match self {
+            CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(),
+            _ => panic!("CompositeValues is not dense"),
+        }
+    }
+}
+
+pub trait PrimaryKeyCodec: Send + Sync + Debug {
     /// Encodes a key value to bytes.
     fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
 
     /// Encodes values to bytes.
-    fn encode_values(&self, values: &[Value], buffer: &mut Vec<u8>) -> Result<()>;
+    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
+
+    /// Encodes value refs to bytes.
+    fn encode_value_refs(
+        &self,
+        values: &[(ColumnId, ValueRef)],
+        buffer: &mut Vec<u8>,
+    ) -> Result<()>;
 
     /// Returns the number of fields in the primary key.
-    fn num_fields(&self) -> usize;
+    fn num_fields(&self) -> Option<usize>;
 
     /// Returns a primary key filter factory.
     fn primary_key_filter(
         &self,
@@ -86,9 +130,33 @@ pub trait PrimaryKeyCodec: Send + Sync {
 
     /// Decodes the primary key from the given bytes.
     ///
-    /// Returns a [`Vec<Value>`] that follows the primary key ordering.
-    fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<Value>>;
+    /// Returns a [`CompositeValues`] that follows the primary key ordering.
+    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
 
     /// Decode the leftmost value from bytes.
     fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
 }
+
+/// Builds a primary key codec from region metadata.
+pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
+    let fields = region_metadata.primary_key_columns().map(|col| {
+        (
+            col.column_id,
+            SortField::new(col.column_schema.data_type.clone()),
+        )
+    });
+    build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
+}
+
+/// Builds a primary key codec with the given encoding and fields.
+pub fn build_primary_key_codec_with_fields(
+    encoding: PrimaryKeyEncoding,
+    fields: impl Iterator<Item = (ColumnId, SortField)>,
+) -> Arc<dyn PrimaryKeyCodec> {
+    match encoding {
+        PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
+        PrimaryKeyEncoding::Sparse => {
+            Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
+        }
+    }
+}
diff --git a/src/mito2/src/row_converter/dense.rs b/src/mito2/src/row_converter/dense.rs
index 5c21428523f8..8c3d497d7e21 100644
--- a/src/mito2/src/row_converter/dense.rs
+++ b/src/mito2/src/row_converter/dense.rs
@@ -30,8 +30,9 @@ use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
 use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::storage::ColumnId;
 
-use super::PrimaryKeyFilter;
+use super::{CompositeValues, PrimaryKeyFilter};
 use crate::error::{
     self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
 };
@@ -312,34 +313,31 @@ impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
     {
         self.encode_dense(row, buffer)
     }
-
-    fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>> {
-        self.decode_dense(bytes)
-    }
 }
 
 /// A memory-comparable row [`Value`] encoder/decoder.
 #[derive(Clone, Debug)]
 pub struct DensePrimaryKeyCodec {
     /// Primary key fields.
-    ordered_primary_key_columns: Arc<Vec<SortField>>,
+    ordered_primary_key_columns: Arc<Vec<(ColumnId, SortField)>>,
 }
 
 impl DensePrimaryKeyCodec {
     pub fn new(metadata: &RegionMetadata) -> Self {
-        let ordered_primary_key_columns = Arc::new(
-            metadata
-                .primary_key_columns()
-                .map(|c| SortField::new(c.column_schema.data_type.clone()))
-                .collect::<Vec<_>>(),
-        );
-
-        Self {
-            ordered_primary_key_columns,
-        }
+        let ordered_primary_key_columns = metadata
+            .primary_key_columns()
+            .map(|c| {
+                (
+                    c.column_id,
+                    SortField::new(c.column_schema.data_type.clone()),
+                )
+            })
+            .collect::<Vec<_>>();
+
+        Self::with_fields(ordered_primary_key_columns)
     }
 
-    pub fn with_fields(fields: Vec<SortField>) -> Self {
+    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
         Self {
             ordered_primary_key_columns: Arc::new(fields),
         }
@@ -350,12 +348,42 @@ impl DensePrimaryKeyCodec {
         I: Iterator<Item = ValueRef<'a>>,
     {
         let mut serializer = Serializer::new(buffer);
-        for (value, field) in row.zip(self.ordered_primary_key_columns.iter()) {
+        for (value, (_, field)) in row.zip(self.ordered_primary_key_columns.iter()) {
             field.serialize(&mut serializer, &value)?;
         }
         Ok(())
     }
 
+    /// Decode primary key values from bytes.
+    pub fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<(ColumnId, Value)>> {
+        let mut deserializer = Deserializer::new(bytes);
+        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
+        for (column_id, field) in self.ordered_primary_key_columns.iter() {
+            let value = field.deserialize(&mut deserializer)?;
+            values.push((*column_id, value));
+        }
+        Ok(values)
+    }
+
+    /// Decode primary key values from bytes without column ids.
+    pub fn decode_dense_without_column_id(&self, bytes: &[u8]) -> Result<Vec<Value>> {
+        let mut deserializer = Deserializer::new(bytes);
+        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
+        for (_, field) in self.ordered_primary_key_columns.iter() {
+            let value = field.deserialize(&mut deserializer)?;
+            values.push(value);
+        }
+        Ok(values)
+    }
+
+    /// Returns the field at `pos`.
+    ///
+    /// # Panics
+    /// Panics if `pos` is out of bounds.
+    fn field_at(&self, pos: usize) -> &SortField {
+        &self.ordered_primary_key_columns[pos].1
+    }
+
     /// Decode value at `pos` in `bytes`.
     ///
     /// The i-th element in offsets buffer is how many bytes to skip in order to read value at `pos`.
@@ -370,7 +398,7 @@ impl DensePrimaryKeyCodec {
             // We computed the offset before.
             let to_skip = offsets_buf[pos];
             deserializer.advance(to_skip);
-            return self.ordered_primary_key_columns[pos].deserialize(&mut deserializer);
+            return self.field_at(pos).deserialize(&mut deserializer);
         }
 
         if offsets_buf.is_empty() {
@@ -379,7 +407,8 @@ impl DensePrimaryKeyCodec {
             for i in 0..pos {
                 // Offset to skip before reading value i.
                 offsets_buf.push(offset);
-                let skip = self.ordered_primary_key_columns[i]
+                let skip = self
+                    .field_at(i)
                     .skip_deserialize(bytes, &mut deserializer)?;
                 offset += skip;
             }
@@ -393,7 +422,8 @@ impl DensePrimaryKeyCodec {
             deserializer.advance(offset);
             for i in value_start..pos {
                 // Skip value i.
-                let skip = self.ordered_primary_key_columns[i]
+                let skip = self
+                    .field_at(i)
                     .skip_deserialize(bytes, &mut deserializer)?;
                 // Offset for the value at i + 1.
                 offset += skip;
@@ -401,15 +431,19 @@ impl DensePrimaryKeyCodec {
             }
         }
 
-        self.ordered_primary_key_columns[pos].deserialize(&mut deserializer)
+        self.field_at(pos).deserialize(&mut deserializer)
     }
 
     pub fn estimated_size(&self) -> usize {
         self.ordered_primary_key_columns
             .iter()
-            .map(|f| f.estimated_size())
+            .map(|(_, f)| f.estimated_size())
             .sum()
     }
+
+    pub fn num_fields(&self) -> usize {
+        self.ordered_primary_key_columns.len()
+    }
 }
 
 impl PrimaryKeyCodec for DensePrimaryKeyCodec {
@@ -417,16 +451,25 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
         self.encode_dense(key_value.primary_keys(), buffer)
     }
 
-    fn encode_values(&self, values: &[Value], buffer: &mut Vec<u8>) -> Result<()> {
-        self.encode_dense(values.iter().map(|v| v.as_value_ref()), buffer)
+    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
+        self.encode_dense(values.iter().map(|(_, v)| v.as_value_ref()), buffer)
+    }
+
+    fn encode_value_refs(
+        &self,
+        values: &[(ColumnId, ValueRef)],
+        buffer: &mut Vec<u8>,
+    ) -> Result<()> {
+        let iter = values.iter().map(|(_, v)| *v);
+        self.encode_dense(iter, buffer)
     }
 
     fn estimated_size(&self) -> Option<usize> {
         Some(self.estimated_size())
     }
 
-    fn num_fields(&self) -> usize {
-        self.ordered_primary_key_columns.len()
+    fn num_fields(&self) -> Option<usize> {
+        Some(self.num_fields())
     }
 
     fn encoding(&self) -> PrimaryKeyEncoding {
@@ -445,20 +488,14 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
         ))
     }
 
-    fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<Value>> {
-        let mut deserializer = Deserializer::new(bytes);
-        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
-        for f in self.ordered_primary_key_columns.iter() {
-            let value = f.deserialize(&mut deserializer)?;
-            values.push(value);
-        }
-        Ok(values)
+    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
+        Ok(CompositeValues::Dense(self.decode_dense(bytes)?))
     }
 
     fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
         // TODO(weny, yingwen): avoid decoding the whole primary key.
         let mut values = self.decode_dense(bytes)?;
-        Ok(values.pop())
+        Ok(values.pop().map(|(_, v)| v))
     }
 }
 
@@ -476,14 +513,14 @@ mod tests {
         let encoder = DensePrimaryKeyCodec::with_fields(
             data_types
                 .iter()
-                .map(|t| SortField::new(t.clone()))
+                .map(|t| (0, SortField::new(t.clone())))
                 .collect::<Vec<_>>(),
         );
 
         let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
         let result = encoder.encode(value_ref.iter().cloned()).unwrap();
 
-        let decoded = encoder.decode(&result).unwrap();
+        let decoded = encoder.decode(&result).unwrap().into_dense();
         assert_eq!(decoded, row);
         let mut decoded = Vec::new();
         let mut offsets = Vec::new();
@@ -502,14 +539,14 @@ mod tests {
     #[test]
     fn test_memcmp() {
         let encoder = DensePrimaryKeyCodec::with_fields(vec![
-            SortField::new(ConcreteDataType::string_datatype()),
-            SortField::new(ConcreteDataType::int64_datatype()),
+            (0, SortField::new(ConcreteDataType::string_datatype())),
+            (1, SortField::new(ConcreteDataType::int64_datatype())),
        ]);
         let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
         let value_ref = values.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
         let result = encoder.encode(value_ref.iter().cloned()).unwrap();
 
-        let decoded = encoder.decode(&result).unwrap();
+        let decoded = encoder.decode(&result).unwrap().into_dense();
         assert_eq!(&values, &decoded as &[Value]);
     }
 
diff --git a/src/mito2/src/row_converter/sparse.rs b/src/mito2/src/row_converter/sparse.rs
index 6beca6412a6b..91a5623110d3 100644
--- a/src/mito2/src/row_converter/sparse.rs
+++ b/src/mito2/src/row_converter/sparse.rs
@@ -15,25 +15,30 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
+use common_recordbatch::filter::SimpleFilterEvaluator;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::value::{Value, ValueRef};
 use memcomparable::{Deserializer, Serializer};
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::ReservedColumnId;
 use store_api::storage::ColumnId;
 
-use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu};
+use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
+use crate::memtable::key_values::KeyValue;
+use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
 use crate::row_converter::dense::SortField;
-use crate::row_converter::PrimaryKeyCodec;
+use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
 
 /// A codec for sparse keys of metrics.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct SparsePrimaryKeyCodec {
     inner: Arc<SparsePrimaryKeyCodecInner>,
 }
 
+#[derive(Debug)]
 struct SparsePrimaryKeyCodecInner {
     // Internal fields
     table_id_field: SortField,
@@ -66,6 +71,11 @@ impl SparseValues {
         self.values.get(&column_id).unwrap_or(&Value::Null)
     }
 
+    /// Returns the value of the given column, or [`None`] if the column is not present.
+    pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
+        self.values.get(column_id)
+    }
+
     /// Inserts a new value into the [`SparseValues`].
     pub fn insert(&mut self, column_id: ColumnId, value: Value) {
         self.values.insert(column_id, value);
@@ -111,6 +121,17 @@ impl SparsePrimaryKeyCodec {
         }
     }
 
+    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
+        Self {
+            inner: Arc::new(SparsePrimaryKeyCodecInner {
+                columns: Some(fields.iter().map(|f| f.0).collect()),
+                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
+                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
+                label_field: SortField::new(ConcreteDataType::string_datatype()),
+            }),
+        }
+    }
+
     /// Returns the field of the given column id.
     fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
         // If `columns` is not specified, all unknown columns are primary key (label) fields.
@@ -224,6 +245,59 @@ impl SparsePrimaryKeyCodec {
     }
 }
 
+impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
+    fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
+        UnsupportedOperationSnafu {
+            err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
+        }
+        .fail()
+    }
+
+    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
+        self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
+    }
+
+    fn encode_value_refs(
+        &self,
+        values: &[(ColumnId, ValueRef)],
+        buffer: &mut Vec<u8>,
+    ) -> Result<()> {
+        self.encode_to_vec(values.iter().map(|v| (v.0, v.1)), buffer)
+    }
+
+    fn estimated_size(&self) -> Option<usize> {
+        None
+    }
+
+    fn num_fields(&self) -> Option<usize> {
+        None
+    }
+
+    fn encoding(&self) -> PrimaryKeyEncoding {
+        PrimaryKeyEncoding::Sparse
+    }
+
+    fn primary_key_filter(
+        &self,
+        metadata: &RegionMetadataRef,
+        filters: Arc<Vec<SimpleFilterEvaluator>>,
+    ) -> Box<dyn PrimaryKeyFilter> {
+        Box::new(SparsePrimaryKeyFilter::new(
+            metadata.clone(),
+            filters,
+            self.clone(),
+        ))
+    }
+
+    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
+        Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
+    }
+
+    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
+        self.decode_leftmost(bytes)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs
index 0f97ea102711..3dfe15dfd5af 100644
--- a/src/mito2/src/sst/index/bloom_filter/creator.rs
+++ b/src/mito2/src/sst/index/bloom_filter/creator.rs
@@ -30,7 +30,7 @@ use crate::error::{
     PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
 };
 use crate::read::Batch;
-use crate::row_converter::SortField;
+use crate::row_converter::{CompositeValues, SortField};
 use crate::sst::file::FileId;
 use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
 use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
@@ -108,7 +108,10 @@ impl BloomFilterIndexer {
             return Ok(None);
         }
 
-        let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
+        let codec = IndexValuesCodec::from_tag_columns(
+            metadata.primary_key_encoding,
+            metadata.primary_key_columns(),
+        );
         let indexer = Self {
             creators,
             temp_file_provider,
@@ -192,11 +195,26 @@ impl BloomFilterIndexer {
         let n = batch.num_rows();
         guard.inc_row_count(n);
 
+        // TODO(weny, zhenchi): lazy decode
+        let values = self.codec.decode(batch.primary_key())?;
         // Tags
-        for ((col_id, _), field, value) in self.codec.decode(batch.primary_key())? {
+        for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
             let Some(creator) = self.creators.get_mut(col_id) else {
                 continue;
             };
+
+            let value = match &values {
+                CompositeValues::Dense(vec) => {
+                    let value = &vec[idx].1;
+                    if value.is_null() {
+                        None
+                    } else {
+                        Some(value)
+                    }
+                }
+                CompositeValues::Sparse(sparse_values) => sparse_values.get(col_id),
+            };
+
             let elems = value
                 .map(|v| {
                     let mut buf = vec![];
@@ -411,7 +429,7 @@ pub(crate) mod tests {
     }
 
     pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
-        let fields = vec![SortField::new(ConcreteDataType::string_datatype())];
+        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
         let codec = DensePrimaryKeyCodec::with_fields(fields);
         let row: [ValueRef; 1] = [str_tag.as_ref().into()];
         let primary_key = codec.encode(row.into_iter()).unwrap();
diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs
index 23702ba41448..5d08cc7b2934 100644
--- a/src/mito2/src/sst/index/codec.rs
+++ b/src/mito2/src/sst/index/codec.rs
@@ -12,15 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+use std::sync::Arc;
+
 use datatypes::data_type::ConcreteDataType;
-use datatypes::value::{Value, ValueRef};
+use datatypes::value::ValueRef;
 use memcomparable::Serializer;
 use snafu::{ensure, OptionExt, ResultExt};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::ColumnMetadata;
 use store_api::storage::ColumnId;
 
 use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
-use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
+use crate::row_converter::{
+    build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec, SortField,
+};
 
 /// Encodes index values according to their data types for sorting and storage use.
 pub struct IndexValueCodec;
@@ -62,26 +68,35 @@ impl IndexValueCodec {
 pub struct IndexValuesCodec {
     /// Tuples containing column id and its corresponding index_name (result of `to_string` on ColumnId),
     /// to minimize redundant `to_string` calls.
-    column_ids: Vec<(ColumnId, String)>,
+    column_ids: HashMap<ColumnId, String>,
     /// The data types of tag columns.
-    fields: Vec<SortField>,
+    fields: Vec<(ColumnId, SortField)>,
     /// The decoder for the primary key.
-    decoder: DensePrimaryKeyCodec,
+    decoder: Arc<dyn PrimaryKeyCodec>,
 }
 
 impl IndexValuesCodec {
     /// Creates a new `IndexValuesCodec` from a list of `ColumnMetadata` of tag columns.
-    pub fn from_tag_columns<'a>(tag_columns: impl Iterator<Item = &'a ColumnMetadata>) -> Self {
+    pub fn from_tag_columns<'a>(
+        primary_key_encoding: PrimaryKeyEncoding,
+        tag_columns: impl Iterator<Item = &'a ColumnMetadata>,
+    ) -> Self {
         let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns
             .map(|column| {
                 (
                     (column.column_id, column.column_id.to_string()),
-                    SortField::new(column.column_schema.data_type.clone()),
+                    (
+                        column.column_id,
+                        SortField::new(column.column_schema.data_type.clone()),
+                    ),
                 )
             })
             .unzip();
 
-        let decoder = DensePrimaryKeyCodec::with_fields(fields.clone());
+        let column_ids = column_ids.into_iter().collect();
+        let decoder =
+            build_primary_key_codec_with_fields(primary_key_encoding, fields.clone().into_iter());
+
         Self {
             column_ids,
             fields,
@@ -89,26 +104,19 @@ impl IndexValuesCodec {
         }
     }
 
+    /// Returns the column ids of the index.
+    pub fn column_ids(&self) -> &HashMap<ColumnId, String> {
+        &self.column_ids
+    }
+
+    /// Returns the fields of the index.
+    pub fn fields(&self) -> &[(ColumnId, SortField)] {
+        &self.fields
+    }
+
     /// Decodes a primary key into its corresponding column ids, data types and values.
-    pub fn decode(
-        &self,
-        primary_key: &[u8],
-    ) -> Result<impl Iterator<Item = (&(ColumnId, String), &SortField, Option<Value>)>> {
-        let values = self.decoder.decode_dense(primary_key)?;
-
-        let iter = values
-            .into_iter()
-            .zip(&self.column_ids)
-            .zip(&self.fields)
-            .map(|((value, column_id), encoder)| {
-                if value.is_null() {
-                    (column_id, encoder, None)
-                } else {
-                    (column_id, encoder, Some(value))
-                }
-            });
-
-        Ok(iter)
+    pub fn decode(&self, primary_key: &[u8]) -> Result<CompositeValues> {
+        self.decoder.decode(primary_key)
     }
 }
 
@@ -116,10 +124,12 @@ impl IndexValuesCodec {
 mod tests {
     use datatypes::data_type::ConcreteDataType;
     use datatypes::schema::ColumnSchema;
+    use datatypes::value::Value;
+    use store_api::metadata::ColumnMetadata;
 
     use super::*;
     use crate::error::Error;
-    use crate::row_converter::{PrimaryKeyCodecExt, SortField};
+    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
 
     #[test]
     fn test_encode_value_basic() {
@@ -167,27 +177,18 @@ mod tests {
         ];
 
         let primary_key = DensePrimaryKeyCodec::with_fields(vec![
-            SortField::new(ConcreteDataType::string_datatype()),
-            SortField::new(ConcreteDataType::int64_datatype()),
+            (0, SortField::new(ConcreteDataType::string_datatype())),
+            (1, SortField::new(ConcreteDataType::int64_datatype())),
         ])
         .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
         .unwrap();
 
-        let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter());
-        let mut iter = codec.decode(&primary_key).unwrap();
-
-        let ((column_id, col_id_str), field, value) = iter.next().unwrap();
-        assert_eq!(*column_id, 1);
-        assert_eq!(col_id_str, "1");
-        assert_eq!(field, &SortField::new(ConcreteDataType::string_datatype()));
-        assert_eq!(value, None);
-
-        let ((column_id, col_id_str), field, value) = iter.next().unwrap();
-        assert_eq!(*column_id, 2);
-        assert_eq!(col_id_str, "2");
-        assert_eq!(field, &SortField::new(ConcreteDataType::int64_datatype()));
-        assert_eq!(value, Some(Value::Int64(10)));
+        let codec =
+            IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());
+        let values = codec.decode(&primary_key).unwrap().into_dense();
 
-        assert!(iter.next().is_none());
+        assert_eq!(values.len(), 2);
+        assert_eq!(values[0], Value::Null);
+        assert_eq!(values[1], Value::Int64(10));
     }
 }
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index 669d4ff6f23e..7903f2a496d9 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -34,7 +34,7 @@ use crate::error::{
     PushIndexValueSnafu, Result,
 };
 use crate::read::Batch;
-use crate::row_converter::SortField;
+use crate::row_converter::{CompositeValues, SortField};
 use crate::sst::file::FileId;
 use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
 use crate::sst::index::intermediate::{
@@ -101,7 +101,10 @@ impl InvertedIndexer {
         );
         let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
 
-        let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
+        let codec = IndexValuesCodec::from_tag_columns(
+            metadata.primary_key_encoding,
+            metadata.primary_key_columns(),
+        );
         Self {
             codec,
             index_creator,
@@ -180,11 +183,25 @@ impl InvertedIndexer {
         let n = batch.num_rows();
         guard.inc_row_count(n);
 
-        for ((col_id, col_id_str), field, value) in self.codec.decode(batch.primary_key())? {
+        // TODO(weny, zhenchi): lazy decode
+        let values = self.codec.decode(batch.primary_key())?;
+        for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
             if !self.indexed_column_ids.contains(col_id) {
                 continue;
             }
 
+            let value = match &values {
+                CompositeValues::Dense(vec) => {
+                    let value = &vec[idx].1;
+                    if value.is_null() {
+                        None
+                    } else {
+                        Some(value)
+                    }
+                }
+                CompositeValues::Sparse(sparse_values) => sparse_values.get(col_id),
+            };
+
             if let Some(value) = value.as_ref() {
                 self.value_buf.clear();
                 IndexValueCodec::encode_nonnull_value(
@@ -194,6 +211,9 @@ impl InvertedIndexer {
                 )?;
             }
 
+            // Safety: the column id is guaranteed to be in the map
+            let col_id_str = self.codec.column_ids().get(col_id).unwrap();
+
             // non-null value -> Some(encoded_bytes), null value -> None
             let value = value.is_some().then_some(self.value_buf.as_slice());
             self.index_creator
@@ -381,8 +401,8 @@ mod tests {
         u64_field: impl IntoIterator<Item = u64>,
     ) -> Batch {
         let fields = vec![
-            SortField::new(ConcreteDataType::string_datatype()),
-            SortField::new(ConcreteDataType::int32_datatype()),
+            (0, SortField::new(ConcreteDataType::string_datatype())),
+            (1, SortField::new(ConcreteDataType::int32_datatype())),
         ];
         let codec = DensePrimaryKeyCodec::with_fields(fields);
         let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index 388dc24677b4..e8241a453fb8 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -33,7 +33,7 @@ use crate::read::compat::CompatBatch;
 use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::prune::PruneReader;
 use crate::read::Batch;
-use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
+use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
 use crate::sst::file::FileHandle;
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext};
@@ -156,7 +156,7 @@ impl FileRangeContext {
         reader_builder: RowGroupReaderBuilder,
         filters: Vec<SimpleFilterContext>,
         read_format: ReadFormat,
-        codec: DensePrimaryKeyCodec,
+        codec: Arc<dyn PrimaryKeyCodec>,
     ) -> Self {
         Self {
             reader_builder,
@@ -237,7 +237,7 @@ pub(crate) struct RangeBase {
     /// Helper to read the SST.
     pub(crate) read_format: ReadFormat,
     /// Decoder for primary keys
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
     /// Optional helper to compat batches.
     pub(crate) compat_batch: Option<CompatBatch>,
 }
@@ -264,15 +264,25 @@ impl RangeBase {
                 input.set_pk_values(self.codec.decode(input.primary_key())?);
                 input.pk_values().unwrap()
             };
-            // Safety: this is a primary key
-            let pk_index = self
-                .read_format
-                .metadata()
-                .primary_key_index(filter.column_id())
-                .unwrap();
-            let pk_value = pk_values[pk_index]
-                .try_to_scalar_value(filter.data_type())
-                .context(FieldTypeMismatchSnafu)?;
+            let pk_value = match pk_values {
+                CompositeValues::Dense(v) => {
+                    // Safety: this is a primary key
+                    let pk_index = self
+                        .read_format
+                        .metadata()
+                        .primary_key_index(filter.column_id())
+                        .unwrap();
+                    v[pk_index]
+                        .1
+                        .try_to_scalar_value(filter.data_type())
+                        .context(FieldTypeMismatchSnafu)?
+                }
+                CompositeValues::Sparse(v) => {
+                    let v = v.get_or_null(filter.column_id());
+                    v.try_to_scalar_value(filter.data_type())
+                        .context(FieldTypeMismatchSnafu)?
+                }
+            };
             if filter
                 .filter()
                 .evaluate_scalar(&pk_value)
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 34a1da565e40..c90907f0eb26 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -48,7 +48,7 @@ use crate::error::{
     ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
 };
 use crate::read::{Batch, BatchBuilder, BatchColumn};
-use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
+use crate::row_converter::{build_primary_key_codec_with_fields, SortField};
 use crate::sst::file::{FileMeta, FileTimeRange};
 use crate::sst::to_sst_arrow_schema;
 
@@ -391,6 +391,7 @@ impl ReadFormat {
         column: &ColumnMetadata,
         is_min: bool,
     ) -> Option<ArrayRef> {
+        let primary_key_encoding = self.metadata.primary_key_encoding;
         let is_first_tag = self
             .metadata
             .primary_key
@@ -402,9 +403,15 @@ impl ReadFormat {
             return None;
         }
 
-        let converter = DensePrimaryKeyCodec::with_fields(vec![SortField::new(
-            column.column_schema.data_type.clone(),
-        )]);
+        let converter = build_primary_key_codec_with_fields(
+            primary_key_encoding,
+            [(
+                column.column_id,
+                SortField::new(column.column_schema.data_type.clone()),
+            )]
+            .into_iter(),
+        );
+
         let values = row_groups.iter().map(|meta| {
             let stats = meta
                 .borrow()
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 6854c072a1a3..4aecf744d696 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -49,7 +49,7 @@ use crate::metrics::{
 };
 use crate::read::prune::{PruneReader, Source};
 use crate::read::{Batch, BatchReader};
-use crate::row_converter::DensePrimaryKeyCodec;
+use crate::row_converter::build_primary_key_codec;
 use crate::sst::file::FileHandle;
 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
@@ -253,7 +253,7 @@ impl ParquetReaderBuilder {
             vec![]
         };
 
-        let codec = DensePrimaryKeyCodec::new(read_format.metadata());
+        let codec = build_primary_key_codec(read_format.metadata());
 
         let context = FileRangeContext::new(reader_builder, filters, read_format, codec);
 
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index 369ba95f354d..4cb4469dc08d 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -326,8 +326,8 @@ pub(crate) fn encode_keys(
 /// Encode one key.
 pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec<u8> {
     let row_codec = DensePrimaryKeyCodec::with_fields(vec![
-        SortField::new(ConcreteDataType::string_datatype()),
-        SortField::new(ConcreteDataType::uint32_datatype()),
+        (0, SortField::new(ConcreteDataType::string_datatype())),
+        (1, SortField::new(ConcreteDataType::uint32_datatype())),
     ]);
     row_codec.encode(key_value.primary_keys()).unwrap()
 }
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index ce8cd4412f63..8bef6d205ba3 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -85,7 +85,12 @@ pub fn sst_region_metadata() -> RegionMetadata {
 
 /// Encodes a primary key for specific tags.
 pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
     let fields = (0..tags.len())
-        .map(|_| SortField::new(ConcreteDataType::string_datatype()))
+        .map(|idx| {
+            (
+                idx as u32,
+                SortField::new(ConcreteDataType::string_datatype()),
+            )
+        })
         .collect();
     let converter = DensePrimaryKeyCodec::with_fields(fields);
     converter
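
Editor's note (not part of the patch): a minimal sketch of how the codec abstraction introduced by this diff is meant to be consumed. The helper function below is hypothetical; it only assumes items defined in this diff (build_primary_key_codec, CompositeValues, SparseValues::get_or_null) plus the usual mito2 imports, and a RegionMetadataRef whose primary_key_encoding selects the codec.

fn lookup_tag(metadata: &RegionMetadataRef, pk: &[u8], column_id: ColumnId) -> Result<Value> {
    // Dispatches on metadata.primary_key_encoding to either
    // DensePrimaryKeyCodec or SparsePrimaryKeyCodec.
    let codec = build_primary_key_codec(metadata);
    // decode() now returns CompositeValues instead of the old Vec<Value>.
    match codec.decode(pk)? {
        // Dense values keep the primary key order and now carry column ids.
        CompositeValues::Dense(values) => Ok(values
            .into_iter()
            .find(|(id, _)| *id == column_id)
            .map(|(_, v)| v)
            .unwrap_or(Value::Null)),
        // Sparse values are keyed by column id; absent columns read as Null.
        CompositeValues::Sparse(values) => Ok(values.get_or_null(column_id).clone()),
    }
}

This mirrors the pattern the diff applies in RangeBase, ProjectionMapper, and the index creators: dense decoding stays positional, sparse decoding is a by-column-id lookup, and CompositeValues lets callers handle both without knowing which encoding a region uses.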