Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sehz committed Oct 25, 2024
1 parent 39ee345 commit f6a0bc0
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 39 deletions.
6 changes: 6 additions & 0 deletions crates/fluvio-controlplane-metadata/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ pub mod store {
pub use fluvio_stream_model::store::*;
}


#[cfg(feature = "use_serde")]
pub(crate) fn is_false(b: &bool) -> bool { !b }



#[cfg(feature = "k8")]
pub use fluvio_stream_model::k8_types;

Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-controlplane-metadata/src/partition/k8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ mod test_spec {
mirror,
PartitionMirrorConfig::Home(HomePartitionConfig {
remote_cluster: "boat1".to_string(),
remote_replica: "boats-0".to_string()
remote_replica: "boats-0".to_string(),
..Default::default()
})
);
}
Expand Down
20 changes: 5 additions & 15 deletions crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use fluvio_types::SpuId;
use fluvio_protocol::{Encoder, Decoder};

use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig};
use crate::is_false;

/// Spec for Partition
/// Each partition has replicas spread among SPU
Expand Down Expand Up @@ -163,20 +164,6 @@ impl std::fmt::Display for PartitionMirrorConfig {
}
}

/// Direction for mirror
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase")
)]
pub enum HomeMirrorDirection {
#[default]
#[fluvio(tag = 0)]
EdgeToHome,
#[fluvio(tag = 1)]
HomeToEdge,
}

#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
Expand All @@ -187,7 +174,8 @@ pub enum HomeMirrorDirection {
pub struct HomePartitionConfig {
pub remote_cluster: String,
pub remote_replica: String,
// pub direction: HomeMirrorDirection
// if this is set, home will be mirror instead of
pub source: bool
}

impl std::fmt::Display for HomePartitionConfig {
Expand All @@ -208,6 +196,8 @@ pub struct RemotePartitionConfig {
#[cfg_attr(feature = "use_serde", serde(default))]
pub home_spu_id: SpuId,
pub home_spu_endpoint: String,
#[cfg_attr(feature = "use_serde", serde(default, skip_serializing_if = "is_false"))]
pub target: bool
}

impl std::fmt::Display for RemotePartitionConfig {
Expand Down
6 changes: 4 additions & 2 deletions crates/fluvio-controlplane-metadata/src/topic/k8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ mod test_spec {
vec![
HomePartitionConfig {
remote_cluster: "boat1".to_string(),
remote_replica: "boats-0".to_string()
remote_replica: "boats-0".to_string(),
..Default::default()
},
HomePartitionConfig {
remote_cluster: "boat2".to_string(),
remote_replica: "boats-0".to_string()
remote_replica: "boats-0".to_string(),
..Default::default()
}
]
.into()
Expand Down
41 changes: 26 additions & 15 deletions crates/fluvio-controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use fluvio_types::SpuId;
use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment};
use fluvio_protocol::{Encoder, Decoder};

use crate::partition::{
HomeMirrorDirection, HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig,
};
use crate::partition::{HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig};
use crate::is_false;

use super::deduplication::Deduplication;

Expand Down Expand Up @@ -659,48 +658,54 @@ impl MirrorConfig {
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase")
)]
pub struct HomeMirrorConfig(
pub struct HomeMirrorConfig {
#[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Vec::is_empty"))]
Vec<HomePartitionConfig>,
);
pub partitions: Vec<HomePartitionConfig>,
#[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "is_false"))]
pub source: bool, // source of mirror
}

impl From<Vec<HomePartitionConfig>> for HomeMirrorConfig {
fn from(partitions: Vec<HomePartitionConfig>) -> Self {
Self(partitions)
Self {
partitions,
source: false,
}
}
}

impl HomeMirrorConfig {
/// generate home config from simple mirror cluster list
/// this uses home topic to generate remote replicas
pub fn from_simple(topic: &str, remote_clusters: Vec<String>) -> Self {
Self(
remote_clusters
Self {
partitions: remote_clusters
.into_iter()
.map(|remote_cluster| HomePartitionConfig {
remote_cluster,
remote_replica: { ReplicaKey::new(topic, 0_u32).to_string() },
// direction: HomeMirrorDirection::default()
..Default::default()
})
.collect(),
)
source: false,
}
}

pub fn partition_count(&self) -> PartitionCount {
self.0.len() as PartitionCount
self.partitions.len() as PartitionCount
}

pub fn replication_factor(&self) -> Option<ReplicationFactor> {
None
}

pub fn partitions(&self) -> &Vec<HomePartitionConfig> {
&self.0
&self.partitions
}

pub fn as_partition_maps(&self) -> PartitionMaps {
let mut maps = vec![];
for (partition_id, home_partition) in self.0.iter().enumerate() {
for (partition_id, home_partition) in self.partitions.iter().enumerate() {
maps.push(PartitionMap {
id: partition_id as u32,
mirror: Some(PartitionMirrorConfig::Home(home_partition.clone())),
Expand All @@ -717,7 +722,7 @@ impl HomeMirrorConfig {

/// Add partition to home mirror config
pub fn add_partition(&mut self, partition: HomePartitionConfig) {
self.0.push(partition);
self.partitions.push(partition);
}
}

Expand All @@ -738,8 +743,11 @@ pub struct HomeMirrorPartition {
serde(rename_all = "camelCase")
)]
pub struct RemoteMirrorConfig {
// source of mirror
pub home_cluster: String,
pub home_spus: Vec<SpuMirrorConfig>,
#[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "is_false"))]
pub target: bool,
}

#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -777,6 +785,7 @@ impl RemoteMirrorConfig {
home_spu_id: home_spu.id,
home_cluster: self.home_cluster.clone(),
home_spu_endpoint: home_spu.endpoint.clone(),
target: self.target,
})),
..Default::default()
});
Expand Down Expand Up @@ -1147,6 +1156,7 @@ mod mirror_test {
mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
remote_replica: "boats-0".to_string(),
remote_cluster: "boat1".to_owned(),
..Default::default()
})),
..Default::default()
},
Expand All @@ -1155,6 +1165,7 @@ mod mirror_test {
mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
remote_replica: "boats-0".to_string(),
remote_cluster: "boat2".to_string(),
..Default::default()
})),
replicas: vec![],
},
Expand Down
8 changes: 8 additions & 0 deletions crates/fluvio-sc/src/controllers/mirroring/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ impl<C: MetadataItem> RemoteMirrorController<C> {

// Sync the mirror topic
async fn sync_topic(&self, home: &Home, topic: &MirroringSpecWrapper<TopicSpec>) -> Result<()> {

let remote_spec = match topic.spec.replicas() {
ReplicaSpec::Mirror(MirrorConfig::Remote(r)) => r,
_ => {
return Err(anyhow!("topic {} is not a remote mirror", topic.key));
}
};
// Create a new replica spec for the topic
let new_replica: ReplicaSpec =
ReplicaSpec::Mirror(MirrorConfig::Remote(RemoteMirrorConfig {
Expand All @@ -222,6 +229,7 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
1
],
home_cluster: home.id.clone(),
target: remote_spec.target
}));

// Check if the topic already exists
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ impl<C: MetadataItem> TopicNextState<C> {
home_spu_key: spu.key.clone(),
home_cluster: src.home_cluster.clone(),
home_spu_endpoint: spu.endpoint.clone(),
target: src.target
}),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub async fn handle_add_mirror<AC: AuthContext, C: MetadataItem>(
let new_home_partition_config = HomePartitionConfig {
remote_cluster: request.remote_cluster,
remote_replica: { ReplicaKey::new(topic.key(), 0_u32).to_string() },
//direction: home_config.direction.clone(),
source: home_config.source
};
new_home_config.add_partition(new_home_partition_config);
spec.set_replicas(ReplicaSpec::Mirror(MirrorConfig::Home(new_home_config)));
Expand Down
7 changes: 5 additions & 2 deletions crates/fluvio-spu/src/mirroring/home/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ impl MirrorHomeHandler {
self.sync_record_from_remote(&mut sink,sync_request.request).await?;
}
RemoteMirrorRequest::UpdateEdgeOffset(req) => {
remote_update_needed = self.update_from_remote(req)?;
// remote_update_needed = self.update_from_remote(req)?;
todo!()
}
}

