#!/usr/bin/env python3
"""Implement the package's commands."""
import logging
import multiprocessing
import os
import platform
import subprocess
import time
from datetime import datetime
from functools import partial

import humanize
import numpy as np
import pandas as pd
import psutil
from joblib import Parallel, delayed, parallel_backend

from .clustering import (
    cluster_netatmo_obs,
    report_clustering_results,
    sort_df_by_cluster_size,
)
from .config_parser import read_config
from .domains import Domain
from .dtgs import Dtg
from .load_data import (
    DataNotFoundError,
    read_netatmo_data_for_dtg,
    remove_irregular_stations,
    rm_moving_stations,
)
from .logs import get_logger, logcolor
from .mpi import mpi_parallel
from .plots import (
    DEF_FIGSHOW_CONFIG,
    make_clustering_fig,
    show_cmd_get_fig_from_dataframes,
)
from .save_data import netatmoqc_input2output

logger = logging.getLogger(__name__)

def cluster_obs_single_dtg(args):
    """Implement the "cluster" command.

    Cluster the observations available for a single DTG, report the
    clustering results and, if requested, show and/or save a figure.

    Args:
        args (argparse.Namespace): Parsed command line arguments.

    """
    config = read_config(args.config_file)

    # Fall back to the first configured DTG when none is passed on the cli
    try:
        dtg = Dtg(args.dtg)
    except TypeError:
        dtg = None
    if dtg is None:
        dtg = config.general.dtgs[0]

    logger.info(
        "Running %s on obs for %sDTG=%s%s...",
        config.general.clustering_method,
        logcolor.cyan,
        dtg,
        logcolor.reset,
    )

    if args.savefig:
        # Create outdir at the beginning so users don't
        # waste time in case they can't save results
        timestamp = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
        outdir = config.general.outdir / f"{timestamp}_netatmoqc_cluster"
        # Allow mkdir to raise eventual exceptions if cannot write to outdir
        outdir.mkdir(parents=True)

    try:
        obs_df = read_netatmo_data_for_dtg(
            dtg, rootdir=config.general.data_rootdir
        )
    except DataNotFoundError:
        logger.warning(
            "Could not cluster obs for dtg=%s: ", dtg, exc_info=True
        )
        return

    obs_df, _ = remove_irregular_stations(obs_df)
    obs_df = cluster_netatmo_obs(df=obs_df, config=config)
    report_clustering_results(obs_df)

    if args.show or args.savefig:
        logger.info("Preparing figure...")
        obs_df = sort_df_by_cluster_size(obs_df)
        domain = Domain.construct_from_dict(config.domain)
        fig = make_clustering_fig(obs_df, domain=domain)
        if args.savefig:
            fig_fpath = str(outdir / "cluster_obs_single_dtg.html")
            logger.info("Saving figure to %s\n", fig_fpath)
            fig.write_html(fig_fpath)
        if args.show:
            fig.show(config=DEF_FIGSHOW_CONFIG)

    logger.info(
        "%sDone with 'cluster' command.%s", logcolor.cyan, logcolor.reset
    )


def _select_stations_single_dtg(dtg, config, args):
    """Implement "select_stations" for a single DTG.

    Args:
        dtg (Dtg): The dtg for which to select stations.
        config (ParsedConfig): Parsed dict of configs from config file.
        args (argparse.Namespace): Parsed command line arguments.

    Returns:
        (DataFrame, DataFrame, DataFrame): df_accepted, df_rejected, df_moving

    """
    t_start = time.time()
    logger = get_logger(__name__, args.loglevel)
    logger.info("%sDTG=%s%s: Started", logcolor.cyan, dtg, logcolor.reset)

    # Some control of oversubscription if using mpi.
    # This is not needed when using single-host joblib with "loky"
    if args.mpi:
        parent_proc = psutil.Process(os.getppid())
        n_sibling_procs = len(parent_proc.children())
        cpu_share = multiprocessing.cpu_count() // n_sibling_procs
    else:
        cpu_share = -1

    try:
        obs = read_netatmo_data_for_dtg(
            dtg, rootdir=config.general.data_rootdir
        )
    except DataNotFoundError:
        logger.warning("Could not select obs for dtg=%s: ", dtg, exc_info=True)
        return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    obs, df_moving = remove_irregular_stations(obs)
    obs = cluster_netatmo_obs(
        df=obs,
        config=config,
        n_jobs=cpu_share,
        calc_silhouette_samples=False,
    )

    # Obs flagged with a negative cluster label are the rejected ones
    keep_cols = ["id", "lat", "lon", "alt"]
    is_rejected = obs["cluster_label"] < 0
    df_rejected = obs[keep_cols][is_rejected]
    df_accepted = obs[keep_cols][~is_rejected]
    df_moving = df_moving[keep_cols]

    logger.info("DTG=%s: Done. Elapsed: %.1fs", dtg, time.time() - t_start)
    return df_accepted, df_rejected, df_moving
