Skip to content

Commit

Permalink
add home to edge sync api
Browse files Browse the repository at this point in the history
  • Loading branch information
sehz committed Oct 25, 2024
1 parent f413925 commit 39ee345
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 2 deletions.
1 change: 1 addition & 0 deletions crates/fluvio-spu/src/mirroring/home/api_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ use fluvio_protocol::{Encoder, Decoder};
pub enum MirrorHomeApiEnum {
#[default]
UpdateHomeOffset = 0,
SyncRecords = 1
}
60 changes: 59 additions & 1 deletion crates/fluvio-spu/src/mirroring/home/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;
use std::{fmt, sync::Arc};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicI64, AtomicU64};

use tokio::select;
use tracing::{debug, error, instrument, warn};
Expand All @@ -20,21 +20,26 @@ use crate::core::DefaultSharedGlobalContext;
use crate::mirroring::remote::api_key::MirrorRemoteApiEnum;
use crate::mirroring::remote::remote_api::RemoteMirrorRequest;
use crate::mirroring::remote::sync::DefaultPartitionSyncRequest;
use crate::mirroring::remote::update_offsets::UpdateEdgeOffsetRequest;
use crate::replication::leader::SharedFileLeaderState;
use crate::services::auth::SpuAuthServiceContext;

use super::update_offsets::UpdateHomeOffsetRequest;

const MIRROR_RECONCILIATION_INTERVAL_SEC: u64 = 60; // 1 min

const UNKNOWN_LEO: i64 = -1;

pub(crate) struct MirrorRequestMetrics {
loop_count: AtomicU64,
remote_leo: AtomicI64,
}

impl MirrorRequestMetrics {
pub(crate) fn new() -> Self {
Self {
loop_count: AtomicU64::new(0),
remote_leo: AtomicI64::new(UNKNOWN_LEO),
}
}

Expand All @@ -51,6 +56,7 @@ impl MirrorRequestMetrics {
/// Handle mirror request from remote
pub(crate) struct MirrorHomeHandler {
metrics: Arc<MirrorRequestMetrics>,
/// leader replicat that will be mirrored
leader: SharedFileLeaderState,
ctx: DefaultSharedGlobalContext,
status_update: SharedMirrorStatusUpdate,
Expand Down Expand Up @@ -191,6 +197,9 @@ impl MirrorHomeHandler {
RemoteMirrorRequest::SyncRecords(sync_request)=> {
self.sync_record_from_remote(&mut sink,sync_request.request).await?;
}
RemoteMirrorRequest::UpdateEdgeOffset(req) => {
remote_update_needed = self.update_from_remote(req)?;
}
}

} else {
Expand Down Expand Up @@ -242,4 +251,53 @@ impl MirrorHomeHandler {
debug!(append_flag, "leader appended");
self.send_offsets_to_remote(sink).await
}


/// received new offset from edge, this happens when edge is behind
#[instrument(skip(req))]
fn update_from_remote(&self, req: RequestMessage<UpdateEdgeOffsetRequest>) -> Result<bool> {
let leader_leo = self.leader.leo();
let old_home_leo = self.state.metrics.get_home_leo();
let new_home_leo = req.request.leo;
debug!(
leader_leo,
old_home_leo, new_home_leo, "received update from home"
);
// if old home leo is not initialized, we need to update home
if old_home_leo < 0 {
debug!(new_home_leo, "updating home leo from uninitialized");
self.state.metrics.update_home_leo(new_home_leo);
}
match new_home_leo.cmp(&leader_leo) {
std::cmp::Ordering::Greater => {
// home leo should never be greater than leader's leo
//warn!(
// leader_leo,
// new_home_leo, "home has more records, this should not happen, this is error"
//);
debug!(
new_home_leo,
leader_leo, "home has more records, need to refresh edge"
);
self.state.metrics.update_home_leo(new_home_leo);
//return Err(anyhow!("home's leo: {new_home_leo} > leader's leo: {leader_leo} this should not happen, this is error"));
Ok(true)
}
std::cmp::Ordering::Less => {
debug!(
new_home_leo,
leader_leo, "home has less records, need to refresh home"
);
self.state.metrics.update_home_leo(new_home_leo);
Ok(true)
}
std::cmp::Ordering::Equal => {
debug!(
new_home_leo,
"home has same records, no need to refresh home"
);
Ok(false)
}
}
}
}
8 changes: 8 additions & 0 deletions crates/fluvio-spu/src/mirroring/home/home_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use fluvio_protocol::bytes::Buf;
use fluvio_protocol::{Encoder, Decoder};
use fluvio_protocol::api::{RequestMessage, ApiMessage, RequestHeader};

use crate::mirroring::remote::sync::DefaultPartitionSyncRequest;

use super::api_key::MirrorHomeApiEnum;
use super::update_offsets::UpdateHomeOffsetRequest;

Expand All @@ -15,6 +17,8 @@ use super::update_offsets::UpdateHomeOffsetRequest;
pub enum HomeMirrorRequest {
#[fluvio(tag = 0)]
UpdateHomeOffset(RequestMessage<UpdateHomeOffsetRequest>),
#[fluvio(tag = 1)]
SyncRecords(RequestMessage<DefaultPartitionSyncRequest>),
}

impl Default for HomeMirrorRequest {
Expand All @@ -39,6 +43,10 @@ impl ApiMessage for HomeMirrorRequest {
header,
UpdateHomeOffsetRequest::decode_from(src, version)?,
))),
MirrorHomeApiEnum::SyncRecords => Ok(Self::SyncRecords(RequestMessage::new(
header,
DefaultPartitionSyncRequest::decode_from(src, version)?,
))),
}
}
}
2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/mirroring/home/update_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::replication::leader::ReplicaOffsetRequest;

use super::api_key::MirrorHomeApiEnum;

/// Update home's offset
/// Update home's offset to edge
pub(crate) type UpdateHomeOffsetRequest = ReplicaOffsetRequest;

impl Request for UpdateHomeOffsetRequest {
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-spu/src/mirroring/remote/api_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ use fluvio_protocol::{Encoder, Decoder};
pub enum MirrorRemoteApiEnum {
#[default]
SyncRecords = 0,
UpdateEdgeOffset = 1
}
4 changes: 4 additions & 0 deletions crates/fluvio-spu/src/mirroring/remote/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ where
match home_msg {
HomeMirrorRequest::UpdateHomeOffset(req)=> {
home_updated_needed = self.update_from_home(req)?;
},
HomeMirrorRequest::SyncRecords(sync_request)=> {
debug!("received sync request from home");
self.sync_record_from_home(&mut home_sink, sync_request.request).await?;
}
}
backoff.reset();
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-spu/src/mirroring/remote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub(crate) mod controller;
pub(crate) mod api_key;
pub(crate) mod remote_api;
pub(crate) mod sync;
pub(crate) mod update_offsets;
7 changes: 7 additions & 0 deletions crates/fluvio-spu/src/mirroring/remote/remote_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use fluvio_protocol::api::{RequestMessage, ApiMessage, RequestHeader};

use super::api_key::MirrorRemoteApiEnum;
use super::sync::DefaultPartitionSyncRequest;
use super::update_offsets::UpdateEdgeOffsetRequest;

#[derive(Debug, Encoder)]
pub enum RemoteMirrorRequest {
#[fluvio(tag = 0)]
SyncRecords(RequestMessage<DefaultPartitionSyncRequest>),
#[fluvio(tag = 1)]
UpdateEdgeOffset(RequestMessage<UpdateEdgeOffsetRequest>),
// #[fluvio(tag = 1)]
// RejectedOffsetRequest(RequestMessage<RejectOffsetRequest>),
}
Expand All @@ -36,6 +39,10 @@ impl ApiMessage for RemoteMirrorRequest {
trace!("decoding with header: {:#?}", header);
let version = header.api_version();
match header.api_key().try_into()? {
MirrorRemoteApiEnum::UpdateEdgeOffset => Ok(Self::UpdateEdgeOffset(RequestMessage::new(
header,
UpdateEdgeOffsetRequest::decode_from(src, version)?,
))),
MirrorRemoteApiEnum::SyncRecords => Ok(Self::SyncRecords(RequestMessage::new(
header,
DefaultPartitionSyncRequest::decode_from(src, version)?,
Expand Down
27 changes: 27 additions & 0 deletions crates/fluvio-spu/src/mirroring/remote/update_offsets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use fluvio_protocol::{Encoder, Decoder};
use fluvio_protocol::api::Request;

use crate::mirroring::COMMON_MIRROR_VERSION;
use crate::replication::leader::ReplicaOffsetRequest;

use super::api_key::MirrorRemoteApiEnum;

/// Edge offset
#[derive(Decoder, Encoder, Default, Clone, Debug)]
pub(crate) struct UpdateEdgeOffsetRequest(ReplicaOffsetRequest);

impl From<ReplicaOffsetRequest> for UpdateEdgeOffsetRequest {
fn from(offset: ReplicaOffsetRequest) -> Self {
Self(offset)
}
}

impl Request for UpdateEdgeOffsetRequest {
const API_KEY: u16 = MirrorRemoteApiEnum::UpdateEdgeOffset as u16;
const DEFAULT_API_VERSION: i16 = COMMON_MIRROR_VERSION;
type Response = UpdateEdgeOffsetResponse;
}

// no content, this is one way request
#[derive(Decoder, Encoder, Default, Debug)]
pub struct UpdateEdgeOffsetResponse {}

0 comments on commit 39ee345

Please sign in to comment.