Skip to content

Commit

Permalink
Merge pull request #219 from Diggsey/server-join
Browse files Browse the repository at this point in the history
Implement "join" method on server
  • Loading branch information
bradfier authored Apr 25, 2021
2 parents 710b1c7 + a72ebf1 commit 08f89c5
Showing 1 changed file with 83 additions and 3 deletions.
86 changes: 83 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ use std::panic::AssertUnwindSafe;
use std::slice::Iter as SliceIter;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{Ordering, AtomicUsize};
use std::sync::mpsc;
use std::thread;
use std::fmt;
Expand Down Expand Up @@ -236,10 +237,26 @@ pub fn start_server_with_pool<A, F>(addr: A, pool_size: Option<usize>, handler:
panic!("The server socket closed unexpectedly")
}

struct AtomicCounter(Arc<AtomicUsize>);

impl AtomicCounter {
fn new(count: &Arc<AtomicUsize>) -> Self {
count.fetch_add(1, Ordering::Relaxed);
AtomicCounter(Arc::clone(count))
}
}

impl Drop for AtomicCounter {
fn drop(&mut self) {
self.0.fetch_sub(1, Ordering::Release);
}
}

/// Executes a function in either a thread of a thread pool
enum Executor {
Threaded,
Threaded {
count: Arc<AtomicUsize>,
},
Pooled {
pool: threadpool::ThreadPool,
}
Expand All @@ -254,10 +271,33 @@ impl Executor {
#[inline]
fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
match *self {
Executor::Threaded => { thread::spawn(f); }
Executor::Threaded { ref count } => {
let counter = AtomicCounter::new(count);
thread::spawn(move || {
let _counter = counter;
f()
});
}
Executor::Pooled { ref pool } => { pool.execute(f); }
}
}

fn join(&self) {
match *self {
Executor::Threaded { ref count } => {
while count.load(Ordering::Acquire) > 0 {
thread::sleep(Duration::from_millis(100));
}
}
Executor::Pooled { ref pool } => { pool.join(); }
}
}
}

impl Default for Executor {
fn default() -> Self {
Executor::Threaded { count: Arc::new(AtomicUsize::new(0)) }
}
}


Expand Down Expand Up @@ -301,7 +341,7 @@ impl<F> Server<F> where F: Send + Sync + 'static + Fn(&Request) -> Response {
let server = try!(tiny_http::Server::http(addr));
Ok(Server {
server,
executor: Executor::Threaded,
executor: Executor::default(),
handler: Arc::new(AssertUnwindSafe(handler)), // TODO: using AssertUnwindSafe here is wrong, but unwind safety has some usability problems in Rust in general
})
}
Expand Down Expand Up @@ -436,6 +476,46 @@ impl<F> Server<F> where F: Send + Sync + 'static + Fn(&Request) -> Response {
}
}

/// Waits for all in-flight requests to be processed. This is useful for implementing a graceful
/// shutdown.
///
/// Note: new connections may still be accepted while we wait, and this function does not guarantee
/// to wait for those new requests. To implement a graceful shutdown or a clean rolling-update,
/// the following approach should be used:
///
/// 1) Stop routing requests to this server. For a rolling update, requests should be routed
/// to the new instance. This logic typically sits outside of your application.
///
/// 2) Drain the queue of incoming connections by calling `poll_timeout` with a short timeout.
///
/// 3) Wait for in-flight requests to be processed by using this method.
///
/// # Example
/// ```no_run
/// # use std::time::Duration;
/// # use rouille::Server;
/// #
/// # let server = Server::new("", |_| unimplemented!()).unwrap();
/// # fn is_stopping() -> bool { unimplemented!() }
///
/// // Accept connections until we receive a SIGTERM
/// while !is_stopping() {
/// server.poll_timeout(Duration::from_millis(100));
/// }
///
/// // We received a SIGTERM, but there still may be some queued connections,
/// // so wait for them to be accepted.
/// println!("Shutting down gracefully...");
/// server.poll_timeout(Duration::from_millis(100));
///
/// // We can expect there to be no more queued connections now, but slow requests
/// // may still be in-flight, so wait for them to finish.
/// server.join();
/// ```
pub fn join(&self) {
self.executor.join();
}

// Internal function, called when we got a request from tiny-http that needs to be processed.
fn process(&self, request: tiny_http::Request) {
// We spawn a thread so that requests are processed in parallel.
Expand Down

0 comments on commit 08f89c5

Please sign in to comment.