def select_stations(args):
    """Implement the "select" command.

    This function calls "_select_stations_single_dtg" for each DTG
    (in parallel if requested/possible), and then processes, gathers
    and saves the results.

    Args:
        args (argparse.Namespace): Parsed command line arguments.

    """
    tstart_selection = time.time()
    config = read_config(args.config_file)

    # Create outdir at the beginning so users don't
    # waste time in case they can't save results
    outdir = config.general.outdir / "{}_netatmoqc_select".format(
        datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
    )
    # Allow mkdir to raise eventual exceptions if cannot write to outdir
    outdir.mkdir(parents=True)

    # Process DTGs in parallel if possible/requested
    if args.mpi:
        logger.info("Parallelising tasks over DTGs using MPI")
        func = partial(_select_stations_single_dtg, config=config, args=args)
        results = mpi_parallel(func, config.general.dtgs)
    else:
        n_procs = int(os.getenv("NETATMOQC_MAX_PYTHON_PROCS", "1"))
        if n_procs > 1:
            logger.info("Parallelising tasks over DTGs within a single host")
        # Using the "loky" backend helps avoiding oversubscription
        # of cpus in child processes
        with parallel_backend("loky"):
            results = Parallel(n_jobs=n_procs)(
                delayed(_select_stations_single_dtg)(dtg, config, args)
                for dtg in config.general.dtgs
            )
    logger.info(
        "End of survey over %d DTGs. Combining results...\n",
        len(config.general.dtgs),
    )

    # Gather results: Only accepted and rejected stations for now. Moving
    # stations will be handled after post-processing accepted and rejected
    df_accepted = pd.concat((r[0] for r in results), ignore_index=True)
    df_rejected = pd.concat((r[1] for r in results), ignore_index=True)

    df_accepted, df_moving_accepted = rm_moving_stations(df_accepted)
    if len(df_moving_accepted.index) > 0:
        logger.warning(
            'Removed %d "moving" stations from the list of accepted stations',
            len(df_moving_accepted["id"].unique()),
        )

    df_rejected, df_moving_rejected = rm_moving_stations(df_rejected)
    if len(df_moving_rejected.index) > 0:
        logger.warning(
            'Removed %d "moving" stations from the list of removed stations',
            len(df_moving_rejected["id"].unique()),
        )

    # Let's recover some stations for the list of rejected if they
    # also appear among the df_accepted. Do this based on a
    # tolerance criteria.
    logger.info(
        (
            "Applying station_rejection_tol=%.2f to rescue "
            'stations from the "rejected" list'
        ),
        config.commands.select.station_rejection_tol,
    )
    accept_counts_per_id = df_accepted["id"].value_counts()
    reject_counts_per_id = df_rejected["id"].value_counts()
    rescued_rejected_stats = []
    # Keep track of rejection rates. We'll use this later.
    id2rejection_rate = {}
    for stat_id in reject_counts_per_id.index:
        reject_count = reject_counts_per_id[stat_id]
        # Series.get returns the default for stations that were never accepted
        accept_count = accept_counts_per_id.get(stat_id, 0)
        rejection_rate = 1.0 * reject_count / (reject_count + accept_count)
        id2rejection_rate[stat_id] = rejection_rate
        if rejection_rate < config.commands.select.station_rejection_tol:
            rescued_rejected_stats.append(stat_id)
    # Using a rescued_rejected_stats list to modify the dataframes
    # df_accepted/df_rejected is much faster than modifying
    # the dataframes inside the loop above
    rescued_rows = df_rejected.loc[
        df_rejected["id"].isin(rescued_rejected_stats), :
    ]
    # NB: DataFrame.append was deprecated in pandas 1.4 and removed in 2.0.
    # pd.concat is the supported equivalent.
    df_accepted = pd.concat([df_accepted, rescued_rows], ignore_index=True)
    df_rejected = df_rejected.drop(rescued_rows.index)
    if len(rescued_rejected_stats) > 0:
        logger.info(
            (
                "    > Rescuing %d stations rejected in "
                "less than %.1f%% of occurrences"
            ),
            len(rescued_rejected_stats),
            100 * config.commands.select.station_rejection_tol,
        )
    else:
        logger.info('    > No stations recovered from "rejected"')

    # We don't need duplicate entries
    df_accepted = df_accepted.drop_duplicates(subset=["id"])
    df_rejected = df_rejected.drop_duplicates(subset=["id"])

    # Now we can handle moving stations
    df_moving = pd.concat(
        [r[2] for r in results] + [df_moving_accepted, df_moving_rejected],
        ignore_index=True,
    )
    df_unique_moving = df_moving["id"].unique()

    # Moving stations are excluded from both lists. Also make sure no
    # station ends up simultaneously accepted and rejected.
    df_rejected = df_rejected[~df_rejected["id"].isin(df_unique_moving)]
    df_accepted = df_accepted[
        ~(
            df_accepted["id"].isin(df_unique_moving)
            | df_accepted["id"].isin(df_rejected["id"])
        )
    ]
    if len(df_unique_moving) > 0:
        logger.warning(
            (
                "\n A total of %d unique stations were removed from both\n"
                " accepted and rejected lists because they changed\n"
                " at least one of (lat, lon, alt) throughout the survey\n"
                " These will be saved separately in the moving_stations.csv file"
            ),
            len(df_unique_moving),
        )

    ##########################################################################
    # Trim accepted obs to (a possibly coarser version of) the domain's grid #
    ##########################################################################
    # Add (rejection_rate, grid_i, grid_j) columns to df_accepted,           #
    # then, for each (grid_i, grid_j), pick only the station that has the    #
    # lowest rejection_rate                                                  #
    ##########################################################################
    coarse_grid_factor = config.domain.thinning_grid_coarse_factor
    if coarse_grid_factor > 0:
        domain = Domain.construct_from_dict(config.domain)
        logger.info(
            "Thinning accepted obs: Keep only 1 station per support grid point"
        )
        logger.info(
            "    > Support grid spacing: %.1f m (%d x the domain's)",
            domain.thinning_grid.x_spacing,
            coarse_grid_factor,
        )
        n_preliminary_accepted = len(df_accepted.index)

        # (a) Add rejection rate info
        @np.vectorize
        def rej_rate(stat_id):
            # Stations never rejected have no entry in the rate map
            return id2rejection_rate.get(stat_id, 0.0)

        df_accepted["rejection_rate"] = rej_rate(df_accepted["id"])

        # (b) Sort by rejection rate (lower to higher)
        df_accepted = df_accepted.loc[
            df_accepted["rejection_rate"].sort_values(ascending=True).index
        ]

        # (c) Add grid (i, j) info
        icol, jcol = domain.thinning_grid.lonlat2grid(
            df_accepted["lon"].to_numpy(), df_accepted["lat"].to_numpy(),
        )
        df_accepted["i"] = icol
        df_accepted["j"] = jcol

        # (d) rm all but the lowest-rejection-rate station at each grid (i, j)
        grid_trimmed_stations = df_accepted.groupby(
            ["i", "j"], as_index=False, sort=False
        ).first()
        grid_trimmed_stations = grid_trimmed_stations.drop(["i", "j"], axis=1)
        df_accepted = df_accepted.drop(["i", "j"], axis=1)

        # (e) Finally, move to df_rejected those stations that appear in
        #     df_accepted but not in grid_trimmed_stations
        grid_trimmed_mask = df_accepted["id"].isin(grid_trimmed_stations["id"])
        df_rejected = pd.concat([df_rejected, df_accepted[~grid_trimmed_mask]])
        df_accepted = grid_trimmed_stations

        logger.info(
            "    > Number of stations rejected at the thinning step: %d",
            n_preliminary_accepted - len(df_accepted.index),
        )

    ##################
    # Report results #
    ##################
    total_nstations = sum(
        map(len, [df_accepted.index, df_rejected.index, df_unique_moving])
    )
    n_selected = len(df_accepted.index)
    # Guard against ZeroDivisionError when no data was found for any DTG
    ratio = 100.0 * n_selected / total_nstations if total_nstations else 0.0

    logger.info(
        "%sFinished with %d accepted stations%s (%.2f%% out of %d)",
        logcolor.cyan,
        n_selected,
        logcolor.reset,
        ratio,
        total_nstations,
    )
    logger.info(
        "That's %d rejections %s(%.2f%% rejection rate)%s",
        total_nstations - n_selected,
        logcolor.cyan,
        100.0 - ratio,
        logcolor.reset,
    )

    ###########################
    # Save and/or show results #
    ###########################
    fpath = outdir / "accepted_stations.csv"
    logger.info(
        "%sSaving accepted station data to%s %s",
        logcolor.cyan,
        logcolor.reset,
        fpath,
    )
    df_accepted.to_csv(fpath, index=False, mode="w")

    fpath = outdir / "rejected_stations.csv"
    logger.info(
        "%sSaving rejected station data to%s %s",
        logcolor.cyan,
        logcolor.reset,
        fpath,
    )
    df_rejected.to_csv(fpath, index=False, mode="w")

    fpath = outdir / "moving_stations.csv"
    logger.info(
        "%sSaving moving station data%s to %s\n",
        logcolor.cyan,
        logcolor.reset,
        fpath,
    )
    df_moving.to_csv(fpath, index=False, mode="w")

    # Save data from selected stations in OBSOUL file format
    netatmoqc_input2output(
        config.general.dtgs,
        netatmo_data_rootdir=config.general.data_rootdir,
        selected_stations=df_accepted["id"],
        outdir=outdir,
        loglevel=args.loglevel,
        use_mpi=args.mpi,
        save_csv=True,
        save_obsoul=args.save_obsoul,
        obsoul_export_params=config.general.obsoul_export_params,
    )

    # We're now done.
    elapsed = time.time() - tstart_selection
    logger.info(
        "%sDone with 'select' command, %d DTGs. Elapsed: %s (~%s per DTG)%s",
        logcolor.cyan,
        len(config.general.dtgs),
        humanize.precisedelta(elapsed),
        humanize.precisedelta(elapsed / len(config.general.dtgs)),
        logcolor.reset,
    )


