diff --git a/Cargo.lock b/Cargo.lock index 2792198..9d6c3f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,6 +43,37 @@ dependencies = [ "vec_map", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "heck" version = "0.3.3" @@ -65,6 +96,7 @@ dependencies = [ name = "indivisible" version = "2.0.0" dependencies = [ + "rayon", "structopt", ] @@ -76,9 +108,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.178" +version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" [[package]] name = "proc-macro-error" @@ -106,22 +138,42 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.103" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.42" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" dependencies = [ "proc-macro2", ] +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "strsim" version = "0.8.0" @@ -174,9 +226,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.22" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-segmentation" diff --git a/Cargo.toml b/Cargo.toml index 18b9633..7d4f685 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,4 @@ keywords = [ "primes", "math" ] [dependencies] structopt = "0.3" +rayon = "1.11" diff --git a/benchmark.sh b/benchmark.sh index c296250..cda6760 100755 --- a/benchmark.sh +++ b/benchmark.sh @@ -16,7 +16,7 @@ then exit 1 fi -while getopts "t:s:" opt +while getopts "t:s:j:" opt do case "$opt" in s) @@ -26,6 +26,10 @@ do t) TRIALS="$OPTARG" ;; + j) + OPTIONS=("${OPTIONS[@]}" -j "$OPTARG") + JOBS=$OPTARG + ;; *) >&2 echo "Uknown option $opt" exit 1 @@ -36,6 +40,7 @@ done echo "Calculating primes up to 1,000,000,000" echo "Trials: $TRIALS" echo "Sieve segment size: ${SIEVE:-"default"}" +echo "Number of jobs: ${JOBS:-"default"}" TOTAL="0" for _ in $(seq "$TRIALS") do diff --git a/src/main.rs b/src/main.rs index 6b0cfc9..e20b28d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,11 +16,16 @@ * along with this program. If not, see . */ +use std::collections::VecDeque; use std::fs::File; use std::io::{BufRead, BufReader}; use std::path::PathBuf; use std::process; +use std::sync::{Arc, RwLock}; +use std::sync::mpsc::{Sender, Receiver}; +use std::sync::mpsc; use structopt::StructOpt; +use rayon::ThreadPoolBuilder; mod worker; @@ -37,13 +42,18 @@ struct Opt { num:usize, #[structopt(short, long, name = "SIZE", default_value = "10485760", help = "Set a custom sieve size")] sieve:usize, - //#[structopt(short, long, name = "n", default_value = "1", help = "Number of threads to spawn")] - //jobs:u64, + #[structopt(short, long, name = "n", default_value = "1", help = "Number of threads to spawn")] + jobs:usize, +} + +struct BatchResult { + batch_id: u64, + primes: Vec, } fn main() { let opts = Opt::from_args(); - let mut prime_list = Vec::new(); + let mut prime_list_raw = Vec::new(); if opts.import.is_some() { let in_file = File::open(opts.import.unwrap()).unwrap(); @@ -54,7 +64,7 @@ fn main() { break; } - prime_list.push(prime); + prime_list_raw.push(prime); } } @@ -63,31 +73,98 @@ fn main() { process::exit(1); } - let mut start:usize = if prime_list.is_empty() { + let pool = ThreadPoolBuilder::new().num_threads(opts.jobs).build().unwrap(); + let mut pending_tasks = 0; + /* force u64 to avoid overflow if sieve size / opts.num is larger than i32::MAX. + * This is unlikely, but possible. + */ + let mut batch_num: u64 = 0; + let mut last_batch_id: u64 = 0; + let mut remaining_batches = VecDeque::::new(); + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + + let mut start:usize = if prime_list_raw.is_empty() { 2 } else { - (*prime_list.last().unwrap() + 1) as usize + (*prime_list_raw.last().unwrap() + 1) as usize + }; + let mut end = if start + opts.sieve < opts.num { + start + opts.sieve + } else { + opts.num + 1 }; - while start < opts.num { - let end = if start + opts.sieve < opts.num { - start + opts.sieve - } else { - opts.num + 1 - }; - let mut new_primes = worker::work_segment(&prime_list, start, end); - if opts.verbose { - for p in &new_primes { - println!("{}", *p); + let prime_list = Arc::new(RwLock::new(prime_list_raw)); + loop { + if (start < opts.num) && ((prime_list.read().unwrap().is_empty() && pending_tasks == 0) || + (*prime_list.read().unwrap().last().unwrap_or(&0)).pow(2) >= (end as u64)) + { + let prime_list_clone = Arc::clone(&prime_list); + let tx_clone = tx.clone(); + batch_num += 1; + let batch_id = batch_num; + pool.spawn(move || { + let res = worker::work_segment(&prime_list_clone, start, end); + tx_clone.send(BatchResult{batch_id, primes: res}).unwrap(); + }); + pending_tasks += 1; + start += opts.sieve; + end = if end + opts.sieve < opts.num { + end + opts.sieve + } else { + opts.num + 1 + }; + } else if pending_tasks > 0 { + let res = rx.recv().unwrap(); + pending_tasks -= 1; + if res.batch_id != last_batch_id + 1 { + if remaining_batches.is_empty() || (res.batch_id > remaining_batches.back().unwrap().batch_id) { + remaining_batches.push_back(res); + } else if res.batch_id < remaining_batches.front().unwrap().batch_id { + remaining_batches.push_front(res); + } else { + let mut i = 0; + while i < remaining_batches.len() { + if res.batch_id < remaining_batches[i].batch_id { + remaining_batches.insert(i, res); + break; + } + i += 1; + } + } + } else { + last_batch_id = res.batch_id; + + if opts.verbose { + for p in &res.primes { + println!("{}", *p); + } + } + prime_list.write().unwrap().append(&mut res.primes.clone()); + + loop { + match remaining_batches.pop_front_if(|br| br.batch_id == last_batch_id + 1) { + Some(br) => { + last_batch_id = br.batch_id; + if opts.verbose { + for p in &br.primes { + println!("{}", *p); + } + } + prime_list.write().unwrap().append(&mut br.primes.clone()); + }, + None => break, + }; + } } + } else { + // no more tasks to be added, and all tasks have been processed + break; } - prime_list.append(&mut new_primes); - - start += opts.sieve; } if opts.test { - if *prime_list.last().unwrap() == (opts.num as u64) { + if *prime_list.read().unwrap().last().unwrap() == (opts.num as u64) { if opts.verbose { println!("{} is prime", opts.num); } @@ -99,6 +176,6 @@ fn main() { process::exit(1); } } else if !opts.verbose { - println!("{}", prime_list.last().unwrap()); + println!("{}", prime_list.read().unwrap().last().unwrap()); } } diff --git a/src/worker.rs b/src/worker.rs index c1bb6cc..f25f326 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -16,6 +16,8 @@ * along with this program. If not, see . */ +use std::sync::RwLock; + /** * @brief Work on a segment. * @@ -25,13 +27,13 @@ * * @return List of primes found in segment. */ -pub fn work_segment(known_primes:&Vec, start:usize, end:usize) -> Vec { +pub fn work_segment(known_primes:&RwLock>, start:usize, end:usize) -> Vec { let mut sieve = vec![true; end - start]; let mut found_primes = Vec::new(); let sqrt_end = (end as f64).sqrt() as usize; - for p in known_primes { + for p in known_primes.read().unwrap().iter() { let prime = *p as usize; if prime > sqrt_end { break;