Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDP/QUIC/Http3 quiche::h3 Client/Connector integration #524

Open
wants to merge 53 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
bc26ff1
add UdpSocket, Quic listener and basic Quic connection
hargut Jan 2, 2025
f3a40f0
add HTTP3 server session, Quic TLS handshake preparations
hargut Jan 2, 2025
1ed9ff7
import helpers from cloudflare/quiche@0570ab83
hargut Jan 3, 2025
7e5d62b
add initial Quic TLS handshake and connection state transition logic
hargut Jan 3, 2025
7912480
import helpers from cloudflare/quiche@0570ab83
hargut Jan 4, 2025
af61e2f
add Quic ConnectionTx task, HTTP3 handshake and HTTP3 session
hargut Jan 4, 2025
dfcbebd
successful curl HTTP3 requests/responses
hargut Jan 5, 2025
5cc981a
detect socket settings during Quic listener creation
hargut Jan 5, 2025
dcbaebe
H3 session housekeeping, stream capacity enhancements
hargut Jan 6, 2025
63d1ff9
H3 connection shutdown, goaway &
hargut Jan 6, 2025
20bacd4
IO enhancements & handshake fixes
hargut Jan 8, 2025
de6dcc0
enhance TLS handshake robustness & timeouts
hargut Jan 8, 2025
26cb062
fix send issue/packet creation
hargut Jan 9, 2025
6f7a0ce
use VecDeque for connection & session drop
hargut Jan 13, 2025
c3d44d4
send on end and capacity required
hargut Jan 14, 2025
8f5dd4e
remove mutex on listener connections
hargut Jan 14, 2025
1d43682
move tls_handshake to protocols::tls::quic
hargut Jan 14, 2025
a729986
msrv 1.72 changes
hargut Jan 14, 2025
3f062bc
cargo fmt & clippy
hargut Jan 14, 2025
9976d7a
revert changes that are no longer required
hargut Jan 14, 2025
cb93a55
add Quic/Http3 test using quiche/h3i
hargut Jan 15, 2025
e95cdc7
add timeout test, fix timeout handling
hargut Jan 15, 2025
5f28c37
refactor Quic & Http3 config handling, enable user provided configs
hargut Jan 15, 2025
28773f5
allow building using rustls & quic-boringssl features
hargut Jan 15, 2025
ee077b8
bump MSRV in workflow to 1.74
hargut Jan 16, 2025
3d368f6
fix tx loop continue write reset
hargut Jan 16, 2025
0429df8
impl Ssl for Quic connection, extend ALPN
hargut Jan 17, 2025
4ddd927
connector UDP socket creation, layout connector implementation
hargut Jan 20, 2025
fd29b3f
initial Connector/Listener successful handshake
hargut Jan 20, 2025
5584baf
initial Connector HTTP3 integration, successful request
hargut Jan 22, 2025
49aec00
setup close watch channel for idle connections
hargut Jan 23, 2025
8cb7ec4
enhance UDP/Http3 selection
hargut Jan 23, 2025
8b7d913
add connector tests, release streams on H3 session drop
hargut Jan 24, 2025
6c492e6
consolidate server/client functionality
hargut Jan 24, 2025
a39ef3f
unify ConnectionIo for server & client
hargut Jan 24, 2025
26cadf4
add connection timeout for Http3Poll task
hargut Jan 24, 2025
6230071
unify timeout & error handling on H3 connection
hargut Jan 25, 2025
92ded9b
doc, license headers, fmt & clippy
hargut Jan 25, 2025
531abdb
add read timeout to response headers & trailers
hargut Jan 25, 2025
7b2f62c
fix current_streams AtomicUnit overflow
hargut Jan 30, 2025
c487dca
fix race data/timeout issue during high load
hargut Jan 30, 2025
b33c182
add testcase and histograms
hargut Jan 30, 2025
f0a3125
fix handshake establishing locking
hargut Jan 30, 2025
ca15910
logging, cleanups, comments
hargut Jan 30, 2025
48cd7bc
correctly close server connections, avoid tx_notify misses
hargut Jan 31, 2025
0d601d6
notify send on timeout, stop ConnectionTx task on connection is_draining
hargut Jan 31, 2025
1dd78d1
run session housekeeping before polling h3 connection
hargut Jan 31, 2025
c18fe34
add a housekeeping tick, to avoid stuck sessions
hargut Feb 3, 2025
57a3a60
move TransportConnectors TLS context into Options to avoid duplicated…
hargut Feb 3, 2025
7de17ea
Merge remote-tracking branch 'origin/main' into feat/udp-quic-http3-q…
hargut Feb 3, 2025
5785b86
merge fixes & update license headers for 2025
hargut Feb 3, 2025
5501789
separate test listener ports
hargut Feb 4, 2025
537792b
register rx & tx notify directly after signals
hargut Feb 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
# nightly, MSRV, and latest stable
toolchain: [nightly, 1.72, 1.82.0]
toolchain: [nightly, 1.74, 1.82.0]
runs-on: ubuntu-latest
# Only run on "pull_request" event for external PRs. This is to avoid
# duplicate builds for PRs created from internal branches.
Expand Down
10 changes: 8 additions & 2 deletions pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ zstd = "0"
httpdate = "1"
x509-parser = { version = "0.16.0", optional = true }
ouroboros = { version = "0.18.4", optional = true }
quiche = { git = 'https://github.com/cloudflare/quiche.git', rev = "5d2031ca", default-features = false, optional = true }
ring = { version = "0.17.8", optional = true }

[target.'cfg(unix)'.dependencies]
daemonize = "0.5.0"
Expand All @@ -83,17 +85,21 @@ reqwest = { version = "0.11", features = [
], default-features = false }
hyper = "0.14"
rstest = "0.23.0"
h3i = { git = 'https://github.com/cloudflare/quiche.git', rev = "5d2031ca" }

[target.'cfg(unix)'.dev-dependencies]
hyperlocal = "0.8"
jemallocator = "0.5"
histogram = "0.11"
textplots = "0.8"

[features]
default = []
default = ["boringssl"]
openssl = ["pingora-openssl", "openssl_derived"]
boringssl = ["pingora-boringssl", "openssl_derived"]
boringssl = ["pingora-boringssl", "openssl_derived", "quic-boringssl"]
rustls = ["pingora-rustls", "any_tls", "dep:x509-parser", "ouroboros"]
patched_http1 = ["pingora-http/patched_http1"]
openssl_derived = ["any_tls"]
any_tls = []
sentry = ["dep:sentry"]
quic-boringssl = ["dep:quiche", "dep:ring", "pingora-boringssl", "quiche/boringssl-boring-crate"]
109 changes: 98 additions & 11 deletions pingora-core/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use log::{debug, error};
use std::future::poll_fn;
use std::sync::Arc;

use crate::protocols::http::v2::server;
use crate::protocols::http::ServerSession;
use crate::protocols::http::v2::server as h2_server;
use crate::protocols::http::v3::server as h3_server;
use crate::protocols::http::{HttpVersion, ServerSession};
use crate::protocols::Digest;
use crate::protocols::Stream;
use crate::protocols::ALPN;
Expand Down Expand Up @@ -61,8 +62,8 @@ pub trait ServerApp {
#[derive(Default)]
/// HTTP Server options that control how the server handles some transport types.
pub struct HttpServerOptions {
/// Use HTTP/2 for plaintext.
pub h2c: bool,
/// HTTP version to use.
pub http_version: HttpVersion,
}

/// This trait defines the interface of an HTTP application.
Expand All @@ -84,7 +85,15 @@ pub trait HttpServerApp {
/// every time a new HTTP/2 **connection** needs to be established.
///
/// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
fn h2_options(&self) -> Option<server::H2Options> {
fn h2_options(&self) -> Option<h2_server::H2Options> {
None
}

/// Provide options on how HTTP/3 connection should be established. This function will be called
/// every time a new HTTP/3 **connection** needs to be established.
///
/// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
fn h3_options(&self) -> Option<&h3_server::Http3Options> {
None
}

Expand All @@ -109,10 +118,17 @@ where
mut stream: Stream,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
let mut h2c = self.server_options().as_ref().map_or(false, |o| o.h2c);
let mut http_version = self
.server_options()
.as_ref()
.map_or(HttpVersion::V1, |o| o.http_version);

if stream.quic_connection_state().is_some() {
http_version = HttpVersion::V3;
}

// try to read h2 preface
if h2c {
if matches!(http_version, HttpVersion::V2) {
let mut buf = [0u8; H2_PREFACE.len()];
let peeked = stream
.try_peek(&mut buf)
Expand All @@ -126,10 +142,30 @@ where
// not all streams support peeking
if peeked {
// turn off h2c (use h1) if h2 preface doesn't exist
h2c = buf == H2_PREFACE;
http_version = match buf == H2_PREFACE {
true => HttpVersion::V2,
false => HttpVersion::V1,
};
}
}
if h2c || matches!(stream.selected_alpn_proto(), Some(ALPN::H2)) {

// TODO: logic for Http3 to Http2/1 fallback. Requires Http2/1 listener being present.
if matches!(http_version, HttpVersion::V3)
&& (matches!(stream.selected_alpn_proto(), Some(ALPN::H1))
|| matches!(stream.selected_alpn_proto(), Some(ALPN::H2))
|| matches!(stream.selected_alpn_proto(), Some(ALPN::H2H1)))
{
error!(
"Server is configured for {:?}. Received ALPN: {}. \
Fallback from Http3 to Http2/1 is currently not supported.",
http_version,
stream.selected_alpn_proto().unwrap()
)
}

if matches!(http_version, HttpVersion::V2)
|| matches!(stream.selected_alpn_proto(), Some(ALPN::H2))
{
// create a shared connection digest
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
Expand All @@ -140,7 +176,7 @@ where
});

let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let h2_conn = h2_server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
Err(e) => {
error!("H2 handshake error {e}");
Expand All @@ -160,7 +196,7 @@ where
.await.map_err(|e| error!("H2 error waiting for shutdown {e}"));
return None;
}
h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()) => h2_stream
h2_stream = h2_server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()) => h2_stream
};
let h2_stream = match h2_stream {
Err(e) => {
Expand All @@ -178,6 +214,57 @@ where
.await;
});
}
} else if matches!(http_version, HttpVersion::V3) {
// create a shared connection digest
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
// TODO: log h3 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});

let h3_options = self.h3_options();
let h3_conn = h3_server::handshake(stream, h3_options).await;
let mut h3_conn = match h3_conn {
Err(e) => {
error!("H3 handshake error {e}");
return None;
}
Ok(c) => c,
};

let mut shutdown = shutdown.clone();
loop {
// this loop ends when the client decides to close the h3 conn
let h3_stream = tokio::select! {
_ = shutdown.changed() => {
match h3_conn.graceful_shutdown().await {
Ok(()) => {}
Err(e) => { error!("H3 error waiting for shutdown {e}") }
};
return None;
}
h3_stream = h3_server::Http3Session::from_h3_conn(&mut h3_conn, digest.clone()) => h3_stream
};

let h3_stream = match h3_stream {
Err(e) => {
// It is common for the client to just disconnect TCP without properly
// closing H2. So we don't log the errors here
debug!("H3 error when accepting new stream {e}");
return None;
}
Ok(s) => s?, // None means the connection is ready to be closed
};

let app = self.clone();
let shutdown = shutdown.clone();
pingora_runtime::current_handle().spawn(async move {
app.process_new_http(ServerSession::new_http3(h3_stream), &shutdown)
.await;
});
}
} else {
// No ALPN or ALPN::H1 and h2c was not configured, fallback to HTTP/1.1
self.process_new_http(ServerSession::new_http1(stream), shutdown)
Expand Down
82 changes: 72 additions & 10 deletions pingora-core/src/connectors/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,30 @@

use crate::connectors::ConnectorOptions;
use crate::protocols::http::client::HttpSession;
use crate::protocols::{UniqueID, UniqueIDType};
use crate::upstreams::peer::Peer;
use parking_lot::RwLock;
use pingora_error::Result;
use pingora_pool::PoolNode;
use std::collections::HashMap;
use std::time::Duration;

pub mod v1;
pub mod v2;
pub mod v3;

pub struct Connector {
h1: v1::Connector,
h2: v2::Connector,
h3: v3::Connector,
}

impl Connector {
pub fn new(options: Option<ConnectorOptions>) -> Self {
Connector {
h1: v1::Connector::new(options.clone()),
h2: v2::Connector::new(options),
h2: v2::Connector::new(options.clone()),
h3: v3::Connector::new(options),
}
}

Expand All @@ -50,7 +57,15 @@ impl Connector {
let h1_only = peer
.get_peer_options()
.map_or(true, |o| o.alpn.get_max_http_version() == 1);
if h1_only {

if peer.udp_http3() {
if let Some(h3) = self.h3.reused_http_session(peer).await? {
Ok((HttpSession::H3(h3), true))
} else {
let session = self.h3.new_http_session(peer).await?;
Ok((session, false))
}
} else if h1_only {
let (h1, reused) = self.h1.get_http_session(peer).await?;
Ok((HttpSession::H1(h1), reused))
} else {
Expand Down Expand Up @@ -85,6 +100,7 @@ impl Connector {
match session {
HttpSession::H1(h1) => self.h1.release_http_session(h1, peer, idle_timeout).await,
HttpSession::H2(h2) => self.h2.release_http_session(h2, peer, idle_timeout),
HttpSession::H3(h3) => self.h3.release_http_session(h3, peer, idle_timeout),
}
}

Expand All @@ -94,6 +110,52 @@ impl Connector {
}
}

pub(crate) struct InUsePool<T: UniqueID> {
// TODO: use pingora hashmap to shard the lock contention
pools: RwLock<HashMap<u64, PoolNode<T>>>,
}

impl<T: UniqueID> InUsePool<T> {
pub(crate) fn new() -> Self {
InUsePool {
pools: RwLock::new(HashMap::new()),
}
}
pub(crate) fn insert(&self, reuse_hash: u64, conn: T) {
{
let pools = self.pools.read();
if let Some(pool) = pools.get(&reuse_hash) {
pool.insert(conn.id(), conn);
return;
}
} // drop read lock

let pool = PoolNode::new();
pool.insert(conn.id(), conn);
let mut pools = self.pools.write();
pools.insert(reuse_hash, pool);
}

// retrieve a `<T>` to create a new stream
// the caller should return the <T> to this pool if there is still
// capacity left for more streams
pub(crate) fn get(&self, reuse_hash: u64) -> Option<T> {
let pools = self.pools.read();
pools.get(&reuse_hash)?.get_any().map(|v| v.1)
}

// release a http stream, this functional will cause an `<T>` to be returned (if exist)
// the caller should update the ref and then decide where to put it (in use pool or idle)
pub(crate) fn release(&self, reuse_hash: u64, id: UniqueIDType) -> Option<T> {
let pools = self.pools.read();
if let Some(pool) = pools.get(&reuse_hash) {
pool.remove(id)
} else {
None
}
}
}

#[cfg(test)]
#[cfg(feature = "any_tls")]
mod tests {
Expand Down Expand Up @@ -121,8 +183,8 @@ mod tests {
let (h2, reused) = connector.get_http_session(&peer).await.unwrap();
assert!(!reused);
match &h2 {
HttpSession::H1(_) => panic!("expect h2"),
HttpSession::H2(h2_stream) => assert!(!h2_stream.ping_timedout()),
_ => panic!("expect h2"),
}

connector.release_http_session(h2, &peer, None).await;
Expand All @@ -131,8 +193,8 @@ mod tests {
// reused this time
assert!(reused);
match &h2 {
HttpSession::H1(_) => panic!("expect h2"),
HttpSession::H2(h2_stream) => assert!(!h2_stream.ping_timedout()),
_ => panic!("expect h2"),
}
}

Expand All @@ -147,7 +209,7 @@ mod tests {
HttpSession::H1(http) => {
get_http(http, 200).await;
}
HttpSession::H2(_) => panic!("expect h1"),
_ => panic!("expect h1"),
}
connector.release_http_session(h1, &peer, None).await;

Expand All @@ -156,7 +218,7 @@ mod tests {
assert!(reused);
match &mut h1 {
HttpSession::H1(_) => {}
HttpSession::H2(_) => panic!("expect h1"),
_ => panic!("expect h1"),
}
}

Expand All @@ -177,7 +239,7 @@ mod tests {
HttpSession::H1(http) => {
get_http(http, 200).await;
}
HttpSession::H2(_) => panic!("expect h1"),
_ => panic!("expect h1"),
}
connector.release_http_session(h1, &peer, None).await;

Expand All @@ -189,7 +251,7 @@ mod tests {
assert!(reused);
match &mut h1 {
HttpSession::H1(_) => {}
HttpSession::H2(_) => panic!("expect h1"),
_ => panic!("expect h1"),
}
}

Expand All @@ -206,7 +268,7 @@ mod tests {
HttpSession::H1(http) => {
get_http(http, 200).await;
}
HttpSession::H2(_) => panic!("expect h1"),
_ => panic!("expect h1"),
}
connector.release_http_session(h1, &peer, None).await;

Expand All @@ -216,7 +278,7 @@ mod tests {
assert!(reused);
match &mut h1 {
HttpSession::H1(_) => {}
HttpSession::H2(_) => panic!("expect h1"),
_ => panic!("expect h1"),
}
}
}
Loading
Loading