# Code related to the csv2obsoul command
def csv2obsoul(args):
    """Implement the "csv2obsoul" command.

    Export netatmo observations to OBSOUL format, optionally restricted
    to stations listed in a csv file passed via "--selected-stations-fpath".

    Args:
        args (argparse.Namespace): Parsed command line arguments.

    """
    config = read_config(args.config_file)

    # Create outdir at the beginning so users don't
    # waste time in case they can't save results
    outdir = config.general.outdir / "{}_netatmoqc_csv2obsoul".format(
        datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
    )
    # Allow mkdir to raise eventual exceptions if cannot write to outdir
    outdir.mkdir(parents=True)

    # Allow picking only selected stations
    selected_stations = None
    if args.selected_stations_fpath is not None:
        if args.selected_stations_fpath.suffix == ".csv":
            selected_stations = pd.read_csv(args.selected_stations_fpath)["id"]
            logger.info(
                "Found %s selected stations in file '%s'.",
                len(selected_stations),
                args.selected_stations_fpath,
            )
        else:
            # Use lazy %-style args instead of eager "%" formatting so the
            # message is only interpolated if the record is actually emitted
            logger.warning(
                'Only csv files supported in "--selected-stations-fpath". '
                "Received '%s'.",
                args.selected_stations_fpath,
            )

    netatmoqc_input2output(
        config.general.dtgs,
        netatmo_data_rootdir=config.general.data_rootdir,
        dropna=args.dropna,
        fillna=args.fillna,
        selected_stations=selected_stations,
        rm_duplicate_stations=args.rm_duplicate_stations,
        rm_moving_stations=args.rm_moving_stations,
        outdir=outdir,
        loglevel=args.loglevel,
        use_mpi=args.mpi,
        save_csv=False,
        save_obsoul=True,
        obsoul_export_params=config.general.obsoul_export_params,
    )

    logger.info(
        "%sDone with 'csv2obsoul' command.%s", logcolor.cyan, logcolor.reset
    )

# Code related to the "show" command
def _open_file_with_default_app(fpath):
    """Open the file at fpath with the OS's default application."""
    system_name = platform.system()
    if system_name == "Windows":
        os.startfile(fpath)
        return
    # macOS ships "open"; assume an xdg-compliant opener everywhere else
    opener = "open" if system_name == "Darwin" else "xdg-open"
    subprocess.call((opener, fpath))


def show(args):
    """Implement the 'show' command.

    Args:
        args (argparse.Namespace): Parsed command line arguments.

    """
    logger = get_logger(__name__, args.loglevel)

    config = read_config(args.config_file)
    domain = Domain.construct_from_dict(config.domain)

    if args.domain:
        domain_fig = domain.get_fig(display_grid_max_gsize=20000.0)
        # Place the legend horizontally, centred below the plot
        legend_opts = dict(
            orientation="h", xanchor="center", x=0.5, yanchor="top", y=0.0
        )
        domain_fig.update_layout(legend=legend_opts)
        domain_fig.show(config=DEF_FIGSHOW_CONFIG)

    if args.config:
        logger.info("TOML config dump:\n\n%s", config.toml_dumps())

    # csv files are collected into dataframes for plotting;
    # html files are simply opened with the default application
    parsed_csv_data = {}
    for fpath in args.file_list:
        suffix = fpath.suffix
        if suffix == ".csv":
            logger.info("Reading data from file %s", fpath)
            parsed_csv_data[fpath.name] = pd.read_csv(fpath)
        elif suffix in (".htm", ".html"):
            logger.info("Openning file '%s'", fpath)
            _open_file_with_default_app(fpath)
        else:
            logger.warning(
                "Only html and csv files supported. Skipping file '%s'", fpath
            )

    if parsed_csv_data:
        data_fig = show_cmd_get_fig_from_dataframes(
            args, parsed_csv_data, domain
        )
        data_fig.show(config=DEF_FIGSHOW_CONFIG)