Commit 55e291ef authored by Paulo Medeiros's avatar Paulo Medeiros
Browse files

Enable MPI via config instead of command line args

parent 6d340fd8
......@@ -82,14 +82,6 @@ def get_parsed_args(program_name="program", argv=None):
choices=["critical", "error", "warning", "info", "debug", "notset"],
help="What type of info should be printed to the log",
)
parser.add_argument(
"--mpi",
action="store_true",
help=(
"Enable MPI parallelisation in some parts of the code. "
+ "Requires that the code be installed with support to MPI."
),
)
# Configure the main parser to handle the commands
subparsers = parser.add_subparsers(
......
......@@ -115,7 +115,7 @@ def _select_stations_single_time_window(time_window, args, config):
# Some control of oversubscription if using mpi.
# This is not needed when using single-host joblib with "loky"
cpu_share = -1
if args.mpi:
if config.general.use_mpi:
proc_parent = psutil.Process(os.getppid())
proc_family_size = len(proc_parent.children())
cpu_share = multiprocessing.cpu_count() // proc_family_size
......@@ -158,7 +158,7 @@ def select_stations(args, config):
domain = Domain.from_config(config.domain)
# Process time windows in parallel if possible/requested
if args.mpi:
if config.general.use_mpi:
logger.info("Parallelising tasks over time windows using MPI")
func = partial(_select_stations_single_time_window, args=args, config=config)
df_time_windows_qc = mpi_parallel(func, config.general.assimilation_times)
......@@ -370,7 +370,6 @@ def csv2obsoul(args, config):
rm_duplicate_stations=args.rm_duplicate_stations,
rm_moving_stations=args.rm_moving_stations,
outdir=outdir,
use_mpi=args.mpi,
save_csv=False,
save_obsoul=True,
)
......
......@@ -181,6 +181,7 @@ class _GeneralSectionModel(_ConfigsBaseModel):
remove_outliers_in_subdomain_preclustering: bool = False
apply_mslp_to_pressure_conversion_upon_reading: bool = True
outdir: ParsedPath = Path(tempfile.gettempdir()).resolve() / "netatmoqc_output"
use_mpi: bool = False
# Data cols to export when saving obsoul output
obsoul_export_params: List[str] = ("pressure", "temperature")
......
......@@ -20,7 +20,8 @@ logger = logging.getLogger(__name__)
def mpi_parallel(fun, iterable):
"""Run function "fun" in parallel over "iterable" using MPI."""
if MPI4PY_IMPORT_ERROR is not None:
msg = f"{MPI4PY_IMPORT_ERROR}. {logcolor.red}Support to MPI is unavailable!{logcolor.reset}"
msg = f"{MPI4PY_IMPORT_ERROR}. "
msg += f"{logcolor.red}Support to MPI is unavailable!{logcolor.reset}"
raise ImportError(msg) from MPI4PY_IMPORT_ERROR
# Prevent using mpiexec with n>1
......
......@@ -321,7 +321,6 @@ def netatmoqc_input2output(
rm_duplicate_stations=True,
rm_moving_stations=True,
outdir=None,
use_mpi=False,
save_csv=True,
save_obsoul=False,
):
......@@ -344,7 +343,6 @@ def netatmoqc_input2output(
rm_moving_stations (bool): (Default value = True)
outdir (pathlib.Path): Dir where out files will be put.
(Default value = ".")
use_mpi (bool): Whether to use MPI or not (Default value = False).
save_csv (bool): Whether to save csv files (Default value = True).
save_obsoul (bool): Whether to save csv files (Default value = True).
......@@ -382,7 +380,7 @@ def netatmoqc_input2output(
outdir_obsoul=outdir_obsoul,
)
if use_mpi:
if config.general.use_mpi:
mpi_parallel(in2out_ftime_window, config.general.assimilation_times)
else:
Parallel(n_jobs=MAX_PYTHON_PROCS, prefer="processes")(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment