diff --git a/dc/s2n-quic-dc/src/path/secret/map.rs b/dc/s2n-quic-dc/src/path/secret/map.rs index 4bea24e8b..792bbacad 100644 --- a/dc/s2n-quic-dc/src/path/secret/map.rs +++ b/dc/s2n-quic-dc/src/path/secret/map.rs @@ -195,7 +195,7 @@ impl Cleaner { // handshake to happen. This handshake will happen on the next request for this // particular peer. if entry.rehandshake_time() <= now { - state.requested_handshakes.pin().insert(entry.peer); + state.request_handshake(entry.peer); } // Not retired. @@ -208,24 +208,47 @@ impl Cleaner { } } - if state.ids.len() <= (state.max_capacity * 95 / 100) { - return; - } - - let mut to_remove = std::cmp::max(state.ids.len() / 100, 1); - let guard = state.ids.guard(); - for (id, entry) in state.ids.iter(&guard) { - if to_remove > 0 { - // Only remove with the minimum epoch. This hopefully means that we will remove - // fairly stale entries. - if entry.used_at.load(Ordering::Relaxed) == minimum { - state.ids.remove(id, &guard); - to_remove -= 1; + if state.ids.len() > (state.max_capacity * 95 / 100) { + let mut to_remove = std::cmp::max(state.ids.len() / 100, 1); + let guard = state.ids.guard(); + for (id, entry) in state.ids.iter(&guard) { + if to_remove > 0 { + // Only remove with the minimum epoch. This hopefully means that we will remove + // fairly stale entries. + if entry.used_at.load(Ordering::Relaxed) == minimum { + state.ids.remove(id, &guard); + to_remove -= 1; + } + } else { + break; } - } else { - break; } } + + // Prune the peer list of any entries that no longer have a corresponding `id` entry. + // + // This ensures that the peer list is naturally bounded in size by the size of the `id` + // set, and relies on precisely the same mechanisms for eviction. + { + let ids = state.ids.pin(); + state + .peers + .pin() + .retain(|_, entry| ids.contains_key(entry.secret.id())); + } + + // Iteration order should be effectively random, so this effectively just prunes the list + // periodically. 
5000 is chosen arbitrarily to make sure this isn't a memory leak. Note + // that peers the application is actively interested in will typically bypass this list, so + // this is mostly a risk of delaying regular re-handshaking with very large cardinalities. + // + // FIXME: Long or mid-term it likely makes sense to replace this data structure with a + // fuzzy set of some kind and/or just moving to immediate background handshake attempts. + let mut count = 0; + state.requested_handshakes.pin().retain(|_| { + count += 1; + count < 5000 + }); } fn epoch(&self) -> u64 { @@ -235,6 +258,16 @@ impl Cleaner { const EVICTION_CYCLES: u64 = if cfg!(test) { 0 } else { 10 }; +impl State { + fn request_handshake(&self, peer: SocketAddr) { + // Soft cap: drop new requests above 6000 entries; cleanup trims the set to under 5000. + let handshakes = self.requested_handshakes.pin(); + if handshakes.len() <= 6000 { + handshakes.insert(peer); + } + } +} + impl Map { pub fn new(signer: stateless_reset::Signer) -> Self { // FIXME: Avoid unwrap and the whole socket. @@ -385,7 +418,7 @@ impl Map { // FIXME: More actively schedule a new handshake. // See comment on requested_handshakes for details. - self.state.requested_handshakes.pin().insert(state.peer); + self.state.request_handshake(state.peer); } pub fn handle_control_packet(&self, packet: &control::Packet) { @@ -433,7 +466,7 @@ impl Map { // // Handshaking will be rate limited per destination peer (and at least // de-duplicated). 
- self.state.requested_handshakes.pin().insert(state.peer); + self.state.request_handshake(state.peer); } control::Packet::UnknownPathSecret(_) => unreachable!(), } diff --git a/dc/s2n-quic-dc/src/path/secret/map/test.rs b/dc/s2n-quic-dc/src/path/secret/map/test.rs index 1f5e47fa5..5fda426f8 100644 --- a/dc/s2n-quic-dc/src/path/secret/map/test.rs +++ b/dc/s2n-quic-dc/src/path/secret/map/test.rs @@ -154,7 +154,12 @@ impl Model { let ids = state.state.ids.guard(); self.invariants.retain(|invariant| { if let Invariant::ContainsId(id) = invariant { - if state.state.ids.get(id, &ids).unwrap().retired.retired() { + if state + .state + .ids + .get(id, &ids) + .map_or(true, |v| v.retired.retired()) + { invalidated.push(*id); return false; } @@ -191,12 +196,18 @@ impl Model { let peers = state.peers.guard(); let ids = state.ids.guard(); for invariant in self.invariants.iter() { + // We avoid assertions for contains() if we're running the small capacity test, since + // they are likely broken -- we semi-randomly evict peers in that case. match invariant { Invariant::ContainsIp(ip) => { - assert!(state.peers.contains_key(ip, &peers), "{:?}", ip); + if state.max_capacity != 5 { + assert!(state.peers.contains_key(ip, &peers), "{:?}", ip); + } } Invariant::ContainsId(id) => { - assert!(state.ids.contains_key(id, &ids), "{:?}", id); + if state.max_capacity != 5 { + assert!(state.ids.contains_key(id, &ids), "{:?}", id); + } } Invariant::IdRemoved(id) => { assert!( @@ -207,6 +218,16 @@ impl Model { } } } + + // All entries in the peer set should also be in the `ids` set (which is actively garbage + // collected). 
+ for (_, entry) in state.peers.iter(&peers) { + assert!( + state.ids.contains_key(entry.secret.id(), &ids), + "{:?} not present in IDs", + entry.secret.id() + ); + } } } @@ -236,7 +257,36 @@ fn has_duplicate_pids(ops: &[Operation]) -> bool { fn check_invariants() { bolero::check!() .with_type::>() - .with_iterations(100_000) + .with_iterations(10_000) + .for_each(|input: &Vec| { + if has_duplicate_pids(input) { + // Ignore this attempt. + return; + } + + let mut model = Model::default(); + let signer = stateless_reset::Signer::new(b"secret"); + let mut map = Map::new(signer); + + // Avoid background work interfering with testing. + map.state.cleaner.stop(); + + Arc::get_mut(&mut map.state).unwrap().max_capacity = 5; + + model.check_invariants(&map.state); + + for op in input { + model.perform(*op, &map); + model.check_invariants(&map.state); + } + }) +} + +#[test] +fn check_invariants_no_overflow() { + bolero::check!() + .with_type::>() + .with_iterations(10_000) .for_each(|input: &Vec| { if has_duplicate_pids(input) { // Ignore this attempt.