summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-01-02 14:09:12 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-01-03 10:19:49 +0100
commit51fd196bed12e91ca1720ea383efe97145abdcb6 (patch)
tree605d6101324f5f2e597711aa4f2f89ae7f402b35
parenta6ca22c73036be412dafa04265506eb9135c6fd1 (diff)
downloadimag-51fd196bed12e91ca1720ea383efe97145abdcb6.zip
imag-51fd196bed12e91ca1720ea383efe97145abdcb6.tar.gz
Rewrite scan function with option to fetch pathes in parallel
This does not result in parallel store accessing, but only parallel fetching pathes from the filesystem. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--bin/domain/imag-mail/Cargo.toml1
-rw-r--r--bin/domain/imag-mail/src/lib.rs75
-rw-r--r--bin/domain/imag-mail/src/ui.rs7
3 files changed, 62 insertions, 21 deletions
diff --git a/bin/domain/imag-mail/Cargo.toml b/bin/domain/imag-mail/Cargo.toml
index 13a60a5..68e9055 100644
--- a/bin/domain/imag-mail/Cargo.toml
+++ b/bin/domain/imag-mail/Cargo.toml
@@ -25,6 +25,7 @@ failure = "0.1.5"
indoc = "0.3.3"
resiter = "0.4"
walkdir = "2"
+rayon = "1"
libimagrt = { version = "0.10.0", path = "../../../lib/core/libimagrt" }
libimagstore = { version = "0.10.0", path = "../../../lib/core/libimagstore" }
diff --git a/bin/domain/imag-mail/src/lib.rs b/bin/domain/imag-mail/src/lib.rs
index e290d7d..0240042 100644
--- a/bin/domain/imag-mail/src/lib.rs
+++ b/bin/domain/imag-mail/src/lib.rs
@@ -41,6 +41,7 @@ extern crate toml_query;
#[macro_use] extern crate indoc;
extern crate resiter;
extern crate walkdir;
+extern crate rayon;
extern crate libimagrt;
extern crate libimagmail;
@@ -58,14 +59,17 @@ use failure::Error;
use toml_query::read::TomlValueReadTypeExt;
use clap::App;
use resiter::AndThen;
+use resiter::Filter;
use resiter::IterInnerOkOrElse;
use resiter::Map;
+use rayon::prelude::*;
use libimagmail::mail::Mail;
use libimagmail::store::MailStore;
use libimagmail::util;
use libimagentryref::reference::{Ref, RefFassade};
use libimagentryref::util::get_ref_config;
+use libimagentryref::reference::Config as RefConfig;
use libimagrt::runtime::Runtime;
use libimagrt::application::ImagApplication;
use libimagutil::info_result::*;
@@ -119,30 +123,59 @@ fn scan(rt: &Runtime) -> Result<()> {
let collection_name = get_ref_collection_name(rt)?;
let refconfig = get_ref_config(rt, "imag-mail")?;
let scmd = rt.cli().subcommand_matches("scan").unwrap();
- let store = rt.store();
- scmd.values_of("path")
+ /// Helper function to get an Iterator for all files from one PathBuf
+ fn walk_pathes(path: PathBuf) -> impl Iterator<Item = Result<PathBuf>> {
+ walkdir::WalkDir::new(path)
+ .follow_links(false)
+ .into_iter()
+ .filter_ok(|entry| entry.file_type().is_file())
+ .map_err(Error::from)
+ .map_ok(|entry| entry.into_path())
+ .inspect(|e| trace!("Processing = {:?}", e))
+ }
+
+ /// Helper function to process an iterator over Result<PathBuf> and create store entries for
+ /// each path in the iterator
+ fn process_iter(i: &mut dyn Iterator<Item = Result<PathBuf>>, rt: &Runtime, collection_name: &str, refconfig: &RefConfig) -> Result<()> {
+ let scmd = rt.cli().subcommand_matches("scan").unwrap();
+ i.and_then_ok(|path| {
+ if scmd.is_present("ignore-existing-ids") {
+ rt.store().retrieve_mail_from_path(path, collection_name, refconfig, true)
+ } else {
+ rt.store().create_mail_from_path(path, collection_name, refconfig)
+ }
+ })
+ .and_then_ok(|e| rt.report_touched(e.get_location()).map_err(Error::from))
+ .collect::<Result<Vec<_>>>()
+ .map(|_| ())
+ }
+
+ let pathes = scmd.values_of("path")
.unwrap() // enforced by clap
.map(PathBuf::from)
- .map(|path| {
- walkdir::WalkDir::new(path)
- .follow_links(false)
- .into_iter()
- .filter_entry(|entry| entry.file_type().is_file())
- .map_ok(|entry| entry.into_path())
- .map_err(Error::from)
- })
- .flatten()
- .and_then_ok(|path| {
- if scmd.is_present("ignore_existing_ids") {
- store.retrieve_mail_from_path(path, &collection_name, &refconfig)
- } else {
- store.create_mail_from_path(path, &collection_name, &refconfig)
- }
- })
- .and_then_ok(|e| rt.report_touched(e.get_location()).map_err(Error::from))
- .collect::<Result<Vec<()>>>()
- .map(|_| ())
+ .collect::<Vec<_>>();
+
+ if scmd.is_present("scan-parallel") {
+ debug!("Fetching pathes in parallel");
+ let mut i = pathes.into_par_iter()
+ .map(|path| walk_pathes(path).collect::<Result<_>>())
+ .collect::<Result<Vec<PathBuf>>>()?
+ .into_iter()
+ .map(Ok);
+
+ debug!("Processing pathes");
+ process_iter(&mut i, rt, &collection_name, &refconfig)
+ } else {
+ debug!("Fetching pathes not in parallel");
+ let mut i = pathes
+ .into_iter()
+ .map(walk_pathes)
+ .flatten();
+
+ debug!("Processing pathes");
+ process_iter(&mut i, rt, &collection_name, &refconfig)
+ }
}
fn import_mail(rt: &Runtime) -> Result<()> {
diff --git a/bin/domain/imag-mail/src/ui.rs b/bin/domain/imag-mail/src/ui.rs
index de0cd0a..0af3e88 100644
--- a/bin/domain/imag-mail/src/ui.rs
+++ b/bin/domain/imag-mail/src/ui.rs
@@ -57,6 +57,13 @@ pub fn build_ui<'a>(app: App<'a, 'a>) -> App<'a, 'a> {
.required(false)
.help("Ignore errors that might occur when store entries exist already"))
+ .arg(Arg::with_name("scan-parallel")
+ .long("parallel")
+ .takes_value(false)
+ .required(false)
+ .multiple(false)
+ .help("Scan with multiple threads. Might be faster, but might slow down other tasks"))
+
.arg(Arg::with_name("path")
.index(1)
.takes_value(true)