cvm-rs: Switch thread pool to rayon

Much less underhanded than using an IO library's capability to do so. Also, rayon will make encoding multiple jpegs in parallel *much* easier.

tbh if i do that I may also switch to napi-rs, it seems a bit less painful and supports much more async shenigans.

(Also, it will actually bind classes properly... something Neon really sucks at unless you look at strange documentation.)
This commit is contained in:
modeco80
2024-08-20 04:10:40 -04:00
parent a521f4c873
commit 55566fbd3a
3 changed files with 74 additions and 107 deletions

146
cvm-rs/Cargo.lock generated
View File

@@ -2,42 +2,12 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "addr2line"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.86" version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "backtrace"
version = "0.3.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.99" version = "1.0.99"
@@ -59,6 +29,31 @@ dependencies = [
"cc", "cc",
] ]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]] [[package]]
name = "cvm-rs" name = "cvm-rs"
version = "0.1.1" version = "0.1.1"
@@ -66,10 +61,16 @@ dependencies = [
"libc", "libc",
"neon", "neon",
"once_cell", "once_cell",
"tokio", "rayon",
"turbojpeg-sys", "turbojpeg-sys",
] ]
[[package]]
name = "either"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.15" version = "0.2.15"
@@ -81,18 +82,6 @@ dependencies = [
"wasi", "wasi",
] ]
[[package]]
name = "gimli"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.155" version = "0.2.155"
@@ -109,21 +98,6 @@ dependencies = [
"windows-targets", "windows-targets",
] ]
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08"
dependencies = [
"adler",
]
[[package]] [[package]]
name = "neon" name = "neon"
version = "1.0.0" version = "1.0.0"
@@ -150,37 +124,12 @@ dependencies = [
"syn-mid", "syn-mid",
] ]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.19.0" version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.30" version = "0.3.30"
@@ -206,10 +155,24 @@ dependencies = [
] ]
[[package]] [[package]]
name = "rustc-demangle" name = "rayon"
version = "0.1.24" version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]] [[package]]
name = "semver" name = "semver"
@@ -251,17 +214,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio"
version = "1.38.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
dependencies = [
"backtrace",
"num_cpus",
"pin-project-lite",
]
[[package]] [[package]]
name = "turbojpeg-sys" name = "turbojpeg-sys"
version = "1.0.0" version = "1.0.0"

View File

@@ -14,5 +14,5 @@ libc = "0.2.155"
# Required for JPEG # Required for JPEG
once_cell = "1.19.0" once_cell = "1.19.0"
tokio = { version = "1.38.0", features = [ "rt", "rt-multi-thread" ] }
turbojpeg-sys = "1.0.0" turbojpeg-sys = "1.0.0"
rayon = "1.10.0"

View File

@@ -4,19 +4,30 @@ use neon::prelude::*;
use neon::types::buffer::TypedArray; use neon::types::buffer::TypedArray;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
use std::cell::RefCell; use std::cell::RefCell;
use rayon::{ThreadPool, ThreadPoolBuilder};
use crate::jpeg_compressor::*; use crate::jpeg_compressor::*;
/// Gives a static Tokio runtime. We should replace this with /// Gives a Rayon thread pool we use for parallelism
/// rayon or something, but for now tokio works. fn rayon_pool<'a, C: Context<'a>>(cx: &mut C) -> NeonResult<&'static ThreadPool> {
fn runtime<'a, C: Context<'a>>(cx: &mut C) -> NeonResult<&'static Runtime> { static RUNTIME: OnceCell<ThreadPool> = OnceCell::new();
static RUNTIME: OnceCell<Runtime> = OnceCell::new();
RUNTIME RUNTIME
.get_or_try_init(Runtime::new) .get_or_try_init(|| {
// spawn at least 4 threads
let mut nr_threads = std::thread::available_parallelism().expect("??").get() / 8;
if nr_threads == 0 {
nr_threads = 4;
}
ThreadPoolBuilder::new()
.num_threads(nr_threads)
.thread_name(|index| format!("cvmrs_jpeg_{}", index + 1))
.build()
})
.or_else(|err| cx.throw_error(&err.to_string())) .or_else(|err| cx.throw_error(&err.to_string()))
} }
@@ -24,6 +35,9 @@ thread_local! {
static COMPRESSOR: RefCell<JpegCompressor> = RefCell::new(JpegCompressor::new()); static COMPRESSOR: RefCell<JpegCompressor> = RefCell::new(JpegCompressor::new());
} }
// TODO: We should probably allow passing an array of images to encode, which would
// increase parallelism heavily.
fn jpeg_encode_impl<'a>(cx: &mut FunctionContext<'a>) -> JsResult<'a, JsPromise> { fn jpeg_encode_impl<'a>(cx: &mut FunctionContext<'a>) -> JsResult<'a, JsPromise> {
let input = cx.argument::<JsObject>(0)?; let input = cx.argument::<JsObject>(0)?;
@@ -35,7 +49,7 @@ fn jpeg_encode_impl<'a>(cx: &mut FunctionContext<'a>) -> JsResult<'a, JsPromise>
let (deferred, promise) = cx.promise(); let (deferred, promise) = cx.promise();
let channel = cx.channel(); let channel = cx.channel();
let runtime = runtime(cx)?; let pool = rayon_pool(cx)?;
let buf = buffer.as_slice(cx); let buf = buffer.as_slice(cx);
@@ -49,8 +63,9 @@ fn jpeg_encode_impl<'a>(cx: &mut FunctionContext<'a>) -> JsResult<'a, JsPromise>
locked.copy_from_slice(buf); locked.copy_from_slice(buf);
} }
// Spawn off a tokio blocking pool thread that will do the work for us // Spawn a task on the rayon pool that encodes the JPEG and fufills the promise
runtime.spawn_blocking(move || { // once it is done encoding.
pool.spawn_fifo(move || {
let clone = Arc::clone(&copy); let clone = Arc::clone(&copy);
let locked = clone.lock().unwrap(); let locked = clone.lock().unwrap();