-
Notifications
You must be signed in to change notification settings - Fork 333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(remote-wal): introduce TopicRegionManager #5407
base: main
Are you sure you want to change the base?
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
fad133c
to
d69c025
Compare
src/common/meta/src/key.rs
Outdated
//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}` | ||
//! - The key is used to map a pair of topic name and region id. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}` | |
//! - The key is used to map a pair of topic name and region id. | |
//! 13. Topic name to region key `__topic_region/{topic_name}/{region_id}` | |
//! - Mapping {topic_name} to {region_id} |
src/common/meta/src/key.rs
Outdated
@@ -105,6 +108,7 @@ pub mod table_route; | |||
pub mod test_utils; | |||
mod tombstone; | |||
pub mod topic_name; | |||
pub mod topic_region_map; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub mod topic_region_map; | |
pub mod topic_region; |
pub struct TopicRegionMapKey<'a> { | ||
pub region_id: RegionId, | ||
pub topic: &'a str, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub struct TopicRegionMapKey<'a> { | |
pub region_id: RegionId, | |
pub topic: &'a str, | |
} | |
pub struct TopicRegionKey<'a> { | |
pub region_id: RegionId, | |
pub topic: &'a str, | |
} |
} | ||
} | ||
|
||
impl TopicRegionMapManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl TopicRegionMapManager { | |
impl TopicRegionManager { |
impl Default for TopicRegionMapManager { | ||
fn default() -> Self { | ||
Self::new(Arc::new(MemoryKvBackend::default())) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl Default for TopicRegionMapManager { | |
fn default() -> Self { | |
Self::new(Arc::new(MemoryKvBackend::default())) | |
} | |
} | |
#[cfg(test)] | |
impl Default for TopicRegionMapManager { | |
fn default() -> Self { | |
Self::new(Arc::new(MemoryKvBackend::default())) | |
} | |
} |
pub async fn _put_topic_region_map(&self, topic: &str, region_id: RegionId) -> Result<()> { | ||
let key = TopicRegionMapKey::new(region_id, topic); | ||
self._topic_region_map_manager.put(key).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Stores multiple topic-region mappings in the key-value backend. | ||
pub async fn _batch_put_topic_region_map( | ||
&self, | ||
topic: &str, | ||
region_ids: &[RegionId], | ||
) -> Result<()> { | ||
let keys = region_ids | ||
.iter() | ||
.map(|region_id| TopicRegionMapKey::new(*region_id, topic)) | ||
.collect(); | ||
self._topic_region_map_manager.batch_put(keys).await?; | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Introduce these methods in future PRs
impl<'a> TopicRegionMapKey<'a> { | ||
pub fn new(region_id: RegionId, topic: &'a str) -> Self { | ||
Self { region_id, topic } | ||
} | ||
|
||
pub fn range_start_key() -> String { | ||
TOPIC_REGION_MAP_PREFIX.to_string() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl<'a> TopicRegionMapKey<'a> { | |
pub fn new(region_id: RegionId, topic: &'a str) -> Self { | |
Self { region_id, topic } | |
} | |
pub fn range_start_key() -> String { | |
TOPIC_REGION_MAP_PREFIX.to_string() | |
} | |
} | |
impl<'a> TopicRegionKey<'a> { | |
pub fn new(region_id: RegionId, topic: &'a str) -> Self { | |
Self { region_id, topic } | |
} | |
pub fn range_start_key() -> String { | |
TOPIC_REGION_MAP_PREFIX.to_string() | |
} | |
} |
src/common/meta/src/key.rs
Outdated
@@ -236,6 +241,13 @@ lazy_static! { | |||
Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap(); | |||
} | |||
|
|||
lazy_static! { | |||
pub static ref TOPIC_REGION_MAP_PATTERN: Regex = Regex::new(&format!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub static ref TOPIC_REGION_MAP_PATTERN: Regex = Regex::new(&format!( | |
pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!( |
let topic = captures | ||
.get(1) | ||
.map(|m| m.as_str()) | ||
.context(InvalidMetadataSnafu { | ||
err_msg: format!("Invalid topic in region topic map key: {}", value), | ||
})?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine to retrieve the value directly since it has already passed the regex check.
let topic = captures | |
.get(1) | |
.map(|m| m.as_str()) | |
.context(InvalidMetadataSnafu { | |
err_msg: format!("Invalid topic in region topic map key: {}", value), | |
})?; | |
let topic = captures[1].as_str(); |
/// Returns all topic-region mappings to avoid parsing again. | ||
pub async fn range(&self) -> Result<Vec<(RegionId, String)>> { | ||
let prefix = TopicRegionMapKey::range_start_key(); | ||
let range_req = RangeRequest::new().with_prefix(prefix.as_bytes()); | ||
let resp = self.kv_backend.range(range_req).await?; | ||
resp.kvs | ||
.iter() | ||
.map(|kv| { | ||
let key = topic_region_map_decoder(kv)?; | ||
Ok((key.region_id, key.topic.to_string())) | ||
}) | ||
.collect::<Result<Vec<_>>>() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's implement it when we need it
src/common/meta/src/key.rs
Outdated
@@ -236,6 +241,13 @@ lazy_static! { | |||
Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap(); | |||
} | |||
|
|||
lazy_static! { | |||
pub static ref TOPIC_REGION_MAP_PATTERN: Regex = Regex::new(&format!( | |||
"^{TOPIC_REGION_MAP_PREFIX}/({NAME_PATTERN})/([0-9]+)$" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validate user configured topic_prefix
when the node starts
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
Waiting for #5351...
What's changed and what's your intention?
As title. Maps topic to region in kvbackend.
PR Checklist
Please convert it to a draft if some of the following conditions are not met.