//! Code that decides when workers should go to sleep. See README.md
//! for an overview.
use log::Event::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::usize;
/// Shared bookkeeping used to decide when worker threads may block.
///
/// The protocol state lives in the atomic `state` word; `data` and `tickle`
/// form the mutex/condvar pair that sleeping workers block on.
pub(super) struct Sleep {
/// Packed state word. Bit 0 is the `SLEEPING` flag (tested by
/// `anyone_sleeping`); the remaining bits hold `worker_index + 1` of the
/// one "sleepy" worker, or zero when no worker is sleepy. `AWAKE` (0)
/// means neither is the case.
state: AtomicUsize,
/// Lock paired with `tickle`. It is held while `sleep` performs its
/// compare-exchange to `SLEEPING` and while `tickle_cold` notifies.
data: Mutex<()>,
/// Condition variable that `sleep` waits on and that `tickle_cold`
/// signals with `notify_all`.
tickle: Condvar,
}
/// Value of `state` when no worker is sleepy and none is asleep.
const AWAKE: usize = 0;
/// Low bit of `state`; `sleep` sets it before blocking and
/// `anyone_sleeping` tests it.
const SLEEPING: usize = 1;
/// Yield count at which `no_work_found` tries to make the worker the
/// sleepy one (via `get_sleepy`).
const ROUNDS_UNTIL_SLEEPY: usize = 32;
/// Yield count at which `no_work_found` stops yielding and calls `sleep`.
const ROUNDS_UNTIL_ASLEEP: usize = 64;
impl Sleep {
/// Builds a `Sleep` whose state word starts out as `AWAKE`, with a fresh
/// mutex and condition variable.
pub(super) fn new() -> Sleep {
    let state = AtomicUsize::new(AWAKE);
    let data = Mutex::new(());
    let tickle = Condvar::new();
    Sleep {
        state,
        data,
        tickle,
    }
}
/// Reports whether the `SLEEPING` bit is set in the given snapshot of the
/// state word.
fn anyone_sleeping(&self, state: usize) -> bool {
    let sleeping_bit = state & SLEEPING;
    sleeping_bit != 0
}
/// Reports whether the given snapshot of the state word names a sleepy
/// worker, i.e. whether any bit above the `SLEEPING` bit is set.
fn any_worker_is_sleepy(&self, state: usize) -> bool {
    let sleepy_slot = state >> 1;
    sleepy_slot != 0
}
/// Reports whether the sleepy slot of `state` (everything above bit 0)
/// holds exactly `worker_index + 1`, the encoding used for that worker.
fn worker_is_sleepy(&self, state: usize, worker_index: usize) -> bool {
    let sleepy_slot = state >> 1;
    let own_slot = worker_index + 1;
    sleepy_slot == own_slot
}
/// Returns `state` with `worker_index` recorded as the sleepy worker.
///
/// `state` must currently have an empty sleepy slot (it is either `AWAKE`
/// or `SLEEPING`); this is checked in debug builds only.
fn with_sleepy_worker(&self, state: usize, worker_index: usize) -> usize {
    debug_assert!(state == AWAKE || state == SLEEPING);
    // The worker is stored as `index + 1`, shifted past the SLEEPING bit.
    let sleepy_slot = (worker_index + 1) << 1;
    sleepy_slot + state
}
/// Records that `worker_index` found work after `yields` fruitless rounds.
///
/// A worker that went past `ROUNDS_UNTIL_SLEEPY` issues a `tickle` first.
/// Always returns 0 (presumably the caller's new yield count).
#[inline]
pub(super) fn work_found(&self, worker_index: usize, yields: usize) -> usize {
    log!(FoundWork {
        worker: worker_index,
        yields: yields,
    });
    let went_past_sleepy = yields > ROUNDS_UNTIL_SLEEPY;
    if went_past_sleepy {
        // FIXME tickling here is a bit extreme; mostly we want to "release the lock"
        // from us being sleepy, we don't necessarily need to wake others
        // who are sleeping
        self.tickle(worker_index);
    }
    0
}
/// Advances the idle protocol for a worker that found no work, returning
/// the yield count to use on the next round.
///
/// Phases, in order of increasing `yields`:
/// - below `ROUNDS_UNTIL_SLEEPY`: yield the thread and count the round;
/// - exactly `ROUNDS_UNTIL_SLEEPY`: yield, then try to become the sleepy
///   worker; the count only advances if that succeeded;
/// - below `ROUNDS_UNTIL_ASLEEP`: yield, then check we are still the
///   sleepy worker; if not, restart from 0;
/// - otherwise: call `sleep` and restart from 0.
#[inline]
pub(super) fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
    log!(DidNotFindWork {
        worker: worker_index,
        yields: yields,
    });
    match yields {
        _ if yields < ROUNDS_UNTIL_SLEEPY => {
            thread::yield_now();
            yields + 1
        }
        _ if yields == ROUNDS_UNTIL_SLEEPY => {
            thread::yield_now();
            let became_sleepy = self.get_sleepy(worker_index);
            if became_sleepy {
                yields + 1
            } else {
                // Somebody else holds the sleepy slot; retry this round.
                yields
            }
        }
        _ if yields < ROUNDS_UNTIL_ASLEEP => {
            thread::yield_now();
            if !self.still_sleepy(worker_index) {
                // The state word no longer names us: start over.
                log!(GotInterrupted {
                    worker: worker_index
                });
                return 0;
            }
            yields + 1
        }
        _ => {
            debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
            self.sleep(worker_index);
            0
        }
    }
}
/// Fast-path wake-up: does a single atomic load and only falls through to
/// `tickle_cold` when the state word is not `AWAKE`.
pub(super) fn tickle(&self, worker_index: usize) {
    // As described in README.md, this load must be SeqCst so as to ensure that:
    // - if anyone is sleepy or asleep, we *definitely* see that now (and not
    //   eventually);
    // - if anyone after us becomes sleepy or asleep, they see memory events
    //   that precede the call to `tickle()`, even though we did not do a write.
    let current = self.state.load(Ordering::SeqCst);
    if current == AWAKE {
        return;
    }
    self.tickle_cold(worker_index);
}
/// Slow path of `tickle`: resets the state word to `AWAKE` and, if the
/// previous value had the `SLEEPING` bit set, wakes every worker waiting
/// on the condition variable.
#[cold]
fn tickle_cold(&self, worker_index: usize) {
// The `Release` ordering here suffices. The reasoning is that
// the atomic's own natural ordering ensures that any attempt
// to become sleepy/asleep will come either before or after this
// swap. If it comes *after*, then Release is good because we
// want it to see the action that generated this tickle. If it
// comes *before*, then we will see it here (but not other
// memory writes from that thread). If the other worker was
// becoming sleepy, the other writes don't matter. If they
// were going to sleep, we will acquire the lock and hence
// acquire their reads.
let old_state = self.state.swap(AWAKE, Ordering::Release);
log!(Tickle {
worker: worker_index,
old_state: old_state,
});
if self.anyone_sleeping(old_state) {
// Notify while holding `data`, the same lock `sleep` holds from its
// compare-exchange until it starts waiting on the condvar.
let _data = self.data.lock().unwrap();
self.tickle.notify_all();
}
}
/// Tries to register `worker_index` as the single sleepy worker.
///
/// Returns `true` if this call installed the worker in the state word, and
/// `false` if some worker already occupies the sleepy slot. A failed
/// compare-exchange simply retries with a fresh load.
///
/// In debug builds, panics if the worker occupying the slot is
/// `worker_index` itself, since a sleepy worker must not ask again.
fn get_sleepy(&self, worker_index: usize) -> bool {
    loop {
        // Acquire ordering suffices here. If some other worker
        // was sleepy but no longer is, we will eventually see
        // that, and until then it doesn't hurt to spin.
        // Otherwise, we will do a compare-exchange which will
        // assert a stronger order and acquire any reads etc that
        // we must see.
        let state = self.state.load(Ordering::Acquire);
        log!(GetSleepy {
            worker: worker_index,
            state: state,
        });
        if self.any_worker_is_sleepy(state) {
            // somebody else is already sleepy, so we'll just wait our turn
            debug_assert!(
                !self.worker_is_sleepy(state, worker_index),
                "worker {} called `get_sleepy()`, \
                 but they are already sleepy (state={})",
                worker_index,
                state
            );
            return false;
        } else {
            // make ourselves the sleepy one
            let new_state = self.with_sleepy_worker(state, worker_index);
            // This must be SeqCst on success because we want to
            // ensure:
            //
            // - That we observe any writes that preceded
            //   some prior tickle, and that tickle may have only
            //   done a SeqCst load on `self.state`.
            // - That any subsequent tickle *definitely* sees this store.
            //
            // See the section on "Ensuring Sequentially
            // Consistency" in README.md for more details.
            //
            // The failure ordering doesn't matter since we are
            // about to spin around and do a fresh load.
            if self
                .state
                .compare_exchange(state, new_state, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok()
            {
                log!(GotSleepy {
                    worker: worker_index,
                    old_state: state,
                    new_state: new_state,
                });
                return true;
            }
        }
    }
}
/// Re-reads the state word (SeqCst) and reports whether `worker_index` is
/// still recorded as the sleepy worker.
fn still_sleepy(&self, worker_index: usize) -> bool {
    self.worker_is_sleepy(self.state.load(Ordering::SeqCst), worker_index)
}
/// Blocks the calling worker on the condition variable, provided it is
/// still the sleepy worker.
///
/// If the state word no longer names `worker_index`, logs `GotInterrupted`
/// and returns without blocking. Otherwise it takes the `data` lock,
/// compare-exchanges the state to `SLEEPING`, waits on the condvar exactly
/// once and returns. A failed compare-exchange loops back to a fresh load.
fn sleep(&self, worker_index: usize) {
loop {
// Acquire here suffices. If we observe that the current worker is still
// sleepy, then in fact we know that no writes have occurred, and anyhow
// we are going to do a CAS which will synchronize.
//
// If we observe that the state has changed, it must be
// due to a tickle, and then the Acquire means we also see
// any events that occurred before that.
let state = self.state.load(Ordering::Acquire);
if self.worker_is_sleepy(state, worker_index) {
// It is important that we hold the lock when we do
// the CAS. Otherwise, if we were to CAS first, then
// the following sequence of events could occur:
//
// - Thread A (us) sets state to SLEEPING.
// - Thread B sets state to AWAKE.
// - Thread C sets state to SLEEPY(C).
// - Thread C sets state to SLEEPING.
// - Thread A reawakens, acquires lock, and goes to sleep.
//
// Now we missed the wake-up from thread B! But since
// we have the lock when we set the state to sleeping,
// that cannot happen. Note that the swap in `tickle()`
// is not performed under the lock, though, so let's play
// that out:
//
// # Scenario 1
//
// - A loads state and sees SLEEPY(A)
// - B swaps to AWAKE.
// - A locks, fails CAS
//
// # Scenario 2
//
// - A loads state and sees SLEEPY(A)
// - A locks, performs CAS
// - B swaps to AWAKE.
// - A waits (releasing lock)
// - B locks, notifies
//
// In general, acquiring the lock inside the loop
// seems like it could lead to bad performance, but
// actually it should be ok. This is because the only
// reason for the `compare_exchange` to fail is if an
// awaken comes, in which case the next cycle around
// the loop will just return.
let data = self.data.lock().unwrap();
// This must be SeqCst on success because we want to
// ensure:
//
// - That we observe any writes that preceded
// some prior tickle, and that tickle may have only
// done a SeqCst load on `self.state`.
// - That any subsequent tickle *definitely* sees this store.
//
// See the section on "Ensuring Sequentially
// Consistency" in README.md for more details.
//
// The failure ordering doesn't matter since we are
// about to spin around and do a fresh load.
if self
.state
.compare_exchange(state, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
// Don't do this in a loop. If we do it in a loop, we need
// some way to distinguish the ABA scenario where the pool
// was awoken but before we could process it somebody went
// to sleep. Note that if we get a false wakeup it's not a
// problem for us, we'll just loop around and maybe get
// sleepy again.
log!(FellAsleep {
worker: worker_index
});
// `wait` releases `data` while blocked; the returned guard is
// dropped immediately since nothing is protected by it here.
let _ = self.tickle.wait(data).unwrap();
log!(GotAwoken {
worker: worker_index
});
return;
}
} else {
// The state word no longer names us as the sleepy worker, so
// do not block.
log!(GotInterrupted {
worker: worker_index
});
return;
}
}
}
}