Bug 1855622 - Upgrade rayon-core to 1.12.0. r=emilio,supply-chain-reviewers

Differential Revision: https://phabricator.services.mozilla.com/D189454
This commit is contained in:
Mike Hommey 2023-09-29 00:12:20 +00:00
Родитель 6734d92ac6
Коммит 63f56660d6
21 изменённых файлов: 312 добавлений и 884 удалений

6
Cargo.lock сгенерированный
Просмотреть файл

@ -4544,12 +4544,12 @@ dependencies = [
[[package]]
name = "rayon-core"
version = "1.11.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]]

Просмотреть файл

@ -165,9 +165,6 @@ web-sys = { path = "build/rust/dummy-web/web-sys" }
# Overrides to allow easier use of common internal crates.
moz_asserts = { path = "mozglue/static/rust/moz_asserts" }
# Patch rayon core to import https://github.com/rayon-rs/rayon/pull/1063
rayon-core = { path = "third_party/rust/rayon-core" }
# Patch `rure` to disable building the cdylib and staticlib targets
# Cargo has no way to disable building targets your dependencies provide which
# you don't depend on, and linking the cdylib breaks during instrumentation

Просмотреть файл

@ -112,9 +112,6 @@ impl StyleThreadPool {
while let Some(join_handle) = STYLE_THREAD_JOIN_HANDLES.lock().pop() {
let _ = join_handle.join();
}
// Clean up the current thread too.
rayon_core::clean_up_use_current_thread();
}
/// Returns a reference to the thread pool.

Просмотреть файл

@ -177,10 +177,6 @@ notes = "This is a first-party crate which is entirely unrelated to the crates.i
audit-as-crates-io = true
notes = "This is a first-party crate which is also published to crates.io, but we should publish audits for it for the benefit of the ecosystem."
[policy.rayon-core]
audit-as-crates-io = true
notes = "Identical to upstream, with a Mozilla-authored PR, see Cargo.toml comment for details"
[policy.rure]
audit-as-crates-io = true
notes = "Identical to upstream, but with cdylib and staticlib targets disabled to avoid unnecessary build artifacts and linker errors."

Просмотреть файл

@ -1335,6 +1335,11 @@ who = "Brandon Pitman <bran@bran.land>"
criteria = "safe-to-deploy"
delta = "1.10.2 -> 1.11.0"
[[audits.isrg.audits.rayon-core]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
delta = "1.11.0 -> 1.12.0"
[[audits.isrg.audits.sha2]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"

Просмотреть файл

@ -1 +1 @@
{"files":{"Cargo.toml":"253950b73a1610997642b03671361a8846655d5a36ed019e8dd77d069b9914aa","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"0621878e61f0d0fda054bcbe02df75192c28bde1ecc8289cbd86aeba2dd72720","README.md":"f964d2888146f80d5b0f2b42af405893c0f0187800e406c1f86f81334c08db1a","build.rs":"fa31cb198b772600d100a7c403ddedccef637d2e6b2da431fa7f02ca41307fc6","src/broadcast/mod.rs":"38bf2c713c3e05514f23a5db04d82a7f255eb53863d03eead33a9f5c82f878b7","src/broadcast/test.rs":"c42d5aa6a3d3a53614ac534811f0fe7a347f18912ecfd63d874a281dc215aca4","src/compile_fail/mod.rs":"4d70256295bd64691a8c1994b323559cda1888e85f0b45ca55711541c257dcb6","src/compile_fail/quicksort_race1.rs":"35f498cda38f4eb6e00117f78ed68e0fe5a3fa61c25303d9c08a19bda345bc6c","src/compile_fail/quicksort_race2.rs":"cbb40030c7867cae34bb373b6ec5d97c2ac6de39bc917f47879b30eb423924bc","src/compile_fail/quicksort_race3.rs":"8403643e64c969306b1a9b1243378e6ccdd313b57e1878dbd31393618082fd35","src/compile_fail/rc_return.rs":"197894803d8df58fc8005d90c86b90cd98f1972f1a4b57438516a565df35903f","src/compile_fail/rc_upvar.rs":"42d110429621f407ef0dada1306dab116583d2c782a99894204dd8e0ccd2312f","src/compile_fail/scope_join_bad.rs":"892959949f77cadfc07458473e7a290301182027ca64428df5a8ce887be0892b","src/job.rs":"06de0c2add2e303b6383bf11f5f0d75775c1efe6aa7bc16de3992117f1012f09","src/join/mod.rs":"7638c0fc1da1a2d2b14673c8a2e0f87d26c24232cebee26fd334bdc2caa80886","src/join/test.rs":"157db5306e8df89a8eea19dbba499f26c2f44d9803cb36a796c852a9a695821e","src/latch.rs":"2056effd8b1d71e1df2889c7a163570c975d25fff8404368b0e1554efeeab6c7","src/lib.rs":"9cf9ef2dd473a50bae8304be792a66204d6e9fc1e9de38902bc265643620d043","src/log.rs":"3f901d61125584a50b05892b7e690872bda15be2150b9c0595c6125664f4cf3e","src/private.rs":"152f6d65ce4741616a1dec796b9442f78a018d38bb040f76c4cd85008333a3bb","src/registry.rs":"39c3190ed74e340cc3d4c91838d2761450dd4e473c0059aa424036ccbbbfae65","src/scope/mod.rs":"acf475f32391843b
be297ce1d1e6d37ed951992ca26349fc65941255e31cdeb5","src/scope/test.rs":"d4f068cae4ee4483b41bd3054582d96e74ced46eb57361e7510ef62d4318d340","src/sleep/README.md":"e1ac1a5556cf257f38b7654feb0615c208d9186fefbe52a584d4fe6545d7c373","src/sleep/counters.rs":"2ce3052f05b3b75b1b96c6604fc0dfb6de867426981b37641410c068f92897bd","src/sleep/mod.rs":"94d36b5659657a0a4814228eb0538e41345de11b021e306382f14ebd501884ff","src/spawn/mod.rs":"745494a18fc4901c37ea2f45a1324abf5bd2a4d9c840620956e6633755116d88","src/spawn/test.rs":"a28f8943f28a4cef642b6429c538b1df879c9eb1db9927ce69b97c686bf81173","src/test.rs":"7d0dee06fcf41bddf77449a85cece44133f966a0622a31cf3ed110fbe83e094e","src/thread_pool/mod.rs":"392ad78a209826c4fb7257288dc082ace380220893d44559480045587e279202","src/thread_pool/test.rs":"cf63d45ae1f0e7fd3c6d5e4b2aafa8900338c141e04aba15711b02b4a71febb2","src/unwind.rs":"7baa4511467d008b14856ea8de7ced92b9251c9df4854f69c81f7efc3cf0cd6c","tests/double_init_fail.rs":"8c208ce45e83ab1dfc5890353d5b2f06fc8005684ae622827a65d05abb35a072","tests/init_zero_threads.rs":"5c7f7e0e13e9ead3733253e30d6b52ac5ee66fd6c105999d096bdf31cfccaf95","tests/scope_join.rs":"56f570c4b6a01704aacf93e7f17f89fe0f40f46ed6f9ede517abfe9adaf91f83","tests/scoped_threadpool.rs":"9753467b3de37dd1d19327fe0075d3c4ba768430b97e7aface39627592e9b09a","tests/simple_panic.rs":"916d40d36c1a0fad3e1dfb31550f0672641feab4b03d480f039143dbe2f2445f","tests/stack_overflow_crash.rs":"87b962c66f301ac44f808d992d4e8b861305db0c282f256761a5075c9f018243"},"package":"4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"}
{"files":{"Cargo.toml":"c25083c4b0fc46e0f63b88b4bc346a1c034698c16ece8f04ce72dd2af9cc7ffb","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"0621878e61f0d0fda054bcbe02df75192c28bde1ecc8289cbd86aeba2dd72720","README.md":"7281273bea1d5fdc57731513cf9f0e3b911d06ac9905b03a8375a1324951c35b","build.rs":"fa31cb198b772600d100a7c403ddedccef637d2e6b2da431fa7f02ca41307fc6","src/broadcast/mod.rs":"2c9a84e7e6e5e8d8e23e28d6f2703825d7d6af59f0a16bc6125d5f0d25bd7598","src/broadcast/test.rs":"fe50fc868e67d855a9f71e078b0c3a7780e789652abb4b586accb4ccf035e872","src/compile_fail/mod.rs":"4d70256295bd64691a8c1994b323559cda1888e85f0b45ca55711541c257dcb6","src/compile_fail/quicksort_race1.rs":"35f498cda38f4eb6e00117f78ed68e0fe5a3fa61c25303d9c08a19bda345bc6c","src/compile_fail/quicksort_race2.rs":"cbb40030c7867cae34bb373b6ec5d97c2ac6de39bc917f47879b30eb423924bc","src/compile_fail/quicksort_race3.rs":"8403643e64c969306b1a9b1243378e6ccdd313b57e1878dbd31393618082fd35","src/compile_fail/rc_return.rs":"197894803d8df58fc8005d90c86b90cd98f1972f1a4b57438516a565df35903f","src/compile_fail/rc_upvar.rs":"42d110429621f407ef0dada1306dab116583d2c782a99894204dd8e0ccd2312f","src/compile_fail/scope_join_bad.rs":"892959949f77cadfc07458473e7a290301182027ca64428df5a8ce887be0892b","src/job.rs":"06de0c2add2e303b6383bf11f5f0d75775c1efe6aa7bc16de3992117f1012f09","src/join/mod.rs":"7638c0fc1da1a2d2b14673c8a2e0f87d26c24232cebee26fd334bdc2caa80886","src/join/test.rs":"157db5306e8df89a8eea19dbba499f26c2f44d9803cb36a796c852a9a695821e","src/latch.rs":"81da563b29b03455cd22073d243eaed081e953873c14ac202f6605cd3dac09a5","src/lib.rs":"53bb01b167d56c6ace035666b570fff648eedf03a5c8c415ec37136a0ef35697","src/private.rs":"152f6d65ce4741616a1dec796b9442f78a018d38bb040f76c4cd85008333a3bb","src/registry.rs":"c464c4fdb36c85cfe2a10d6196802b036bb76985d737ab9a67d708f908877672","src/scope/mod.rs":"421a5561093928b1d0081d34c2bff78377055d8f6de0689088f52fe476d3a56a","src/scope/test.rs":"d4f068cae
4ee4483b41bd3054582d96e74ced46eb57361e7510ef62d4318d340","src/sleep/README.md":"e1ac1a5556cf257f38b7654feb0615c208d9186fefbe52a584d4fe6545d7c373","src/sleep/counters.rs":"e9eccc7d76d17415156c12d30cc7bf89a5c64ca5742965bb4e6c1ce23c2782e9","src/sleep/mod.rs":"23a9116f84653a5f68ab21c910f1dea5314a5332fdc9473a87710974f4b2c717","src/spawn/mod.rs":"745494a18fc4901c37ea2f45a1324abf5bd2a4d9c840620956e6633755116d88","src/spawn/test.rs":"a28f8943f28a4cef642b6429c538b1df879c9eb1db9927ce69b97c686bf81173","src/test.rs":"7d0dee06fcf41bddf77449a85cece44133f966a0622a31cf3ed110fbe83e094e","src/thread_pool/mod.rs":"392ad78a209826c4fb7257288dc082ace380220893d44559480045587e279202","src/thread_pool/test.rs":"657b1938993eb98fb5f3fd1d02a77728e37d0e833390b4ba82926b9107ce3170","src/unwind.rs":"7baa4511467d008b14856ea8de7ced92b9251c9df4854f69c81f7efc3cf0cd6c","tests/double_init_fail.rs":"8c208ce45e83ab1dfc5890353d5b2f06fc8005684ae622827a65d05abb35a072","tests/init_zero_threads.rs":"5c7f7e0e13e9ead3733253e30d6b52ac5ee66fd6c105999d096bdf31cfccaf95","tests/scope_join.rs":"56f570c4b6a01704aacf93e7f17f89fe0f40f46ed6f9ede517abfe9adaf91f83","tests/scoped_threadpool.rs":"24d1293fe65ad5f194bbff9d1ef0486c3440d0a3783f04eaaaae4929adef5cb8","tests/simple_panic.rs":"916d40d36c1a0fad3e1dfb31550f0672641feab4b03d480f039143dbe2f2445f","tests/stack_overflow_crash.rs":"87b962c66f301ac44f808d992d4e8b861305db0c282f256761a5075c9f018243","tests/use_current_thread.rs":"fe1b981e77e422e616c09502731a70fb2f1c023d2386ef32c9d47e5a6f5bc162"},"package":"5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed"}

12
third_party/rust/rayon-core/Cargo.toml поставляемый
Просмотреть файл

@ -11,9 +11,9 @@
[package]
edition = "2021"
rust-version = "1.59"
rust-version = "1.63"
name = "rayon-core"
version = "1.11.0"
version = "1.12.0"
authors = [
"Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>",
@ -58,8 +58,9 @@ path = "tests/simple_panic.rs"
name = "scoped_threadpool"
path = "tests/scoped_threadpool.rs"
[dependencies.crossbeam-channel]
version = "0.5.0"
[[test]]
name = "use_current_thread"
path = "tests/use_current_thread.rs"
[dependencies.crossbeam-deque]
version = "0.8.1"
@ -67,9 +68,6 @@ version = "0.8.1"
[dependencies.crossbeam-utils]
version = "0.8.0"
[dependencies.num_cpus]
version = "1.2"
[dev-dependencies.rand]
version = "0.8"

2
third_party/rust/rayon-core/README.md поставляемый
Просмотреть файл

@ -8,4 +8,4 @@ Please see [Rayon Docs] for details about using Rayon.
[Rayon Docs]: https://docs.rs/rayon/
Rayon-core currently requires `rustc 1.59.0` or greater.
Rayon-core currently requires `rustc 1.63.0` or greater.

Просмотреть файл

@ -1,7 +1,6 @@
use crate::job::{ArcJob, StackJob};
use crate::latch::LatchRef;
use crate::latch::{CountLatch, LatchRef};
use crate::registry::{Registry, WorkerThread};
use crate::scope::ScopeLatch;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
@ -107,7 +106,7 @@ where
let n_threads = registry.num_threads();
let current_thread = WorkerThread::current().as_ref();
let latch = ScopeLatch::with_count(n_threads, current_thread);
let latch = CountLatch::with_count(n_threads, current_thread);
let jobs: Vec<_> = (0..n_threads)
.map(|_| StackJob::new(&f, LatchRef::new(&latch)))
.collect();

Просмотреть файл

@ -2,6 +2,7 @@
use crate::ThreadPoolBuilder;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::{thread, time};
@ -14,7 +15,7 @@ fn broadcast_global() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = channel();
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
let mut v: Vec<_> = rx.into_iter().collect();
@ -33,7 +34,7 @@ fn broadcast_pool() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = channel();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
@ -53,7 +54,7 @@ fn broadcast_self() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = channel();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
@ -81,7 +82,7 @@ fn broadcast_mutual() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = channel();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
pool1.spawn({
@ -118,7 +119,7 @@ fn broadcast_mutual_sleepy() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = channel();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
pool1.spawn({
@ -158,8 +159,8 @@ fn broadcast_panic_one() {
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
let (tx, rx) = channel();
let (panic_tx, panic_rx) = channel();
let pool = ThreadPoolBuilder::new()
.num_threads(7)
.panic_handler(move |e| panic_tx.send(e).unwrap())
@ -196,8 +197,8 @@ fn broadcast_panic_many() {
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
let (tx, rx) = channel();
let (panic_tx, panic_rx) = channel();
let pool = ThreadPoolBuilder::new()
.num_threads(7)
.panic_handler(move |e| panic_tx.send(e).unwrap())
@ -231,7 +232,7 @@ fn broadcast_sleep_race() {
#[test]
fn broadcast_after_spawn_broadcast() {
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = channel();
// Queue a non-blocking spawn_broadcast.
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
@ -247,7 +248,7 @@ fn broadcast_after_spawn_broadcast() {
#[test]
fn broadcast_after_spawn() {
let (tx, rx) = crossbeam_channel::bounded(1);
let (tx, rx) = channel();
// Queue a regular spawn on a thread-local deque.
crate::registry::in_worker(move |_, _| {

168
third_party/rust/rayon-core/src/latch.rs поставляемый
Просмотреть файл

@ -84,13 +84,6 @@ impl CoreLatch {
}
}
/// Returns the address of this core latch as an integer. Used
/// for logging.
#[inline]
pub(super) fn addr(&self) -> usize {
self as *const CoreLatch as usize
}
/// Invoked by owning thread as it prepares to sleep. Returns true
/// if the owning thread may proceed to fall asleep, false if the
/// latch was set in the meantime.
@ -142,6 +135,13 @@ impl CoreLatch {
}
}
// Identity conversion: a `CoreLatch` is its own core latch.
// (Indentation was lost in this diff scrape; code is otherwise verbatim.)
impl AsCoreLatch for CoreLatch {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
self
}
}
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
@ -269,62 +269,32 @@ impl Latch for LockLatch {
}
}
/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, count laches are always associated with
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the count-latch, and that would create a cycle. So a
/// `CountLatch` must be given a reference to its owning registry when
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct CountLatch {
pub(super) struct OnceLatch {
core_latch: CoreLatch,
counter: AtomicUsize,
}
impl CountLatch {
impl OnceLatch {
#[inline]
pub(super) fn new() -> CountLatch {
Self::with_count(1)
}
#[inline]
pub(super) fn with_count(n: usize) -> CountLatch {
CountLatch {
pub(super) fn new() -> OnceLatch {
Self {
core_latch: CoreLatch::new(),
counter: AtomicUsize::new(n),
}
}
#[inline]
pub(super) fn increment(&self) {
debug_assert!(!self.core_latch.probe());
self.counter.fetch_add(1, Ordering::Relaxed);
}
/// Decrements the latch counter by one. If this is the final
/// count, then the latch is **set**, and calls to `probe()` will
/// return true. Returns whether the latch was set.
#[inline]
pub(super) unsafe fn set(this: *const Self) -> bool {
if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
CoreLatch::set(&(*this).core_latch);
true
} else {
false
}
}
/// Decrements the latch counter by one and possibly set it. If
/// the latch is set, then the specific worker thread is tickled,
/// Set the latch, then tickle the specific worker thread,
/// which should be the one that owns this latch.
#[inline]
pub(super) unsafe fn set_and_tickle_one(
@ -332,31 +302,81 @@ impl CountLatch {
registry: &Registry,
target_worker_index: usize,
) {
if Self::set(this) {
if CoreLatch::set(&(*this).core_latch) {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
}
impl AsCoreLatch for CountLatch {
// Expose the embedded `CoreLatch` of a `OnceLatch` (the field added when
// `CountLatch` was split into `OnceLatch`/`CountLatch` in rayon-core 1.12.0).
impl AsCoreLatch for OnceLatch {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
&self.core_latch
}
}
/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
pub(super) struct CountLockLatch {
lock_latch: LockLatch,
pub(super) struct CountLatch {
counter: AtomicUsize,
kind: CountLatchKind,
}
impl CountLockLatch {
#[inline]
pub(super) fn with_count(n: usize) -> CountLockLatch {
CountLockLatch {
lock_latch: LockLatch::new(),
counter: AtomicUsize::new(n),
enum CountLatchKind {
/// A latch for scopes created on a rayon thread which will participate in work-
/// stealing while it waits for completion. This thread is not necessarily part
/// of the same registry as the scope itself!
Stealing {
latch: CoreLatch,
/// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
/// with registry B, when a job completes in a thread of registry B, we may
/// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
/// That means we need a reference to registry A (since at that point we will
/// only have a reference to registry B), so we stash it here.
registry: Arc<Registry>,
/// The index of the worker to wake in `registry`
worker_index: usize,
},
/// A latch for scopes created on a non-rayon thread which will block to wait.
Blocking { latch: LockLatch },
}
// Manual `Debug` impl: prints only the variant name and its inner latch,
// deliberately omitting the `registry`/`worker_index` fields of `Stealing`
// (note the `..` in both patterns). Hand-written because `CountLatchKind`
// holds an `Arc<Registry>` that presumably does not implement `Debug` —
// TODO confirm against registry.rs, which is outside this hunk.
impl std::fmt::Debug for CountLatchKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CountLatchKind::Stealing { latch, .. } => {
f.debug_tuple("Stealing").field(latch).finish()
}
CountLatchKind::Blocking { latch, .. } => {
f.debug_tuple("Blocking").field(latch).finish()
}
}
}
}
impl CountLatch {
pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
Self::with_count(1, owner)
}
pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
Self {
counter: AtomicUsize::new(count),
kind: match owner {
Some(owner) => CountLatchKind::Stealing {
latch: CoreLatch::new(),
registry: Arc::clone(owner.registry()),
worker_index: owner.index(),
},
None => CountLatchKind::Blocking {
latch: LockLatch::new(),
},
},
}
}
@ -366,16 +386,42 @@ impl CountLockLatch {
debug_assert!(old_counter != 0);
}
pub(super) fn wait(&self) {
self.lock_latch.wait();
pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
match &self.kind {
CountLatchKind::Stealing {
latch,
registry,
worker_index,
} => unsafe {
let owner = owner.expect("owner thread");
debug_assert_eq!(registry.id(), owner.registry().id());
debug_assert_eq!(*worker_index, owner.index());
owner.wait_until(latch);
},
CountLatchKind::Blocking { latch } => latch.wait(),
}
}
}
impl Latch for CountLockLatch {
impl Latch for CountLatch {
#[inline]
unsafe fn set(this: *const Self) {
if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
LockLatch::set(&(*this).lock_latch);
// NOTE: Once we call `set` on the internal `latch`,
// the target may proceed and invalidate `this`!
match (*this).kind {
CountLatchKind::Stealing {
ref latch,
ref registry,
worker_index,
} => {
let registry = Arc::clone(registry);
if CoreLatch::set(latch) {
registry.notify_worker_latch_is_set(worker_index);
}
}
CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
}
}
}
}

79
third_party/rust/rayon-core/src/lib.rs поставляемый
Просмотреть файл

@ -73,9 +73,8 @@ use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::str::FromStr;
use std::thread;
#[macro_use]
mod log;
#[macro_use]
mod private;
@ -99,10 +98,10 @@ pub use self::registry::ThreadBuilder;
pub use self::scope::{in_place_scope, scope, Scope};
pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::{
clean_up_use_current_thread, current_thread_has_pending_tasks, current_thread_index,
yield_local, yield_now, ThreadPool, Yield,
};
pub use self::thread_pool::current_thread_has_pending_tasks;
pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
pub use self::thread_pool::{yield_local, yield_now, Yield};
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
@ -148,6 +147,7 @@ pub struct ThreadPoolBuildError {
#[derive(Debug)]
enum ErrorKind {
GlobalPoolAlreadyInitialized,
CurrentThreadAlreadyInPool,
IOError(io::Error),
}
@ -288,12 +288,12 @@ where
impl ThreadPoolBuilder {
/// Creates a scoped `ThreadPool` initialized using this configuration.
///
/// This is a convenience function for building a pool using [`crossbeam::scope`]
/// This is a convenience function for building a pool using [`std::thread::scope`]
/// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
/// The threads in this pool will start by calling `wrapper`, which should
/// do initialization and continue by calling `ThreadBuilder::run()`.
///
/// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
/// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
///
/// # Examples
///
@ -328,28 +328,22 @@ impl ThreadPoolBuilder {
W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
F: FnOnce(&ThreadPool) -> R,
{
let result = crossbeam_utils::thread::scope(|scope| {
let wrapper = &wrapper;
std::thread::scope(|scope| {
let pool = self
.spawn_handler(|thread| {
let mut builder = scope.builder();
let mut builder = std::thread::Builder::new();
if let Some(name) = thread.name() {
builder = builder.name(name.to_string());
}
if let Some(size) = thread.stack_size() {
builder = builder.stack_size(size);
}
builder.spawn(move |_| wrapper(thread))?;
builder.spawn_scoped(scope, || wrapper(thread))?;
Ok(())
})
.build()?;
Ok(with_pool(&pool))
});
match result {
Ok(result) => result,
Err(err) => unwind::resume_unwinding(err),
}
})
}
}
@ -358,13 +352,11 @@ impl<S> ThreadPoolBuilder<S> {
///
/// Note that the threads will not exit until after the pool is dropped. It
/// is up to the caller to wait for thread termination if that is important
/// for any invariants. For instance, threads created in [`crossbeam::scope`]
/// for any invariants. For instance, threads created in [`std::thread::scope`]
/// will be joined before that scope returns, and this will block indefinitely
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
/// until the entire process exits!
///
/// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
///
/// # Examples
///
/// A minimal spawn handler just needs to call `run()` from an independent thread.
@ -413,6 +405,7 @@ impl<S> ThreadPoolBuilder<S> {
/// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
/// [`build_scoped`](#method.build_scoped).
///
/// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
/// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
///
/// ```
@ -470,12 +463,18 @@ impl<S> ThreadPoolBuilder<S> {
if self.num_threads > 0 {
self.num_threads
} else {
let default = || {
thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
};
match env::var("RAYON_NUM_THREADS")
.ok()
.and_then(|s| usize::from_str(&s).ok())
{
Some(x) if x > 0 => return x,
Some(x) if x == 0 => return num_cpus::get(),
Some(x @ 1..) => return x,
Some(0) => return default(),
_ => {}
}
@ -484,8 +483,8 @@ impl<S> ThreadPoolBuilder<S> {
.ok()
.and_then(|s| usize::from_str(&s).ok())
{
Some(x) if x > 0 => x,
_ => num_cpus::get(),
Some(x @ 1..) => x,
_ => default(),
}
}
}
@ -524,9 +523,8 @@ impl<S> ThreadPoolBuilder<S> {
/// may change in the future, if you wish to rely on a fixed
/// number of threads, you should use this function to specify
/// that number. To reproduce the current default behavior, you
/// may wish to use the [`num_cpus`
/// crate](https://crates.io/crates/num_cpus) to query the number
/// of CPUs dynamically.
/// may wish to use [`std::thread::available_parallelism`]
/// to query the number of CPUs dynamically.
///
/// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
/// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
@ -544,22 +542,12 @@ impl<S> ThreadPoolBuilder<S> {
///
/// Note that the current thread won't run the main work-stealing loop, so jobs spawned into
/// the thread-pool will generally not be picked up automatically by this thread unless you
/// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], [`scope()`], or
/// [`clean_up_use_current_thread()`].
/// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`].
///
/// # Panics
/// # Local thread-pools
///
/// This function won't panic itself, but [`ThreadPoolBuilder::build()`] will panic if you've
/// called this function and the current thread is already part of another [`ThreadPool`].
///
/// # Cleaning up a local thread-pool
///
/// In order to properly clean-up the worker thread state, for local thread-pools you should
/// call [`clean_up_use_current_thread()`] from the same thread that built the thread-pool.
/// See that function's documentation for more details.
///
/// This call is not required, but without it the registry will leak even if the pool is
/// otherwise terminated.
/// Using this in a local thread-pool means the registry will be leaked. In future versions
/// there might be a way of cleaning up the current-thread state.
pub fn use_current_thread(mut self) -> Self {
self.use_current_thread = true;
self
@ -767,18 +755,22 @@ impl ThreadPoolBuildError {
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
"The global thread pool has already been initialized.";
const CURRENT_THREAD_ALREADY_IN_POOL: &str =
"The current thread is already part of another thread pool.";
impl Error for ThreadPoolBuildError {
#[allow(deprecated)]
fn description(&self) -> &str {
match self.kind {
ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL,
ErrorKind::IOError(ref e) => e.description(),
}
}
fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self.kind {
ErrorKind::GlobalPoolAlreadyInitialized => None,
ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None,
ErrorKind::IOError(e) => Some(e),
}
}
@ -787,6 +779,7 @@ impl Error for ThreadPoolBuildError {
impl fmt::Display for ThreadPoolBuildError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.kind {
ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f),
ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
ErrorKind::IOError(e) => e.fmt(f),
}

413
third_party/rust/rayon-core/src/log.rs поставляемый
Просмотреть файл

@ -1,413 +0,0 @@
//! Debug Logging
//!
//! To use in a debug build, set the env var `RAYON_LOG` as
//! described below. In a release build, logs are compiled out by
//! default unless Rayon is built with `--cfg rayon_rs_log` (try
//! `RUSTFLAGS="--cfg rayon_rs_log"`).
//!
//! Note that logs are an internally debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
//!
//! # Valid values for RAYON_LOG
//!
//! The `RAYON_LOG` variable can take on the following values:
//!
//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
//! useful for tracking down deadlocks
//! * `profile:<file>` -- dumps only those events needed to reconstruct how
//! many workers are active at a given time
//! * `all:<file>` -- dumps every event to the file; useful for debugging
use crossbeam_channel::{self, Receiver, Sender};
use std::collections::VecDeque;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter, Write};
/// True if logs are compiled in.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));
// NOTE(review): this enum is part of the `RAYON_LOG` debug-logging machinery
// in src/log.rs, shown here inside the deletion hunk `@ -1,413 +0,0` — i.e.
// this is content being REMOVED by the rayon-core 1.12.0 upgrade, not live
// code. Kept verbatim below as the record of what was deleted.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
/// Flushes events to disk, used to terminate benchmarking.
Flush,
/// Indicates that a worker thread started execution.
ThreadStart {
worker: usize,
terminate_addr: usize,
},
/// Indicates that a worker thread started execution.
ThreadTerminate { worker: usize },
/// Indicates that a worker thread became idle, blocked on `latch_addr`.
ThreadIdle { worker: usize, latch_addr: usize },
/// Indicates that an idle worker thread found work to do, after
/// yield rounds. It should no longer be considered idle.
ThreadFoundWork { worker: usize, yields: u32 },
/// Indicates that a worker blocked on a latch observed that it was set.
///
/// Internal debugging event that does not affect the state
/// machine.
ThreadSawLatchSet { worker: usize, latch_addr: usize },
/// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal
/// sleep state that we saw at the time.
ThreadSleepy { worker: usize, jobs_counter: usize },
/// Indicates that the thread's attempt to fall asleep was
/// interrupted because the latch was set. (This is not, in and of
/// itself, a change to the thread state.)
ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },
/// Indicates that the thread's attempt to fall asleep was
/// interrupted because a job was posted. (This is not, in and of
/// itself, a change to the thread state.)
ThreadSleepInterruptedByJob { worker: usize },
/// Indicates that an idle worker has gone to sleep.
ThreadSleeping { worker: usize, latch_addr: usize },
/// Indicates that a sleeping worker has awoken.
ThreadAwoken { worker: usize, latch_addr: usize },
/// Indicates that the given worker thread was notified it should
/// awaken.
ThreadNotify { worker: usize },
/// The given worker has pushed a job to its local deque.
JobPushed { worker: usize },
/// The given worker has popped a job from its local deque.
JobPopped { worker: usize },
/// The given worker has stolen a job from the deque of another.
JobStolen { worker: usize, victim: usize },
/// N jobs were injected into the global queue.
JobsInjected { count: usize },
/// A job was removed from the global queue.
JobUninjected { worker: usize },
/// A job was broadcasted to N threads.
JobBroadcast { count: usize },
/// When announcing a job, this was the value of the counters we observed.
///
/// No effect on thread state, just a debugging event.
JobThreadCounts {
worker: usize,
num_idle: u16,
num_sleepers: u16,
},
}
/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
#[derive(Clone)]
pub(super) struct Logger {
    /// Channel to the logger thread spawned by `Logger::new`; `None`
    /// when logging is disabled, in which case `log` is a no-op.
    sender: Option<Sender<Event>>,
}
impl Logger {
    /// Creates the logger for a pool of `num_workers` worker threads.
    ///
    /// The mode is chosen by the `RAYON_LOG` environment variable:
    ///
    /// * `all` — dump every event to stderr as it arrives;
    /// * `tail:<file>` — keep only the most recent 10 000 events and
    ///   write them to `<file>` (useful for diagnosing deadlocks);
    /// * `profile:<file>` — batch events (10 000 at a time) and write
    ///   the interesting ones to `<file>`.
    ///
    /// Returns a disabled logger that discards every event when logging
    /// is compiled out (`LOG_ENABLED` is false) or `RAYON_LOG` is unset.
    ///
    /// # Panics
    ///
    /// Panics if `RAYON_LOG` is set to an unrecognized value.
    pub(super) fn new(num_workers: usize) -> Logger {
        if !LOG_ENABLED {
            return Self::disabled();
        }

        // see the doc comment above for the accepted formats
        let env_log = match env::var("RAYON_LOG") {
            Ok(s) => s,
            Err(_) => return Self::disabled(),
        };

        let (sender, receiver) = crossbeam_channel::unbounded();

        if let Some(filename) = env_log.strip_prefix("tail:") {
            let filename = filename.to_string();
            ::std::thread::spawn(move || {
                Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else if env_log == "all" {
            ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
        } else if let Some(filename) = env_log.strip_prefix("profile:") {
            let filename = filename.to_string();
            ::std::thread::spawn(move || {
                Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else {
            // Mention every accepted mode; the message previously omitted 'all'.
            panic!("RAYON_LOG should be 'all', 'tail:<file>' or 'profile:<file>'");
        }

        Logger {
            sender: Some(sender),
        }
    }

    /// A logger with no backing thread; its `log` calls are no-ops.
    fn disabled() -> Logger {
        Logger { sender: None }
    }

    /// Sends the event produced by `event()` to the logger thread, if
    /// logging is enabled.
    ///
    /// The event is built lazily via the closure, so when logging is
    /// compiled out the only cost is the constant `LOG_ENABLED` test.
    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> Event) {
        if !LOG_ENABLED {
            return;
        }

        if let Some(sender) = &self.sender {
            sender.send(event()).unwrap();
        }
    }

    /// Body of the `profile:<file>` logger thread.
    ///
    /// Accumulates events until a `Flush` arrives, `capacity` events are
    /// buffered, or the channel is quiet (or disconnected) for 30 s; then
    /// writes out only the events that changed the simulated state, and
    /// loops forever.
    fn profile_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events = Vec::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);

        loop {
            while let Ok(event) = receiver.recv_timeout(timeout) {
                if let Event::Flush = event {
                    break;
                }

                events.push(event);
                if events.len() == capacity {
                    break;
                }
            }

            // Only dump events that were 'interesting' to the state machine.
            for event in events.drain(..) {
                if state.simulate(&event) {
                    state.dump(&mut writer, &event).unwrap();
                }
            }

            writer.flush().unwrap();
        }
    }

    /// Body of the `tail:<file>` logger thread.
    ///
    /// Keeps a sliding window of the last `capacity` events; whenever the
    /// channel is quiet (or disconnected) for 30 s, dumps the whole
    /// window — prefixed by a `...` line if older events were dropped —
    /// and loops forever.
    fn tail_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);
        let mut skipped = false;

        loop {
            while let Ok(event) = receiver.recv_timeout(timeout) {
                if let Event::Flush = event {
                    // We ignore Flush events in tail mode --
                    // we're really just looking for
                    // deadlocks.
                    continue;
                } else {
                    if events.len() == capacity {
                        // Window is full: retire the oldest event. It still
                        // drives the simulator so state stays consistent.
                        let event = events.pop_front().unwrap();
                        state.simulate(&event);
                        skipped = true;
                    }

                    events.push_back(event);
                }
            }

            if skipped {
                writeln!(writer, "...").unwrap();
                skipped = false;
            }

            for event in events.drain(..) {
                // In tail mode, we dump *all* events out, whether or
                // not they were 'interesting' to the state machine.
                state.simulate(&event);
                state.dump(&mut writer, &event).unwrap();
            }

            writer.flush().unwrap();
        }
    }

    /// Body of the `all` logger thread: dumps every event to stderr as it
    /// arrives, until the channel is closed.
    fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
        let stderr = std::io::stderr();
        let mut state = SimulatorState::new(num_workers);

        for event in receiver {
            // Hold the stderr lock while writing one event's line.
            let mut writer = BufWriter::new(stderr.lock());
            state.simulate(&event);
            state.dump(&mut writer, &event).unwrap();
            writer.flush().unwrap();
        }
    }
}
/// State of a single (simulated) worker thread, as reconstructed by
/// `SimulatorState` from the event stream.
///
/// NB: `PartialOrd`/`Ord` are derived, so the variant order is part of
/// the comparison behavior.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    /// Executing work — the initial state, re-entered on `ThreadStart`
    /// or `ThreadFoundWork`.
    Working,
    /// Looking for work without success (`ThreadIdle`).
    Idle,
    /// Still asleep but already told to wake up (`ThreadNotify`).
    Notified,
    /// Fully asleep (`ThreadSleeping`).
    Sleeping,
    /// Exited its main loop (`ThreadTerminate`).
    Terminated,
}
impl State {
fn letter(&self) -> char {
match self {
State::Working => 'W',
State::Idle => 'I',
State::Notified => 'N',
State::Sleeping => 'S',
State::Terminated => 'T',
}
}
}
/// Mirror of the thread pool's state, reconstructed by replaying the
/// event stream through `simulate`.
struct SimulatorState {
    /// Number of jobs currently in each worker's local deque.
    local_queue_size: Vec<usize>,
    /// Current simulated `State` of each worker thread.
    thread_states: Vec<State>,
    /// Number of jobs currently in the global injector queue.
    injector_size: usize,
}
impl SimulatorState {
fn new(num_workers: usize) -> Self {
Self {
local_queue_size: (0..num_workers).map(|_| 0).collect(),
thread_states: (0..num_workers).map(|_| State::Working).collect(),
injector_size: 0,
}
}
fn simulate(&mut self, event: &Event) -> bool {
match *event {
Event::ThreadIdle { worker, .. } => {
assert_eq!(self.thread_states[worker], State::Working);
self.thread_states[worker] = State::Idle;
true
}
Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
self.thread_states[worker] = State::Working;
true
}
Event::ThreadTerminate { worker, .. } => {
self.thread_states[worker] = State::Terminated;
true
}
Event::ThreadSleeping { worker, .. } => {
assert_eq!(self.thread_states[worker], State::Idle);
self.thread_states[worker] = State::Sleeping;
true
}
Event::ThreadAwoken { worker, .. } => {
assert_eq!(self.thread_states[worker], State::Notified);
self.thread_states[worker] = State::Idle;
true
}
Event::JobPushed { worker } => {
self.local_queue_size[worker] += 1;
true
}
Event::JobPopped { worker } => {
self.local_queue_size[worker] -= 1;
true
}
Event::JobStolen { victim, .. } => {
self.local_queue_size[victim] -= 1;
true
}
Event::JobsInjected { count } => {
self.injector_size += count;
true
}
Event::JobUninjected { .. } => {
self.injector_size -= 1;
true
}
Event::ThreadNotify { worker } => {
// Currently, this log event occurs while holding the
// thread lock, so we should *always* see it before
// the worker awakens.
assert_eq!(self.thread_states[worker], State::Sleeping);
self.thread_states[worker] = State::Notified;
true
}
// remaining events are no-ops from pov of simulating the
// thread state
_ => false,
}
}
fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
let num_idle_threads = self
.thread_states
.iter()
.filter(|s| **s == State::Idle)
.count();
let num_sleeping_threads = self
.thread_states
.iter()
.filter(|s| **s == State::Sleeping)
.count();
let num_notified_threads = self
.thread_states
.iter()
.filter(|s| **s == State::Notified)
.count();
let num_pending_jobs: usize = self.local_queue_size.iter().sum();
write!(w, "{:2},", num_idle_threads)?;
write!(w, "{:2},", num_sleeping_threads)?;
write!(w, "{:2},", num_notified_threads)?;
write!(w, "{:4},", num_pending_jobs)?;
write!(w, "{:4},", self.injector_size)?;
let event_str = format!("{:?}", event);
write!(w, r#""{:60}","#, event_str)?;
for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
write!(w, " T{:02},{}", i, state.letter(),)?;
if *queue_size > 0 {
write!(w, ",{:03},", queue_size)?;
} else {
write!(w, ", ,")?;
}
}
writeln!(w)?;
Ok(())
}
}

160
third_party/rust/rayon-core/src/registry.rs поставляемый
Просмотреть файл

@ -1,7 +1,5 @@
use crate::job::{JobFifo, JobRef, StackJob};
use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
use crate::sleep::Sleep;
use crate::unwind;
use crate::{
@ -130,7 +128,6 @@ where
}
pub(super) struct Registry {
logger: Logger,
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: Injector<JobRef>,
@ -139,9 +136,6 @@ pub(super) struct Registry {
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
/// Whether the first thread of our pool is the creator of the thread pool.
used_creator_thread: bool,
// When this latch reaches 0, it means that all work on this
// registry must be complete. This is ensured in the following ways:
//
@ -264,18 +258,15 @@ impl Registry {
})
.unzip();
let logger = Logger::new(n_threads);
let registry = Arc::new(Registry {
logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
sleep: Sleep::new(logger, n_threads),
sleep: Sleep::new(n_threads),
injected_jobs: Injector::new(),
broadcasts: Mutex::new(broadcasts),
terminate_count: AtomicUsize::new(1),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
exit_handler: builder.take_exit_handler(),
used_creator_thread: builder.use_current_thread,
});
// If we return early or panic, make sure to terminate existing threads.
@ -292,12 +283,14 @@ impl Registry {
};
if index == 0 && builder.use_current_thread {
if !WorkerThread::current().is_null() {
return Err(ThreadPoolBuildError::new(
ErrorKind::CurrentThreadAlreadyInPool,
));
}
// Rather than starting a new thread, we're just taking over the current thread
// *without* running the main loop, so we can still return from here.
// The WorkerThread is leaked, but we never shutdown the global pool anyway.
//
// For local pools, the caller is responsible of cleaning this up if they need to
// by using clean_up_use_current_thread.
let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread)));
unsafe {
@ -365,11 +358,6 @@ impl Registry {
}
}
#[inline]
pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
self.logger.log(event)
}
pub(super) fn num_threads(&self) -> usize {
self.thread_infos.len()
}
@ -428,8 +416,6 @@ impl Registry {
/// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
pub(super) fn inject(&self, injected_job: JobRef) {
self.log(|| JobsInjected { count: 1 });
// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
@ -444,22 +430,17 @@ impl Registry {
let queue_was_empty = self.injected_jobs.is_empty();
self.injected_jobs.push(injected_job);
self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty);
self.sleep.new_injected_jobs(1, queue_was_empty);
}
fn has_injected_job(&self) -> bool {
!self.injected_jobs.is_empty()
}
fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
fn pop_injected_job(&self) -> Option<JobRef> {
loop {
match self.injected_jobs.steal() {
Steal::Success(job) => {
self.log(|| JobUninjected {
worker: worker_index,
});
return Some(job);
}
Steal::Success(job) => return Some(job),
Steal::Empty => return None,
Steal::Retry => {}
}
@ -473,9 +454,6 @@ impl Registry {
/// **Panics** if not given exactly as many jobs as there are threads.
pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
assert_eq!(self.num_threads(), injected_jobs.len());
self.log(|| JobBroadcast {
count: self.num_threads(),
});
{
let broadcasts = self.broadcasts.lock().unwrap();
@ -547,9 +525,6 @@ impl Registry {
self.inject(job.as_job_ref());
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
// flush accumulated logs as we exit the thread
self.logger.log(|| Flush);
job.into_result()
})
}
@ -612,7 +587,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
@ -621,10 +596,6 @@ impl Registry {
pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
self.sleep.notify_worker_latch_is_set(target_worker_index);
}
pub(super) fn used_creator_thread(&self) -> bool {
self.used_creator_thread
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
@ -646,10 +617,7 @@ struct ThreadInfo {
/// This latch is *set* by the `terminate` method on the
/// `Registry`, once the registry's main "terminate" counter
/// reaches zero.
///
/// NB. We use a `CountLatch` here because it has no lifetimes and is
/// meant for async use, but the count never gets higher than one.
terminate: CountLatch,
terminate: OnceLatch,
/// the "stealer" half of the worker's deque
stealer: Stealer<JobRef>,
@ -660,7 +628,7 @@ impl ThreadInfo {
ThreadInfo {
primed: LockLatch::new(),
stopped: LockLatch::new(),
terminate: CountLatch::new(),
terminate: OnceLatch::new(),
stealer,
}
}
@ -743,11 +711,6 @@ impl WorkerThread {
&self.registry
}
#[inline]
pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
self.registry.logger.log(event)
}
/// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
#[inline]
pub(super) fn index(&self) -> usize {
@ -756,12 +719,9 @@ impl WorkerThread {
#[inline]
pub(super) unsafe fn push(&self, job: JobRef) {
self.log(|| JobPushed { worker: self.index });
let queue_was_empty = self.worker.is_empty();
self.worker.push(job);
self.registry
.sleep
.new_internal_jobs(self.index, 1, queue_was_empty);
self.registry.sleep.new_internal_jobs(1, queue_was_empty);
}
#[inline]
@ -783,7 +743,6 @@ impl WorkerThread {
let popped_job = self.worker.pop();
if popped_job.is_some() {
self.log(|| JobPopped { worker: self.index });
return popped_job;
}
@ -819,31 +778,51 @@ impl WorkerThread {
// accesses, which would be *very bad*
let abort_guard = unwind::AbortIfPanic;
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
while !latch.probe() {
if let Some(job) = self.find_work() {
self.registry.sleep.work_found(idle_state);
'outer: while !latch.probe() {
// Check for local work *before* we start marking ourself idle,
// especially to avoid modifying shared sleep state.
if let Some(job) = self.take_local_job() {
self.execute(job);
idle_state = self.registry.sleep.start_looking(self.index, latch);
} else {
self.registry
.sleep
.no_work_found(&mut idle_state, latch, || self.has_injected_job())
continue;
}
let mut idle_state = self.registry.sleep.start_looking(self.index);
while !latch.probe() {
if let Some(job) = self.find_work() {
self.registry.sleep.work_found();
self.execute(job);
// The job might have injected local work, so go back to the outer loop.
continue 'outer;
} else {
self.registry
.sleep
.no_work_found(&mut idle_state, latch, || self.has_injected_job())
}
}
// If we were sleepy, we are not anymore. We "found work" --
// whatever the surrounding thread was doing before it had to wait.
self.registry.sleep.work_found();
break;
}
// If we were sleepy, we are not anymore. We "found work" --
// whatever the surrounding thread was doing before it had to
// wait.
self.registry.sleep.work_found(idle_state);
self.log(|| ThreadSawLatchSet {
worker: self.index,
latch_addr: latch.addr(),
});
mem::forget(abort_guard); // successful execution, do not abort
}
unsafe fn wait_until_out_of_work(&self) {
debug_assert_eq!(self as *const _, WorkerThread::current());
let registry = &*self.registry;
let index = self.index;
self.wait_until(&registry.thread_infos[index].terminate);
// Should not be any work left in our queue.
debug_assert!(self.take_local_job().is_none());
// Let registry know we are done
Latch::set(&registry.thread_infos[index].stopped);
}
fn find_work(&self) -> Option<JobRef> {
// Try to find some work to do. We give preference first
// to things in our local deque, then in other workers
@ -852,7 +831,7 @@ impl WorkerThread {
// we take on something new.
self.take_local_job()
.or_else(|| self.steal())
.or_else(|| self.registry.pop_injected_job(self.index))
.or_else(|| self.registry.pop_injected_job())
}
pub(super) fn yield_now(&self) -> Yield {
@ -904,13 +883,7 @@ impl WorkerThread {
.find_map(|victim_index| {
let victim = &thread_infos[victim_index];
match victim.stealer.steal() {
Steal::Success(job) => {
self.log(|| JobStolen {
worker: self.index,
victim: victim_index,
});
Some(job)
}
Steal::Success(job) => Some(job),
Steal::Empty => None,
Steal::Retry => {
retry = true;
@ -946,19 +919,11 @@ unsafe fn main_loop(thread: ThreadBuilder) {
registry.catch_unwind(|| handler(index));
}
let my_terminate_latch = &registry.thread_infos[index].terminate;
worker_thread.log(|| ThreadStart {
worker: index,
terminate_addr: my_terminate_latch.as_core_latch().addr(),
});
wait_until_out_of_work(worker_thread);
worker_thread.wait_until_out_of_work();
// Normal termination, do not abort.
mem::forget(abort_guard);
worker_thread.log(|| ThreadTerminate { worker: index });
// Inform a user callback that we exited a thread.
if let Some(ref handler) = registry.exit_handler {
registry.catch_unwind(|| handler(index));
@ -966,21 +931,6 @@ unsafe fn main_loop(thread: ThreadBuilder) {
}
}
pub(crate) unsafe fn wait_until_out_of_work(worker_thread: &WorkerThread) {
debug_assert_eq!(worker_thread as *const _, WorkerThread::current());
let registry = &*worker_thread.registry;
let index = worker_thread.index;
let my_terminate_latch = &registry.thread_infos[index].terminate;
worker_thread.wait_until(my_terminate_latch);
// Should not be any work left in our queue.
debug_assert!(worker_thread.take_local_job().is_none());
// let registry know we are done
Latch::set(&registry.thread_infos[index].stopped);
}
/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that

116
third_party/rust/rayon-core/src/scope/mod.rs поставляемый
Просмотреть файл

@ -7,7 +7,7 @@
use crate::broadcast::BroadcastContext;
use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
use crate::latch::{CountLatch, CountLockLatch, Latch};
use crate::latch::{CountLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::unwind;
use std::any::Any;
@ -39,26 +39,6 @@ pub struct ScopeFifo<'scope> {
fifos: Vec<JobFifo>,
}
pub(super) enum ScopeLatch {
/// A latch for scopes created on a rayon thread which will participate in work-
/// stealing while it waits for completion. This thread is not necessarily part
/// of the same registry as the scope itself!
Stealing {
latch: CountLatch,
/// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
/// with registry B, when a job completes in a thread of registry B, we may
/// need to call `latch.set_and_tickle_one()` to wake the thread in registry A.
/// That means we need a reference to registry A (since at that point we will
/// only have a reference to registry B), so we stash it here.
registry: Arc<Registry>,
/// The index of the worker to wake in `registry`
worker_index: usize,
},
/// A latch for scopes created on a non-rayon thread which will block to wait.
Blocking { latch: CountLockLatch },
}
struct ScopeBase<'scope> {
/// thread registry where `scope()` was executed or where `in_place_scope()`
/// should spawn jobs.
@ -69,7 +49,7 @@ struct ScopeBase<'scope> {
panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
/// latch to track job counts
job_completed_latch: ScopeLatch,
job_completed_latch: CountLatch,
/// You can think of a scope as containing a list of closures to execute,
/// all of which outlive `'scope`. They're not actually required to be
@ -650,21 +630,17 @@ impl<'scope> ScopeBase<'scope> {
ScopeBase {
registry: Arc::clone(registry),
panic: AtomicPtr::new(ptr::null_mut()),
job_completed_latch: ScopeLatch::new(owner),
job_completed_latch: CountLatch::new(owner),
marker: PhantomData,
}
}
fn increment(&self) {
self.job_completed_latch.increment();
}
fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
where
FUNC: FnOnce() + Send + 'scope,
{
unsafe {
self.increment();
self.job_completed_latch.increment();
job.into_job_ref()
}
}
@ -675,7 +651,7 @@ impl<'scope> ScopeBase<'scope> {
{
let n_threads = self.registry.num_threads();
let job_refs = (0..n_threads).map(|_| unsafe {
self.increment();
self.job_completed_latch.increment();
ArcJob::as_job_ref(&job)
});
@ -710,17 +686,15 @@ impl<'scope> ScopeBase<'scope> {
where
FUNC: FnOnce() -> R,
{
match unwind::halt_unwinding(func) {
Ok(r) => {
Latch::set(&(*this).job_completed_latch);
Some(r)
}
let result = match unwind::halt_unwinding(func) {
Ok(r) => Some(r),
Err(err) => {
(*this).job_panicked(err);
Latch::set(&(*this).job_completed_latch);
None
}
}
};
Latch::set(&(*this).job_completed_latch);
result
}
fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
@ -754,61 +728,6 @@ impl<'scope> ScopeBase<'scope> {
}
}
impl ScopeLatch {
fn new(owner: Option<&WorkerThread>) -> Self {
Self::with_count(1, owner)
}
pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
match owner {
Some(owner) => ScopeLatch::Stealing {
latch: CountLatch::with_count(count),
registry: Arc::clone(owner.registry()),
worker_index: owner.index(),
},
None => ScopeLatch::Blocking {
latch: CountLockLatch::with_count(count),
},
}
}
fn increment(&self) {
match self {
ScopeLatch::Stealing { latch, .. } => latch.increment(),
ScopeLatch::Blocking { latch } => latch.increment(),
}
}
pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
match self {
ScopeLatch::Stealing {
latch,
registry,
worker_index,
} => unsafe {
let owner = owner.expect("owner thread");
debug_assert_eq!(registry.id(), owner.registry().id());
debug_assert_eq!(*worker_index, owner.index());
owner.wait_until(latch);
},
ScopeLatch::Blocking { latch } => latch.wait(),
}
}
}
impl Latch for ScopeLatch {
unsafe fn set(this: *const Self) {
match &*this {
ScopeLatch::Stealing {
latch,
registry,
worker_index,
} => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
ScopeLatch::Blocking { latch } => Latch::set(latch),
}
}
}
impl<'scope> fmt::Debug for Scope<'scope> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Scope")
@ -830,21 +749,6 @@ impl<'scope> fmt::Debug for ScopeFifo<'scope> {
}
}
impl fmt::Debug for ScopeLatch {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ScopeLatch::Stealing { latch, .. } => fmt
.debug_tuple("ScopeLatch::Stealing")
.field(latch)
.finish(),
ScopeLatch::Blocking { latch } => fmt
.debug_tuple("ScopeLatch::Blocking")
.field(latch)
.finish(),
}
}
}
/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
///
/// Unsafe code is still required to dereference the pointer, but that's fine in

Просмотреть файл

@ -212,12 +212,12 @@ impl AtomicCounters {
#[inline]
fn select_thread(word: usize, shift: usize) -> usize {
((word >> shift) as usize) & THREADS_MAX
(word >> shift) & THREADS_MAX
}
#[inline]
fn select_jec(word: usize) -> usize {
(word >> JEC_SHIFT) as usize
word >> JEC_SHIFT
}
impl Counters {

95
third_party/rust/rayon-core/src/sleep/mod.rs поставляемый
Просмотреть файл

@ -2,8 +2,6 @@
//! for an overview.
use crate::latch::CoreLatch;
use crate::log::Event::*;
use crate::log::Logger;
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex};
@ -22,8 +20,6 @@ use self::counters::{AtomicCounters, JobsEventCounter};
///
/// [`README.md`] README.md
pub(super) struct Sleep {
logger: Logger,
/// One "sleep state" per worker. Used to track if a worker is sleeping and to have
/// them block.
worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
@ -62,22 +58,16 @@ const ROUNDS_UNTIL_SLEEPY: u32 = 32;
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
impl Sleep {
pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
pub(super) fn new(n_threads: usize) -> Sleep {
assert!(n_threads <= THREADS_MAX);
Sleep {
logger,
worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
counters: AtomicCounters::new(),
}
}
#[inline]
pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
self.logger.log(|| ThreadIdle {
worker: worker_index,
latch_addr: latch.addr(),
});
pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
self.counters.add_inactive_thread();
IdleState {
@ -88,12 +78,7 @@ impl Sleep {
}
#[inline]
pub(super) fn work_found(&self, idle_state: IdleState) {
self.logger.log(|| ThreadFoundWork {
worker: idle_state.worker_index,
yields: idle_state.rounds,
});
pub(super) fn work_found(&self) {
// If we were the last idle thread and other threads are still sleeping,
// then we should wake up another thread.
let threads_to_wake = self.counters.sub_inactive_thread();
@ -111,7 +96,7 @@ impl Sleep {
thread::yield_now();
idle_state.rounds += 1;
} else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
idle_state.jobs_counter = self.announce_sleepy();
idle_state.rounds += 1;
thread::yield_now();
} else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
@ -124,16 +109,10 @@ impl Sleep {
}
#[cold]
fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
let counters = self
.counters
.increment_jobs_event_counter_if(JobsEventCounter::is_active);
let jobs_counter = counters.jobs_counter();
self.logger.log(|| ThreadSleepy {
worker: worker_index,
jobs_counter: jobs_counter.as_usize(),
});
jobs_counter
fn announce_sleepy(&self) -> JobsEventCounter {
self.counters
.increment_jobs_event_counter_if(JobsEventCounter::is_active)
.jobs_counter()
}
#[cold]
@ -146,11 +125,6 @@ impl Sleep {
let worker_index = idle_state.worker_index;
if !latch.get_sleepy() {
self.logger.log(|| ThreadSleepInterruptedByLatch {
worker: worker_index,
latch_addr: latch.addr(),
});
return;
}
@ -161,11 +135,6 @@ impl Sleep {
// Our latch was signalled. We should wake back up fully as we
// will have some stuff to do.
if !latch.fall_asleep() {
self.logger.log(|| ThreadSleepInterruptedByLatch {
worker: worker_index,
latch_addr: latch.addr(),
});
idle_state.wake_fully();
return;
}
@ -180,10 +149,6 @@ impl Sleep {
// we didn't see it. We should return to just before the SLEEPY
// state so we can do another search and (if we fail to find
// work) go back to sleep.
self.logger.log(|| ThreadSleepInterruptedByJob {
worker: worker_index,
});
idle_state.wake_partly();
latch.wake_up();
return;
@ -197,11 +162,6 @@ impl Sleep {
// Successfully registered as asleep.
self.logger.log(|| ThreadSleeping {
worker: worker_index,
latch_addr: latch.addr(),
});
// We have one last check for injected jobs to do. This protects against
// deadlock in the very unlikely event that
//
@ -232,11 +192,6 @@ impl Sleep {
// Update other state:
idle_state.wake_fully();
latch.wake_up();
self.logger.log(|| ThreadAwoken {
worker: worker_index,
latch_addr: latch.addr(),
});
}
/// Notify the given thread that it should wake up (if it is
@ -254,24 +209,16 @@ impl Sleep {
///
/// # Parameters
///
/// - `source_worker_index` -- index of the thread that did the
/// push, or `usize::MAX` if this came from outside the thread
/// pool -- it is used only for logging.
/// - `num_jobs` -- lower bound on number of jobs available for stealing.
/// We'll try to get at least one thread per job.
#[inline]
pub(super) fn new_injected_jobs(
&self,
source_worker_index: usize,
num_jobs: u32,
queue_was_empty: bool,
) {
pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
// This fence is needed to guarantee that threads
// as they are about to fall asleep, observe any
// new jobs that may have been injected.
std::sync::atomic::fence(Ordering::SeqCst);
self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
self.new_jobs(num_jobs, queue_was_empty)
}
/// Signals that `num_jobs` new jobs were pushed onto a thread's
@ -284,24 +231,16 @@ impl Sleep {
///
/// # Parameters
///
/// - `source_worker_index` -- index of the thread that did the
/// push, or `usize::MAX` if this came from outside the thread
/// pool -- it is used only for logging.
/// - `num_jobs` -- lower bound on number of jobs available for stealing.
/// We'll try to get at least one thread per job.
#[inline]
pub(super) fn new_internal_jobs(
&self,
source_worker_index: usize,
num_jobs: u32,
queue_was_empty: bool,
) {
self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
self.new_jobs(num_jobs, queue_was_empty)
}
/// Common helper for `new_injected_jobs` and `new_internal_jobs`.
#[inline]
fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
// Read the counters and -- if sleepy workers have announced themselves
// -- announce that there is now work available. The final value of `counters`
// with which we exit the loop thus corresponds to a state when
@ -311,12 +250,6 @@ impl Sleep {
let num_awake_but_idle = counters.awake_but_idle_threads();
let num_sleepers = counters.sleeping_threads();
self.logger.log(|| JobThreadCounts {
worker: source_worker_index,
num_idle: num_awake_but_idle as u16,
num_sleepers: num_sleepers as u16,
});
if num_sleepers == 0 {
// nobody to wake
return;
@ -372,8 +305,6 @@ impl Sleep {
// do.
self.counters.sub_sleeping_thread();
self.logger.log(|| ThreadNotify { worker: index });
true
} else {
false

Просмотреть файл

@ -461,39 +461,6 @@ pub fn yield_local() -> Option<Yield> {
}
}
/// Waits for termination of the thread-pool (if pending), and cleans up resources allocated by
/// [`ThreadPoolBuilder::use_current_thread()`]. Should only be called from the thread that built
/// the thread-pool, and only when [`ThreadPoolBuilder::use_current_thread()`] is used.
///
/// Calling this function from a thread pool job will block indefinitely.
///
/// Calling this function before before the thread-pool has been dropped will cause the thread to
/// not return control flow to the caller until that happens (stealing work as necessary).
///
/// # Panics
///
/// If the calling thread is no the creator thread of a thread-pool, or not part of that
/// thread-pool, via [`ThreadPoolBuilder::use_current_thread()`].
pub fn clean_up_use_current_thread() {
unsafe {
let thread = WorkerThread::current()
.as_ref()
.expect("Should be called from a worker thread");
assert!(
thread.registry().used_creator_thread(),
"Should only be used to clean up the pool creator constructor thread"
);
assert_eq!(
thread.index(),
0,
"Should be called from the thread that created the pool"
);
crate::registry::wait_until_out_of_work(thread);
let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread);
}
assert!(WorkerThread::current().is_null());
}
/// Result of [`yield_now()`] or [`yield_local()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Yield {

Просмотреть файл

@ -383,7 +383,7 @@ fn in_place_scope_fifo_no_deadlock() {
#[test]
fn yield_now_to_spawn() {
let (tx, rx) = crossbeam_channel::bounded(1);
let (tx, rx) = channel();
// Queue a regular spawn.
crate::spawn(move || tx.send(22).unwrap());
@ -401,7 +401,7 @@ fn yield_now_to_spawn() {
#[test]
fn yield_local_to_spawn() {
let (tx, rx) = crossbeam_channel::bounded(1);
let (tx, rx) = channel();
// Queue a regular spawn.
crate::spawn(move || tx.send(22).unwrap());

Просмотреть файл

@ -93,7 +93,7 @@ fn build_scoped_tls_threadpool() {
},
)
.expect("thread pool created");
// Internally, `crossbeam::scope` will wait for the threads to exit before returning.
// Internally, `std::thread::scope` will wait for the threads to exit before returning.
});
});
}

57
third_party/rust/rayon-core/tests/use_current_thread.rs поставляемый Normal file
Просмотреть файл

@ -0,0 +1,57 @@
use rayon_core::ThreadPoolBuilder;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn use_current_thread_basic() {
static JOIN_HANDLES: Mutex<Vec<JoinHandle<()>>> = Mutex::new(Vec::new());
let pool = ThreadPoolBuilder::new()
.num_threads(2)
.use_current_thread()
.spawn_handler(|builder| {
let handle = thread::Builder::new().spawn(|| builder.run())?;
JOIN_HANDLES.lock().unwrap().push(handle);
Ok(())
})
.build()
.unwrap();
assert_eq!(rayon_core::current_thread_index(), Some(0));
assert_eq!(
JOIN_HANDLES.lock().unwrap().len(),
1,
"Should only spawn one extra thread"
);
let another_pool = ThreadPoolBuilder::new()
.num_threads(2)
.use_current_thread()
.build();
assert!(
another_pool.is_err(),
"Should error if the thread is already part of a pool"
);
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
pool.spawn(move || {
assert_ne!(rayon_core::current_thread_index(), Some(0));
// This should execute even if the current thread is blocked, since we have two threads in
// the pool.
let &(ref started, ref condvar) = &*pair2;
*started.lock().unwrap() = true;
condvar.notify_one();
});
let _guard = pair
.1
.wait_while(pair.0.lock().unwrap(), |ran| !*ran)
.unwrap();
std::mem::drop(pool); // Drop the pool.
// Wait until all threads have actually exited. This is not really needed, other than to
// reduce noise of leak-checking tools.
for handle in std::mem::take(&mut *JOIN_HANDLES.lock().unwrap()) {
let _ = handle.join();
}
}