multithreading - Consume non-overlapping vector chunks, and combine results
I'm trying to speed up an expensive computation on a large vector by using threads. My function consumes a vector, computes a vector of new values (it doesn't aggregate, but input order has to be retained), and returns it. However, I'm struggling to figure out how to spawn threads, assign vector slices to each, and then collect and combine the results.
    // tunable
    const NUMTHREADS: i32 = 4;

    fn f(val: i32) -> i32 {
        // expensive computation
        let res = val + 1;
        res
    }

    fn main() {
        // choose an odd number of elements
        let orig = (1..14).collect::<Vec<i32>>();
        let mut result: Vec<Vec<i32>> = vec!();
        let mut flat: Vec<i32> = Vec::with_capacity(orig.len());
        // split into slices
        // (13 / 4 = 3, so chunks(3) yields 5 slices: 3, 3, 3, 3, 1)
        for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
            result.push(
                chunk.iter().map(|&digit| f(digit)).collect()
            );
        };
        // flatten result vector
        for subvec in result.iter() {
            for elem in subvec.iter() {
                flat.push(elem.to_owned());
            }
        }
        println!("Flattened result: {:?}", flat);
    }
The threaded computation should be taking place between for chunk… and // flatten …, but I can't find many simple examples of spawning x threads, assigning chunks sequentially, and returning the newly-computed vector out of the thread and into a container so it can be flattened. Do I have to wrap orig.chunks() in an Arc, and manually grab each chunk in a loop? Do I have to pass f into each thread? Will I have to use a B-Tree to ensure that input and output order match? Can I just use simple_parallel?
Well, this is an ideal application for the unstable thread::scoped():
    // nightly-only at the time of writing; thread::scoped has since been removed from std
    #![feature(scoped)]
    use std::thread::{self, JoinGuard};

    // tunable
    const NUMTHREADS: i32 = 4;

    fn f(val: i32) -> i32 {
        // expensive computation
        let res = val + 1;
        res
    }

    fn main() {
        // choose an odd number of elements
        let orig: Vec<i32> = (1..14).collect();

        let mut guards: Vec<JoinGuard<Vec<i32>>> = vec!();
        // split into slices; each thread borrows its chunk directly
        for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
            let g = thread::scoped(move || chunk.iter().cloned().map(f).collect());
            guards.push(g);
        };

        // collect the results; joining in spawn order keeps the input order
        let mut result: Vec<i32> = Vec::with_capacity(orig.len());
        for g in guards {
            result.extend(g.join().into_iter());
        }

        println!("Flattened result: {:?}", result);
    }
However, it is unstable and won't be stabilized in this form because it has an inherent flaw (you can find more here). As far as I can see, simple_parallel is just an extension of this approach: it hides the fiddling with JoinGuards and can also be used in stable Rust (probably with some unsafety, I believe). It is not recommended for general use, however, as its docs suggest.
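For readers on a current toolchain: the standard library has since gained a sound scoped-thread API, std::thread::scope (stable since Rust 1.63), which lets threads borrow the chunks without cloning them. The following is only a minimal sketch under that assumption. The helper name scoped_map and the rounding-up chunk size are illustrative choices, not part of the original question or answer.

```rust
use std::thread;

// Stand-in for the expensive computation from the question.
fn f(val: i32) -> i32 {
    val + 1
}

// Hypothetical helper: maps `f` over `input` using at most `num_threads` scoped threads.
fn scoped_map(input: &[i32], num_threads: usize) -> Vec<i32> {
    if input.is_empty() {
        return Vec::new();
    }
    let n = num_threads.max(1);
    // Round up so that at most `n` chunks are produced.
    let chunk_size = (input.len() + n - 1) / n;
    let mut result = Vec::with_capacity(input.len());
    thread::scope(|s| {
        // Spawn one thread per chunk; each thread borrows its slice directly.
        // Collecting the handles first makes sure all threads are started
        // before any of them is joined.
        let handles: Vec<_> = input
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(|&v| f(v)).collect::<Vec<i32>>()))
            .collect();
        // Joining in spawn order keeps the output in input order.
        for h in handles {
            result.extend(h.join().unwrap());
        }
    });
    result
}

fn main() {
    let orig: Vec<i32> = (1..14).collect();
    let result = scoped_map(&orig, 4);
    println!("Flattened result: {:?}", result);
}
```

Because the handles are joined in the order they were spawned, no ordered map such as a B-Tree is needed to keep input and output aligned.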
Of course, you can use thread::spawn(), but then you will need to clone each of the chunks so they can be moved into each thread:
    use std::thread::{self, JoinHandle};

    // tunable
    const NUMTHREADS: i32 = 4;

    fn f(val: i32) -> i32 {
        // expensive computation
        let res = val + 1;
        res
    }

    fn main() {
        // choose an odd number of elements
        let orig: Vec<i32> = (1..14).collect();

        let mut guards: Vec<JoinHandle<Vec<i32>>> = vec!();
        // split into slices
        // (13 / 4 = 3, so chunks(3) yields 5 slices and therefore 5 threads)
        for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
            // clone the chunk so the thread owns its data
            let chunk = chunk.to_owned();
            let g = thread::spawn(move || chunk.into_iter().map(f).collect());
            guards.push(g);
        };

        // collect the results; joining in spawn order keeps the input order
        let mut result: Vec<i32> = Vec::with_capacity(orig.len());
        for g in guards {
            result.extend(g.join().unwrap().into_iter());
        }

        println!("Flattened result: {:?}", result);
    }
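Regarding the Arc part of the question: if cloning every chunk is too costly, one possible variation is to move the whole vector into an Arc once and hand each thread a cheap Arc clone plus an index range. This is only a sketch of that idea. The helper name arc_map and the rounding-up chunk size are assumptions made for illustration, not something taken from the original post.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Stand-in for the expensive computation from the question.
fn f(val: i32) -> i32 {
    val + 1
}

// Hypothetical helper: shares `input` between threads through an Arc
// instead of cloning each chunk.
fn arc_map(input: Vec<i32>, num_threads: usize) -> Vec<i32> {
    let len = input.len();
    if len == 0 {
        return Vec::new();
    }
    let n = num_threads.max(1);
    // Round up so that at most `n` ranges are produced.
    let chunk_size = (len + n - 1) / n;
    let shared = Arc::new(input);

    // One thread per index range; only the Arc pointer is cloned, not the data.
    let handles: Vec<JoinHandle<Vec<i32>>> = (0..len)
        .step_by(chunk_size)
        .map(|start| {
            let data = Arc::clone(&shared);
            thread::spawn(move || {
                let end = (start + chunk_size).min(data.len());
                data[start..end].iter().map(|&v| f(v)).collect()
            })
        })
        .collect();

    // Joining in spawn order keeps the output in input order.
    let mut result = Vec::with_capacity(len);
    for h in handles {
        result.extend(h.join().unwrap());
    }
    result
}

fn main() {
    let orig: Vec<i32> = (1..14).collect();
    let result = arc_map(orig, 4);
    println!("Flattened result: {:?}", result);
}
```

The function f does not need to be passed in explicitly here, since a plain fn item can be called from any thread.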