diff --git a/crates/fluvio-controlplane-metadata/src/lib.rs b/crates/fluvio-controlplane-metadata/src/lib.rs index b6fd323cfa8..bab9c715947 100644 --- a/crates/fluvio-controlplane-metadata/src/lib.rs +++ b/crates/fluvio-controlplane-metadata/src/lib.rs @@ -14,6 +14,12 @@ pub mod store { pub use fluvio_stream_model::store::*; } + +#[cfg(feature = "use_serde")] +pub(crate) fn is_false(b: &bool) -> bool { !b } + + + #[cfg(feature = "k8")] pub use fluvio_stream_model::k8_types; diff --git a/crates/fluvio-controlplane-metadata/src/partition/k8.rs b/crates/fluvio-controlplane-metadata/src/partition/k8.rs index d0f505773a0..10b3f660ecd 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/k8.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/k8.rs @@ -50,7 +50,8 @@ mod test_spec { mirror, PartitionMirrorConfig::Home(HomePartitionConfig { remote_cluster: "boat1".to_string(), - remote_replica: "boats-0".to_string() + remote_replica: "boats-0".to_string(), + ..Default::default() }) ); } diff --git a/crates/fluvio-controlplane-metadata/src/partition/spec.rs b/crates/fluvio-controlplane-metadata/src/partition/spec.rs index 6e6d8686bba..0db985ca96d 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/spec.rs @@ -8,6 +8,7 @@ use fluvio_types::SpuId; use fluvio_protocol::{Encoder, Decoder}; use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig}; +use crate::is_false; /// Spec for Partition /// Each partition has replicas spread among SPU @@ -163,20 +164,6 @@ impl std::fmt::Display for PartitionMirrorConfig { } } -/// Direction for mirror -#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)] -#[cfg_attr( - feature = "use_serde", - derive(serde::Serialize, serde::Deserialize), - serde(rename_all = "camelCase") -)] -pub enum HomeMirrorDirection { - #[default] - #[fluvio(tag = 0)] - EdgeToHome, - #[fluvio(tag = 1)] - HomeToEdge, -} #[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)] #[cfg_attr( @@ -187,7 +174,8 @@ pub enum HomeMirrorDirection { pub struct HomePartitionConfig { pub remote_cluster: String, pub remote_replica: String, - // pub direction: HomeMirrorDirection + // if this is set, home will be mirror instead of + pub source: bool } impl std::fmt::Display for HomePartitionConfig { @@ -208,6 +196,8 @@ pub struct RemotePartitionConfig { #[cfg_attr(feature = "use_serde", serde(default))] pub home_spu_id: SpuId, pub home_spu_endpoint: String, + #[cfg_attr(feature = "use_serde", serde(default, skip_serializing_if = "is_false"))] + pub target: bool } impl std::fmt::Display for RemotePartitionConfig { diff --git a/crates/fluvio-controlplane-metadata/src/topic/k8.rs b/crates/fluvio-controlplane-metadata/src/topic/k8.rs index ea5ad491d55..9dc0854bc6d 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/k8.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/k8.rs @@ -61,11 +61,13 @@ mod test_spec { vec![ HomePartitionConfig { remote_cluster: "boat1".to_string(), - remote_replica: "boats-0".to_string() + remote_replica: "boats-0".to_string(), + ..Default::default() }, HomePartitionConfig { remote_cluster: "boat2".to_string(), - remote_replica: "boats-0".to_string() + remote_replica: "boats-0".to_string(), + ..Default::default() } ] .into() diff --git a/crates/fluvio-controlplane-metadata/src/topic/spec.rs b/crates/fluvio-controlplane-metadata/src/topic/spec.rs index 7462d89d663..e93c28cde87 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/spec.rs @@ -11,9 +11,8 @@ use fluvio_types::SpuId; use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment}; use fluvio_protocol::{Encoder, Decoder}; -use crate::partition::{ - HomeMirrorDirection, HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig, -}; +use crate::partition::{HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig}; +use crate::is_false; use super::deduplication::Deduplication; @@ -659,14 +658,19 @@ impl MirrorConfig { derive(serde::Serialize, serde::Deserialize), serde(rename_all = "camelCase") )] -pub struct HomeMirrorConfig( +pub struct HomeMirrorConfig { #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Vec::is_empty"))] - Vec, -); + pub partitions: Vec, + #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "is_false"))] + pub source: bool, // source of mirror +} impl From> for HomeMirrorConfig { fn from(partitions: Vec) -> Self { - Self(partitions) + Self { + partitions, + source: false, + } } } @@ -674,20 +678,21 @@ impl HomeMirrorConfig { /// generate home config from simple mirror cluster list /// this uses home topic to generate remote replicas pub fn from_simple(topic: &str, remote_clusters: Vec) -> Self { - Self( - remote_clusters + Self { + partitions: remote_clusters .into_iter() .map(|remote_cluster| HomePartitionConfig { remote_cluster, remote_replica: { ReplicaKey::new(topic, 0_u32).to_string() }, - // direction: HomeMirrorDirection::default() + ..Default::default() }) .collect(), - ) + source: false, + } } pub fn partition_count(&self) -> PartitionCount { - self.0.len() as PartitionCount + self.partitions.len() as PartitionCount } pub fn replication_factor(&self) -> Option { @@ -695,12 +700,12 @@ impl HomeMirrorConfig { } pub fn partitions(&self) -> &Vec { - &self.0 + &self.partitions } pub fn as_partition_maps(&self) -> PartitionMaps { let mut maps = vec![]; - for (partition_id, home_partition) in self.0.iter().enumerate() { + for (partition_id, home_partition) in self.partitions.iter().enumerate() { maps.push(PartitionMap { id: partition_id as u32, mirror: Some(PartitionMirrorConfig::Home(home_partition.clone())), @@ -717,7 +722,7 @@ impl HomeMirrorConfig { /// Add partition to home mirror config pub fn add_partition(&mut self, partition: HomePartitionConfig) { - self.0.push(partition); + self.partitions.push(partition); } } @@ -738,8 +743,11 @@ pub struct HomeMirrorPartition { serde(rename_all = "camelCase") )] pub struct RemoteMirrorConfig { + // source of mirror pub home_cluster: String, pub home_spus: Vec, + #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "is_false"))] + pub target: bool, } #[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)] @@ -777,6 +785,7 @@ impl RemoteMirrorConfig { home_spu_id: home_spu.id, home_cluster: self.home_cluster.clone(), home_spu_endpoint: home_spu.endpoint.clone(), + target: self.target, })), ..Default::default() }); @@ -1147,6 +1156,7 @@ mod mirror_test { mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig { remote_replica: "boats-0".to_string(), remote_cluster: "boat1".to_owned(), + ..Default::default() })), ..Default::default() }, @@ -1155,6 +1165,7 @@ mod mirror_test { mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig { remote_replica: "boats-0".to_string(), remote_cluster: "boat2".to_string(), + ..Default::default() })), replicas: vec![], }, diff --git a/crates/fluvio-sc/src/controllers/mirroring/controller.rs b/crates/fluvio-sc/src/controllers/mirroring/controller.rs index 91e64e1d68f..eb9a6d37027 100644 --- a/crates/fluvio-sc/src/controllers/mirroring/controller.rs +++ b/crates/fluvio-sc/src/controllers/mirroring/controller.rs @@ -210,6 +210,13 @@ impl RemoteMirrorController { // Sync the mirror topic async fn sync_topic(&self, home: &Home, topic: &MirroringSpecWrapper) -> Result<()> { + + let remote_spec = match topic.spec.replicas() { + ReplicaSpec::Mirror(MirrorConfig::Remote(r)) => r, + _ => { + return Err(anyhow!("topic {} is not a remote mirror", topic.key)); + } + }; // Create a new replica spec for the topic let new_replica: ReplicaSpec = ReplicaSpec::Mirror(MirrorConfig::Remote(RemoteMirrorConfig { @@ -222,6 +229,7 @@ impl RemoteMirrorController { 1 ], home_cluster: home.id.clone(), + target: remote_spec.target })); // Check if the topic already exists diff --git a/crates/fluvio-sc/src/controllers/topics/policy.rs b/crates/fluvio-sc/src/controllers/topics/policy.rs index f54b1206a34..644e2b4ff68 100644 --- a/crates/fluvio-sc/src/controllers/topics/policy.rs +++ b/crates/fluvio-sc/src/controllers/topics/policy.rs @@ -298,6 +298,7 @@ impl TopicNextState { home_spu_key: spu.key.clone(), home_cluster: src.home_cluster.clone(), home_spu_endpoint: spu.endpoint.clone(), + target: src.target }), ); } diff --git a/crates/fluvio-sc/src/services/public_api/topic/update/add_mirror.rs b/crates/fluvio-sc/src/services/public_api/topic/update/add_mirror.rs index c70be6e5c6a..0155edf06ea 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/update/add_mirror.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/update/add_mirror.rs @@ -104,7 +104,7 @@ pub async fn handle_add_mirror( let new_home_partition_config = HomePartitionConfig { remote_cluster: request.remote_cluster, remote_replica: { ReplicaKey::new(topic.key(), 0_u32).to_string() }, - //direction: home_config.direction.clone(), + source: home_config.source }; new_home_config.add_partition(new_home_partition_config); spec.set_replicas(ReplicaSpec::Mirror(MirrorConfig::Home(new_home_config))); diff --git a/crates/fluvio-spu/src/mirroring/home/connection.rs b/crates/fluvio-spu/src/mirroring/home/connection.rs index 50b81205c46..e4b1090a71b 100644 --- a/crates/fluvio-spu/src/mirroring/home/connection.rs +++ b/crates/fluvio-spu/src/mirroring/home/connection.rs @@ -198,7 +198,8 @@ impl MirrorHomeHandler { self.sync_record_from_remote(&mut sink,sync_request.request).await?; } RemoteMirrorRequest::UpdateEdgeOffset(req) => { - remote_update_needed = self.update_from_remote(req)?; + // remote_update_needed = self.update_from_remote(req)?; + todo!() } } @@ -253,7 +254,8 @@ impl MirrorHomeHandler { } - /// received new offset from edge, this happens when edge is behind + // received new offset from edge, this happens when edge is behind + /* #[instrument(skip(req))] fn update_from_remote(&self, req: RequestMessage) -> Result { let leader_leo = self.leader.leo(); @@ -300,4 +302,5 @@ impl MirrorHomeHandler { } } } + */ } diff --git a/crates/fluvio-spu/src/mirroring/remote/controller.rs b/crates/fluvio-spu/src/mirroring/remote/controller.rs index dc4c78ff4c2..1c0f0aac4c3 100644 --- a/crates/fluvio-spu/src/mirroring/remote/controller.rs +++ b/crates/fluvio-spu/src/mirroring/remote/controller.rs @@ -263,8 +263,9 @@ where home_updated_needed = self.update_from_home(req)?; }, HomeMirrorRequest::SyncRecords(sync_request)=> { - debug!("received sync request from home"); - self.sync_record_from_home(&mut home_sink, sync_request.request).await?; + // debug!("received sync request from home"); + // self.sync_record_from_home(&mut home_sink, sync_request.request).await?; + todo!() } } backoff.reset(); diff --git a/k8-util/helm/fluvio-sys/Chart.yaml b/k8-util/helm/fluvio-sys/Chart.yaml index f2fded0aa8a..063e0310c6d 100644 --- a/k8-util/helm/fluvio-sys/Chart.yaml +++ b/k8-util/helm/fluvio-sys/Chart.yaml @@ -2,4 +2,4 @@ apiVersion: v2 name: fluvio-sys description: A Helm chart for Fluvio type: application -version: 0.9.19 +version: 0.9.20 diff --git a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml index 3b836c97aca..5ce910841ad 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml @@ -42,10 +42,14 @@ spec: type: string remoteCluster: type: string + source: + type: bool remote: type: object required: ["homeCluster","homeSpuKey","homeSpuEndpoint","homeSpu"] properties: + target: + type: bool homeCluster: type: string homeSpuKey: diff --git a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml index c13849c4c74..548c8f63d91 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml @@ -76,10 +76,15 @@ spec: type: string remoteReplica: type: string + source: + type: bool + remote: type: object required: ["homeCluster","homeSpus"] properties: + target: + type: bool homeCluster: type: string homeSpus: