diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index c056ed273..fdb6be59c 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -87,69 +87,91 @@ pub struct Server { impl Server { #[cfg(unix)] - async fn main_loop(&self) -> ShutdownType { + async fn main_loop(&self) -> (ShutdownType, bool) { // waiting for exit signal // TODO: there should be a signal handling function let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap(); let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap(); let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); - tokio::select! { - _ = fast_shutdown_signal.recv() => { - info!("SIGINT received, exiting"); - ShutdownType::Quick - }, - _ = graceful_terminate_signal.recv() => { - // we receive a graceful terminate, all instances are instructed to stop - info!("SIGTERM received, gracefully exiting"); - // graceful shutdown if there are listening sockets - info!("Broadcasting graceful shutdown"); - match self.shutdown_watch.send(true) { - Ok(_) => { info!("Graceful shutdown started!"); } - Err(e) => { - error!("Graceful shutdown broadcast failed: {e}"); - } - } - info!("Broadcast graceful shutdown complete"); - ShutdownType::Graceful - } - _ = graceful_upgrade_signal.recv() => { - // TODO: still need to select! on signals in case a fast shutdown is needed - // aka: move below to another task and only kick it off here - info!("SIGQUIT received, sending socks and gracefully exiting"); - if let Some(fds) = &self.listen_fds { - let fds = fds.lock().await; - info!("Trying to send socks"); - // XXX: this is blocking IO - match fds.send_to_sock( - self.configuration.as_ref().upgrade_sock.as_str()) - { - Ok(_) => {info!("listener sockets sent");}, - Err(e) => { - error!("Unable to send listener sockets to new process: {e}"); - // sentry log error on fd send failure - #[cfg(all(not(debug_assertions), feature = "sentry"))] - sentry::capture_error(&e); - } - } - sleep(Duration::from_secs(CLOSE_TIMEOUT)).await; + let mut reload_signal = unix::signal(unix::SignalKind::hangup()).unwrap(); + + loop { + tokio::select! { + _ = fast_shutdown_signal.recv() => { + info!("SIGINT received, exiting"); + return (ShutdownType::Quick, false) + }, + _ = graceful_terminate_signal.recv() => { + // we receive a graceful terminate, all instances are instructed to stop + info!("SIGTERM received, gracefully exiting"); + // graceful shutdown if there are listening sockets info!("Broadcasting graceful shutdown"); - // gracefully exiting match self.shutdown_watch.send(true) { Ok(_) => { info!("Graceful shutdown started!"); } Err(e) => { error!("Graceful shutdown broadcast failed: {e}"); - // switch to fast shutdown - return ShutdownType::Graceful; } } info!("Broadcast graceful shutdown complete"); - ShutdownType::Graceful - } else { - info!("No socks to send, shutting down."); - ShutdownType::Graceful + return (ShutdownType::Graceful, false) + } + _ = graceful_upgrade_signal.recv() => { + info!("SIGQUIT received, sending socks and gracefully exiting"); + self.handle_gracefull_upgrade_signal(false).await; + return (ShutdownType::Graceful, false) + }, + _ = reload_signal.recv() => { + info!("SIGHUP received, sending socks and gracefully reloading"); + // ensure that sending socks is successful before exiting + if !self.handle_gracefull_upgrade_signal(true).await { + continue; + } + return (ShutdownType::Graceful, true) } - }, + } + } + } + + async fn handle_gracefull_upgrade_signal(&self, reload: bool) -> bool { + // TODO: still need to select! on signals in case a fast shutdown is needed + // aka: move below to another task and only kick it off here + if let Some(fds) = &self.listen_fds { + let fds = fds.lock().await; + info!("Trying to send socks"); + // XXX: this is blocking IO + match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) { + Ok(_) => { + info!("listener sockets sent"); + } + Err(e) => { + error!("Unable to send listener sockets to new process: {e}"); + // sentry log error on fd send failure + #[cfg(all(not(debug_assertions), feature = "sentry"))] + sentry::capture_error(&e); + + if reload { + return false; + } + } + } + sleep(Duration::from_secs(CLOSE_TIMEOUT)).await; + info!("Broadcasting graceful shutdown"); + // gracefully exiting + match self.shutdown_watch.send(true) { + Ok(_) => { + info!("Graceful shutdown started!"); + } + Err(e) => { + error!("Graceful shutdown broadcast failed: {e}"); + // switch to fast shutdown + return true; + } + } + info!("Broadcast graceful shutdown complete"); + } else { + info!("No socks to send, shutting down."); } + true } fn run_service( @@ -277,6 +299,22 @@ impl Server { /// When trying to zero downtime upgrade from an older version of the server which is already /// running, this function will try to get all its listening sockets in order to take them over. pub fn bootstrap(&mut self) { + match self.try_bootstrap() { + Ok(true) => { + std::process::exit(0); + } + Ok(false) => {} + Err(_) => { + std::process::exit(1); + } + } + } + + /// Prepare the server to start + /// + /// When trying to zero downtime upgrade from an older version of the server which is already + /// running, this function will try to get all its listening sockets in order to take them over. + pub fn try_bootstrap(&mut self) -> Result { info!("Bootstrap starting"); debug!("{:#?}", self.options); @@ -286,7 +324,7 @@ impl Server { if self.options.as_ref().map_or(false, |o| o.test) { info!("Server Test passed, exiting"); - std::process::exit(0); + return Ok(true); } // load fds @@ -294,6 +332,7 @@ impl Server { match self.load_fds(self.options.as_ref().map_or(false, |o| o.upgrade)) { Ok(_) => { info!("Bootstrap done"); + Ok(false) } Err(e) => { // sentry log error on fd load failure @@ -301,7 +340,10 @@ impl Server { sentry::capture_error(&e); error!("Bootstrap failed on error: {:?}, exiting.", e); - std::process::exit(1); + Err(Error::explain( + ErrorType::Custom("BootstrapFdLoadError"), + e.desc(), + )) } } } @@ -313,13 +355,26 @@ impl Server { /// /// Note: this function may fork the process for daemonization, so any additional threads created /// before this function will be lost to any service logic once this function is called. - pub fn run_forever(mut self) -> ! { + pub fn run_forever(self) -> ! { + let daemon = self.configuration.daemon; + self.run_server(daemon).unwrap(); + std::process::exit(0) + } + + /// Start the server + /// + /// This function will block forever until the server needs to quit or reload. So this would be the last + /// function to call for this object. + /// + /// Note: this function may fork the process for daemonization, so any additional threads created + /// before this function will be lost to any service logic once this function is called. + pub fn run_server(mut self, enable_daemon: bool) -> Result { info!("Server starting"); let conf = self.configuration.as_ref(); #[cfg(unix)] - if conf.daemon { + if enable_daemon { info!("Daemonizing the server"); fast_timeout::pause_for_fork(); daemonize(&self.configuration); @@ -354,9 +409,9 @@ impl Server { // Only work steal runtime can use block_on() let server_runtime = Server::create_runtime("Server", 1, true); #[cfg(unix)] - let shutdown_type = server_runtime.get_handle().block_on(self.main_loop()); + let (shutdown_type, reload) = server_runtime.get_handle().block_on(self.main_loop()); #[cfg(windows)] - let shutdown_type = ShutdownType::Graceful; + let (shutdown_type, reload) = (ShutdownType::Graceful, false); if matches!(shutdown_type, ShutdownType::Graceful) { let exit_timeout = self @@ -395,7 +450,7 @@ impl Server { } } info!("All runtimes exited, exiting now"); - std::process::exit(0) + Ok(reload) } fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime { diff --git a/pingora/examples/app/proxy.rs b/pingora/examples/app/proxy.rs index 4ac0aaea9..617e5ce51 100644 --- a/pingora/examples/app/proxy.rs +++ b/pingora/examples/app/proxy.rs @@ -36,6 +36,7 @@ enum DuplexEvent { } impl ProxyApp { + #[allow(dead_code)] pub fn new(proxy_to: BasicPeer) -> Self { ProxyApp { client_connector: TransportConnector::new(None), diff --git a/pingora/examples/server_reload.rs b/pingora/examples/server_reload.rs new file mode 100644 index 000000000..dc64efb01 --- /dev/null +++ b/pingora/examples/server_reload.rs @@ -0,0 +1,108 @@ +use log::{error, info}; +use pingora::protocols::TcpKeepalive; +use pingora::server::configuration::Opt; +use pingora::server::Server; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::signal::unix; + +mod app; +mod service; + +pub fn main() { + env_logger::init(); + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + let args_opt = Opt::parse_args(); + + rt.block_on(async move { + let mut reload_signal = unix::signal(unix::SignalKind::hangup()).unwrap(); + let upgrade = Arc::new(AtomicBool::new(args_opt.upgrade)); + let conf_filename = args_opt.conf; + + let (server_stop_tx, mut server_stop_rx) = tokio::sync::mpsc::channel::(1); + + loop { + let conf_filename = conf_filename.clone(); + let upgrade = upgrade.clone(); + #[cfg(target_os = "linux")] + let upgrade_for_store = upgrade.clone(); + + let server_stop_tx = server_stop_tx.clone(); + tokio::spawn(async move { + let opt = Opt { + conf: conf_filename, + upgrade: upgrade.load(Ordering::SeqCst), + ..Opt::default() + }; + let opt = Some(opt); + let mut my_server = match Server::new(opt) { + Ok(server) => server, + Err(e) => { + error!("Create server error: {:?}", e); + return; + } + }; + match my_server.try_bootstrap() { + Ok(_) => {} + Err(e) => { + error!("Bootstrap error: {:?}", e); + return; + } + } + + let mut echo_service_http = service::echo::echo_service_http(); + + let mut options = pingora::listeners::TcpSocketOptions::default(); + options.tcp_fastopen = Some(10); + options.tcp_keepalive = Some(TcpKeepalive { + idle: Duration::from_secs(60), + interval: Duration::from_secs(5), + count: 5, + }); + + echo_service_http.add_tcp_with_settings("0.0.0.0:6145", options); + my_server.add_service(echo_service_http); + + let server_task = + tokio::task::spawn_blocking(move || match my_server.run_server(false) { + Ok(reload) => { + info!("Reload: {}", reload); + reload + } + Err(e) => { + error!("Failed to run server: {}", e); + false + } + }); + if !server_task.await.unwrap() { + server_stop_tx.send(true).await.unwrap(); + } + }); + + tokio::select! { + _ = reload_signal.recv() => { + #[cfg(target_os = "linux")] + { + upgrade_for_store.store(true, Ordering::SeqCst); + } + #[cfg(not(target_os = "linux"))] + { + info!("Upgrade is only supported on Linux"); + } + } + _ = server_stop_rx.recv() => { + info!("Server task finished"); + break; + } + } + } + }); + rt.shutdown_background(); +} diff --git a/pingora/examples/service/echo.rs b/pingora/examples/service/echo.rs index af07da217..d234d6494 100644 --- a/pingora/examples/service/echo.rs +++ b/pingora/examples/service/echo.rs @@ -15,6 +15,7 @@ use crate::app::echo::{EchoApp, HttpEchoApp}; use pingora::services::listening::Service; +#[allow(dead_code)] pub fn echo_service() -> Service { Service::new("Echo Service".to_string(), EchoApp) } diff --git a/pingora/examples/service/proxy.rs b/pingora/examples/service/proxy.rs index 49c24ca02..a9391dcb5 100644 --- a/pingora/examples/service/proxy.rs +++ b/pingora/examples/service/proxy.rs @@ -17,6 +17,7 @@ use pingora_core::listeners::Listeners; use pingora_core::services::listening::Service; use pingora_core::upstreams::peer::BasicPeer; +#[allow(dead_code)] pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service { let proxy_to = BasicPeer::new(proxy_addr); @@ -27,6 +28,7 @@ pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service { ) } +#[allow(dead_code)] pub fn proxy_service_tls( addr: &str, proxy_addr: &str,