Skip to content
This repository has been archived by the owner on Nov 5, 2018. It is now read-only.

Issue #26: Scope stats on running, completed and panicked threads #27

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ use std::ops::DerefMut;
use std::rc::Rc;
use std::thread;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};

#[doc(hidden)]
trait FnBox<T> {
Expand Down Expand Up @@ -151,12 +152,20 @@ where
}

pub struct Scope<'a> {
stats: ScopeStats,
/// The list of the deferred functions and thread join jobs.
dtors: RefCell<Option<DtorChain<'a, ()>>>,
// !Send + !Sync
_marker: PhantomData<*const ()>,
}

#[derive(Debug, Default)]
struct ScopeStats {
running: AtomicUsize,
completed: AtomicUsize,
panicked: AtomicUsize
}

struct DtorChain<'a, T> {
dtor: Box<FnBox<T> + 'a>,
next: Option<Box<DtorChain<'a, T>>>,
Expand Down Expand Up @@ -226,6 +235,7 @@ where
F: FnOnce(&Scope<'a>) -> R,
{
let mut scope = Scope {
stats: Default::default(),
dtors: RefCell::new(None),
_marker: PhantomData,
};
Expand Down Expand Up @@ -300,6 +310,24 @@ impl<'a> Scope<'a> {
builder: thread::Builder::new(),
}
}

/// Get the count of currently running threads for the scope
#[inline]
pub fn running_threads(&self) -> usize {
self.stats.running.load(Ordering::SeqCst)
}

/// Get the current total count of completed threads for the scope
#[inline]
pub fn completed_threads(&self) -> usize {
self.stats.completed.load(Ordering::SeqCst)
}

/// Get the current total count of panicked threads for the scope
#[inline]
pub fn panicked_threads(&self) -> usize {
self.stats.panicked.load(Ordering::SeqCst)
}
}

/// Scoped thread configuration. Provides detailed control over the properties and behavior of new
Expand Down Expand Up @@ -334,8 +362,10 @@ impl<'s, 'a: 's> ScopedThreadBuilder<'s, 'a> {
// joined (`JoinState::join()`). Thus there are no data races.
let result = Box::into_raw(Box::<ManuallyDrop<T>>::new(unsafe { mem::uninitialized() })) as usize;

let stats = &self.scope.stats;
let join_handle = try!(unsafe {
builder_spawn_unchecked(self.builder, move || {
let _sentinel = Sentinel::new(stats);
let mut result = Box::from_raw(result as *mut ManuallyDrop<T>);
*result = ManuallyDrop::new(f());
mem::forget(result);
Expand All @@ -362,6 +392,36 @@ impl<'s, 'a: 's> ScopedThreadBuilder<'s, 'a> {
}
}

/// A sentinel for keeping track of a scope's stats
struct Sentinel<'s> {
stats: &'s ScopeStats
}

impl<'s> Sentinel<'s> {
/// Create the sentinel and increase the running count for the scope
#[inline]
fn new(stats: &'s ScopeStats) -> Self {
stats.running.fetch_add(1, Ordering::SeqCst);
Self {
stats
}
}
}

impl<'s> Drop for Sentinel<'s> {
/// Decrease the running count for the scope, if the thread is unwinding a panic, increase the
/// panicked count, else increase the completed count
#[inline]
fn drop(&mut self) {
self.stats.running.fetch_sub(1, Ordering::SeqCst);
if thread::panicking() {
self.stats.panicked.fetch_add(1, Ordering::SeqCst);
} else {
self.stats.completed.fetch_add(1, Ordering::SeqCst);
}
}
}

impl<'a, T: Send + 'a> ScopedJoinHandle<'a, T> {
/// Join the scoped thread, returning the result it produced.
pub fn join(self) -> thread::Result<T> {
Expand All @@ -380,3 +440,75 @@ impl<'a> Drop for Scope<'a> {
self.drop_all()
}
}

#[cfg(test)]
mod test {
use super::*;

use std::sync::{Arc, Barrier};
use std::time::Duration;
use std::thread;

#[test]
fn scope_stats() {
scope(|scope| {
let mut handles = Vec::with_capacity(8);
handles.push(scope.spawn(|| thread::sleep(Duration::from_millis(1))));
handles.push(scope.spawn(|| thread::sleep(Duration::from_millis(1))));
handles.push(scope.spawn(|| thread::sleep(Duration::from_millis(1))));
handles.push(scope.spawn(|| panic!()));
handles.push(scope.spawn(|| thread::sleep(Duration::from_millis(1))));
handles.push(scope.spawn(|| panic!()));
handles.push(scope.spawn(|| thread::sleep(Duration::from_millis(1))));
handles.push(scope.spawn(|| thread::sleep(Duration::from_millis(1))));

for handle in handles {
handle.join();
}

assert_eq!(scope.running_threads(), 0);
assert_eq!(scope.completed_threads(), 6);
assert_eq!(scope.panicked_threads(), 2);
});
}

#[test]
fn scope_stats_running() {
let barrier = Arc::new(Barrier::new(4));
scope(|scope| {
let mut handles = Vec::with_capacity(8);
let thread_barrier = Arc::clone(&barrier);
let handle = scope.spawn(move || {
thread_barrier.wait();
thread_barrier.wait();
});
handles.push(handle);

let thread_barrier = Arc::clone(&barrier);
let handle = scope.spawn(move || {
thread_barrier.wait();
thread_barrier.wait();
});
handles.push(handle);

let thread_barrier = Arc::clone(&barrier);
let handle = scope.spawn(move || {
thread_barrier.wait();
thread_barrier.wait();
});
handles.push(handle);

barrier.wait();
assert_eq!(scope.running_threads(), 3);
barrier.wait();

for handle in handles {
handle.join().unwrap();
}

assert_eq!(scope.running_threads(), 0);
assert_eq!(scope.completed_threads(), 3);
assert_eq!(scope.panicked_threads(), 0);
});
}
}