Expand Down Expand Up @@ -253,7 +254,8 @@ impl MirrorHomeHandler {
}


/// received new offset from edge, this happens when edge is behind
// received new offset from edge, this happens when edge is behind
/*
#[instrument(skip(req))]
fn update_from_remote(&self, req: RequestMessage<UpdateEdgeOffsetRequest>) -> Result<bool> {
let leader_leo = self.leader.leo();
Expand Down Expand Up @@ -300,4 +302,5 @@ impl MirrorHomeHandler {
}
}
}
*/
}
5 changes: 3 additions & 2 deletions crates/fluvio-spu/src/mirroring/remote/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ where
home_updated_needed = self.update_from_home(req)?;
},
HomeMirrorRequest::SyncRecords(sync_request)=> {
debug!("received sync request from home");
self.sync_record_from_home(&mut home_sink, sync_request.request).await?;
// debug!("received sync request from home");
// self.sync_record_from_home(&mut home_sink, sync_request.request).await?;
todo!()
}
}
backoff.reset();
Expand Down
2 changes: 1 addition & 1 deletion k8-util/helm/fluvio-sys/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ apiVersion: v2
name: fluvio-sys
description: A Helm chart for Fluvio
type: application
version: 0.9.19
version: 0.9.20
4 changes: 4 additions & 0 deletions k8-util/helm/fluvio-sys/templates/crd_partition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ spec:
type: string
remoteCluster:
type: string
source:
type: bool
remote:
type: object
required: ["homeCluster","homeSpuKey","homeSpuEndpoint","homeSpu"]
properties:
target:
type: bool
homeCluster:
type: string
homeSpuKey:
Expand Down
5 changes: 5 additions & 0 deletions k8-util/helm/fluvio-sys/templates/crd_topic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,15 @@ spec:
type: string
remoteReplica:
type: string
source:
type: bool

remote:
type: object
required: ["homeCluster","homeSpus"]
properties:
target:
type: bool
homeCluster:
type: string
homeSpus:
Expand Down

0 comments on commit f6a0bc0

Please sign in to comment.