Commit d0c941ae authored by Paulo Medeiros's avatar Paulo Medeiros

select cmd: Save as csv. Choose OBSOUL out params

Previous commit on branch: cbacc14b

Summary of main changes in this merge commit:

- "select" command saves results in CSV format.
- Can now choose which data params to export when writing OBSOUL files.
- Move thinning_grid_coarse_factor config to domain
- Fix issue with grid point positions
parents cbacc14b 846f9a87
......@@ -13,7 +13,6 @@ import psutil
import subprocess
import time
from .domains import Domain
from .file_formats import netatmo_csvs2obsouls
from .clustering import cluster_netatmo_obs, report_clustering_results
from .config_parser import read_config
from .dtgs import Dtg
......@@ -28,6 +27,7 @@ from .plots import (
make_clustering_fig,
show_cmd_get_fig_from_dataframes,
)
from .save_data import netatmoqc_input2output
logger = logging.getLogger(__name__)
......@@ -253,17 +253,17 @@ def select_stations(args):
# then, for each (grid_i, grid_j), pick only the station that has the #
# lowest rejection_rate #
##########################################################################
coarse_grid_factor = config.commands.select.thinning_grid_coarse_factor
coarse_grid_factor = config.domain.thinning_grid_coarse_factor
if coarse_grid_factor > 0:
domain = Domain.construct_from_dict(config.domain)
coarse_grid_domain = domain.get_coarser(factor=coarse_grid_factor)
thinning_grid_domain = domain.get_thinning_grid_version()
logger.info(
"Thinning accepted obs: "
+ "Keeping only 1 station per support grid point"
)
logger.info(
" > Support grid spacing: %.1f m (%d x the domain's)",
coarse_grid_domain.gsize,
thinning_grid_domain.gsize,
coarse_grid_factor,
)
n_preliminary_accepted = len(df_accepted.index)
......@@ -284,7 +284,7 @@ def select_stations(args):
]
# (c) Add grid (i, j) info
icol, jcol = coarse_grid_domain.lonlat2grid(
icol, jcol = thinning_grid_domain.lonlat2grid(
df_accepted["lon"].to_numpy(), df_accepted["lat"].to_numpy(),
)
df_accepted["i"] = icol
......@@ -363,16 +363,18 @@ def select_stations(args):
)
df_moving.to_csv(fpath, index=None, mode="w")
if args.save_obsoul:
# Save data from selected stations in OBSOUL file format
netatmo_csvs2obsouls(
config.general.dtgs,
netatmo_data_rootdir=config.general.data_rootdir,
selected_stations=df_accepted["id"],
outdir=outdir / "obsoul_files",
loglevel=args.loglevel,
use_mpi=args.mpi,
)
# Save data from selected stations in OBSOUL file format
netatmoqc_input2output(
config.general.dtgs,
netatmo_data_rootdir=config.general.data_rootdir,
selected_stations=df_accepted["id"],
outdir=outdir,
loglevel=args.loglevel,
use_mpi=args.mpi,
save_csv=True,
save_obsoul=args.save_obsoul,
obsoul_export_params=config.general.obsoul_export_params,
)
# We're now done.
elapsed = time.time() - tstart_selection
......@@ -400,16 +402,19 @@ def csv2obsoul(args):
# Allow mkdir to raise eventual exceptions if cannot write to outdir
outdir.mkdir(parents=True)
netatmo_csvs2obsouls(
netatmoqc_input2output(
config.general.dtgs,
netatmo_data_rootdir=config.general.data_rootdir,
dropna=args.dropna,
fillna=args.fillna,
rm_duplicate_stations=args.rm_duplicate_stations,
rm_moving_stations=args.rm_moving_stations,
outdir=outdir / "obsoul_files",
outdir=outdir,
loglevel=args.loglevel,
use_mpi=args.mpi,
save_csv=False,
save_obsoul=True,
obsoul_export_params=config.general.obsoul_export_params,
)
logger.info(
......@@ -436,7 +441,7 @@ def show(args):
domain = Domain.construct_from_dict(config.domain)
if args.domain:
fig = domain.get_fig()
fig = domain.get_fig(display_grid_max_gsize=20000.0)
fig.update_layout(
legend=dict(
orientation="h", xanchor="center", x=0.5, yanchor="top", y=0.0,
......
......@@ -306,38 +306,39 @@ def parsed_path(path):
with config_section("general") as section:
# in/our dirs
config_metadata.register(
"data_rootdir", default=".", astype=parsed_path,
)
config_metadata.register(
"outdir", default="/tmp/netatmoqc_output", astype=parsed_path,
)
# Chosen clustering method
config_metadata.register(
"clustering_method",
default="hdbscan",
choices=["hdbscan", "dbscan", "optics", "rsl"],
)
# DTGs
config_metadata.register("dtgs.list")
config_metadata.register("dtgs.start")
config_metadata.register("dtgs.end")
config_metadata.register("dtgs.cycle_length", default="3H")
# Metrics optimisaion scheme
config_metadata.register(
"custom_metrics_optimize_mode",
default="memory",
choices=["speed", "memory", "speed_mem_compromise"],
)
# Data cols to export when saving obsoul output
config_metadata.register(
"obsoul_export_params",
default=["pressure", "temperature", "humidity", "sum_rain_1"],
)
# "commands" section
with config_section("commands.select") as section:
config_metadata.register("station_rejection_tol", default=0.15, minval=0.0)
# At the end of its run, the "select" command uses a coarser version of the
# domain's grid to thin the returned data (at most one obs per coarse grid
# point is returned). The config param "thinning_grid_coarse_factor"
# controls how much coarser this coarse grid will be.
config_metadata.register(
"thinning_grid_coarse_factor", default=4, minval=1, astype=int
)
# clustering_method.hdbscan
with config_section("clustering_method.hdbscan") as section:
......@@ -401,6 +402,13 @@ with config_section("domain") as section:
config_metadata.register("gsize", default=2500.0, minval=0)
config_metadata.register("ezone", default=11, minval=0, astype=int)
config_metadata.register("lmrt", default=False, choices=[True, False])
# At the end of its run, the "select" command uses a coarser version of the
# domain's grid to thin the returned data (at most one obs per coarse grid
# point is returned). The config param "thinning_grid_coarse_factor"
# controls how much coarser this coarse grid will be.
config_metadata.register(
"thinning_grid_coarse_factor", default=4, minval=0, astype=int
)
#####################################
......
......@@ -27,10 +27,13 @@ class Domain:
lmrt = attr.ib(default=False)
tstep = attr.ib(default=0.0)
name = attr.ib(default="")
# nsplit_lon and nsplit_lat are netatmoqc-specific. They are used
# employed to split the domain in subdomains for preliminary clustering
# The params below are netatmoqc-specific.
# nsplit_lon and nsplit_lat are employed to split the domain in subdomains
# for preliminary clustering
nsplit_lon = attr.ib(default=1)
nsplit_lat = attr.ib(default=1)
# thinning_grid_coarse_factor is used for post-clustering thinning
thinning_grid_coarse_factor = attr.ib(default=1)
def __attrs_post_init__(self):
if self.lmrt and abs(self.lat0) > 0:
......@@ -61,6 +64,7 @@ class Domain:
lmrt=config.lmrt,
nsplit_lon=config.split.lon,
nsplit_lat=config.split.lat,
thinning_grid_coarse_factor=config.thinning_grid_coarse_factor,
)
@property
......@@ -160,6 +164,13 @@ class Domain:
new.change_grid_spacing(factor=factor ** -1)
return new
def get_thinning_grid_version(self):
    """Return a copy of self whose grid is the post-clustering thinning grid.

    The thinning grid is the domain's grid coarsened by the factor stored
    in ``thinning_grid_coarse_factor``. Returns ``None`` when thinning is
    disabled (factor < 1).
    """
    factor = self.thinning_grid_coarse_factor
    if factor < 1:
        # Thinning disabled via config
        return None
    return self.get_coarser(factor=factor)
@property
def n_subdomains(self):
return self.nsplit_lon * self.nsplit_lat
......@@ -342,8 +353,8 @@ class Domain:
def get_grid_ij2xy_map(self):
# We can then work with x=grid2x[i, j], y=grid2y[i, j]
xvals = np.linspace(self.xmin, self.xmax, self.nlon)
yvals = np.linspace(self.ymin, self.ymax, self.nlat)
xvals = np.linspace(self.xmin, self.xmax, self.nlon, endpoint=False)
yvals = np.linspace(self.ymin, self.ymax, self.nlat, endpoint=False)
grid2x, grid2y = np.meshgrid(xvals, yvals)
return grid2x, grid2y
......
......@@ -122,9 +122,36 @@ def draw_boundaries(
return fig
def draw_grid_pts(
    fig, domain, display_grid_max_gsize=None, name="Grid", **marker_opts
):
    """Add to fig a trace containing a selection of the domain's grid points.

    At most one grid point every ``display_grid_max_gsize`` meters is drawn
    (defaulting to the domain's own grid spacing, i.e. every point). If
    ``display_grid_max_gsize`` <= 0, no grid points are drawn at all.
    Extra keyword args are forwarded as plotly marker options.
    """
    max_gsize = (
        domain.gsize if display_grid_max_gsize is None else display_grid_max_gsize
    )
    if max_gsize <= 0:
        # Grid display disabled: return the figure untouched
        return fig
    # Stride between the grid points that actually get drawn
    stride = max(1, int(max_gsize / domain.gsize))
    lons, lats = domain.get_grid_ij2lonlat_map()
    trace_name = name
    if stride > 1:
        trace_name += " (every %s point of)" % (humanize.ordinal(stride))
    trace = go.Scattergeo(
        name=trace_name,
        lat=lats[::stride, ::stride].flatten(),
        lon=lons[::stride, ::stride].flatten(),
        mode="markers",
        marker=marker_opts,
    )
    fig.add_trace(trace)
    return fig
def get_domain_fig(
domain,
max_ngrid=100,
# Show 1 grid point every "display_grid_max_gsize" meters.
# Don't show any grid point if display_grid_max_gsize <=0.
display_grid_max_gsize=0,
lonrange=None,
latrange=None,
ezone=None,
......@@ -201,23 +228,26 @@ def get_domain_fig(
),
)
# Draw selection of grid points if requested
lons, lats = domain.get_grid_ij2lonlat_map()
ngrid_full = max(domain.nlon, domain.nlat)
if max_ngrid > 0:
grid_draw_every = max(1, int(np.rint(ngrid_full / max_ngrid)))
name = "Grid"
if grid_draw_every > 1:
name += " (every %s point)" % (humanize.ordinal(grid_draw_every))
fig.add_trace(
go.Scattergeo(
name=name,
lat=lats[::grid_draw_every, ::grid_draw_every].flatten(),
lon=lons[::grid_draw_every, ::grid_draw_every].flatten(),
mode="markers",
marker=dict(size=2, color="black"),
)
# Draw selection of grid points
# (a) Support grid (if used)
if domain.thinning_grid_coarse_factor > 0:
fig = draw_grid_pts(
fig,
domain.get_thinning_grid_version(),
display_grid_max_gsize=display_grid_max_gsize,
name="Support Grid",
color="red",
size=1.5,
opacity=0.5,
)
# (b) Main grid
fig = draw_grid_pts(
fig,
domain,
display_grid_max_gsize=display_grid_max_gsize,
color="black",
size=1.5,
)
# Add line segments to mark domain, subdomain and extension zone boundaries
# (a) Observations
......
......@@ -3,9 +3,12 @@ from collections import OrderedDict
from functools import partial
from joblib import delayed, parallel_backend, Parallel
import logging
import numpy as np
import os
from pathlib import Path
import pandas as pd
import psutil
from .dtgs import datetime2epoch
from .load_data import DataNotFoundError, read_netatmo_data_for_dtg
from .logs import get_logger, logcolor
from .mpi import mpi_parallel
......@@ -36,15 +39,17 @@ def save_df_as_netatmo_csv(df, path, overwrite=False):
df = df.rename(columns=dict(mslp="pressure"))
# Saving the adjusted df
logger.info("Saving file %s", path)
logger.debug(
"%sSaving CSV, %d obs%s: %s",
logcolor.cyan,
len(df.index),
logcolor.reset,
path,
)
df.to_csv(path, index=None, mode="w")
def save_df_as_obsoul(
df,
fpath=None,
export_params=["pressure", "temperature", "humidity", "sum_rain_1"],
):
def save_df_as_obsoul(df, fpath=None, export_params=None):
"""Saves a dataframe returned by read_netatmo_data_for_dtg in obsoul format
For further reference on the obsoul file format, please take a look at the
......@@ -52,6 +57,9 @@ def save_df_as_obsoul(
<http://www.rclace.eu/File/Data_Assimilation/2007/lace_obspp.pdf>
"""
if export_params is None:
export_params = ["pressure", "temperature", "humidity", "sum_rain_1"]
dtg = df._parent_dtg
# Define n_params, the "number of bodies" in obsoul lingo
......@@ -179,7 +187,31 @@ def save_df_as_obsoul(
f.write("\n")
def netatmo_csv2obsoul(
@np.vectorize
def obs_timestamp2csv_rpath(timestamp):
    """Map an observation timestamp to the relative path of its CSV file.

    Files are organised as YYYY/MM/DD/YYYYMMDDTHHmm00Z.csv, where the
    minutes field "mm" is the midpoint label of the 10-minute bin that
    contains the observation (05, 15, 25, 35, 45 or 55).
    """
    # minute is in [0, 59]; bin it to its decade and label with the midpoint
    minute = int(timestamp.strftime("%M"))
    f_minute = "%02d" % (10 * (minute // 10) + 5)
    fname = "%s%s00Z.csv" % (timestamp.strftime("%Y%m%dT%H"), f_minute)
    return Path(
        "%s/%s/%s/%s"
        % (
            timestamp.strftime("%Y"),
            timestamp.strftime("%m"),
            timestamp.strftime("%d"),
            fname,
        )
    )
def _input2output_single_dtg(
dtg,
netatmo_data_rootdir,
selected_stations=None,
......@@ -188,11 +220,16 @@ def netatmo_csv2obsoul(
rm_duplicate_stations=True,
rm_moving_stations=True,
outdir=Path(),
outdir_csv=None,
outdir_obsoul=None,
obsoul_export_params=None,
loglevel="INFO",
):
""" Read data from one DTG, filters select obs and save results """
logger = get_logger(__name__, loglevel)
logger.info(
logger.debug(
"Reading data for %sDTG=%s%s...", logcolor.cyan, dtg, logcolor.reset,
)
try:
......@@ -217,19 +254,32 @@ def netatmo_csv2obsoul(
logger.warning("No stations for DTG=%s", dtg)
return
fpath = Path(outdir) / "OBSOUL{}".format(dtg.strftime("%Y%m%d%H"))
logger.info(
"Saving OBSOUL: %sDTG=%s, %d obs%s, file %s",
logcolor.cyan,
dtg,
len(df.index),
logcolor.reset,
fpath,
)
save_df_as_obsoul(df, fpath)
if outdir_csv is not None:
df["_f_rpath"] = obs_timestamp2csv_rpath(df["time_utc"])
for f_rpath, file_df in df.groupby("_f_rpath", sort=False):
fpath = Path(outdir_csv) / f_rpath
file_df = file_df.drop("_f_rpath", axis=1).sort_values(
by=["time_utc"]
)
save_df_as_netatmo_csv(file_df, fpath, overwrite=True)
df = df.drop("_f_rpath", axis=1)
if outdir_obsoul is not None:
fpath = Path(outdir_obsoul) / "OBSOUL{}".format(
dtg.strftime("%Y%m%d%H")
)
logger.debug(
"%sSaving OBSOUL: DTG=%s, %d obs%s, file %s",
logcolor.cyan,
dtg,
len(df.index),
logcolor.reset,
fpath,
)
save_df_as_obsoul(df, fpath, export_params=obsoul_export_params)
def netatmo_csvs2obsouls(
def netatmoqc_input2output(
dtgs,
netatmo_data_rootdir,
selected_stations=None,
......@@ -240,8 +290,11 @@ def netatmo_csvs2obsouls(
outdir=Path(),
loglevel="INFO",
use_mpi=False,
save_csv=True,
save_obsoul=False,
obsoul_export_params=None,
):
"""Save in OBSOUL files the data from the CSV files that netatmoqc reads
"""Save NetAtmoQC output data in CSV or OBSOUL formats
If a list of selected stations is passed in the "selected_stations", then
only these are kept.
......@@ -250,24 +303,48 @@ def netatmo_csvs2obsouls(
"""
logger.info(
"%sSaving selected data in obsoul format...%s",
logcolor.cyan,
logcolor.reset,
"%sSaving selected observations...%s", logcolor.cyan, logcolor.reset,
)
outdir_csv = None
outdir_obsoul = None
if save_csv:
outdir_csv = Path(outdir) / "csv_files"
logger.info(
"%s> CSV outdir:%s %s", logcolor.cyan, logcolor.reset, outdir_csv,
)
if save_obsoul:
outdir_obsoul = Path(outdir) / "obsoul_files"
logger.info(
"%s> OBSOUL outdir:%s %s",
logcolor.cyan,
logcolor.reset,
outdir_obsoul,
)
if obsoul_export_params is not None:
logger.info(
" * OBSOUL files will contain: %s",
", ".join(obsoul_export_params),
)
in2out_fdtg = partial(
_input2output_single_dtg,
netatmo_data_rootdir=netatmo_data_rootdir,
selected_stations=selected_stations,
dropna=dropna,
fillna=fillna,
rm_duplicate_stations=rm_duplicate_stations,
rm_moving_stations=rm_moving_stations,
outdir=outdir,
loglevel=loglevel,
outdir_csv=outdir_csv,
outdir_obsoul=outdir_obsoul,
obsoul_export_params=obsoul_export_params,
)
if use_mpi:
logger.info("Parallelising tasks over DTGs using MPI")
func = partial(
netatmo_csv2obsoul,
netatmo_data_rootdir=netatmo_data_rootdir,
selected_stations=selected_stations,
dropna=dropna,
fillna=fillna,
rm_duplicate_stations=rm_duplicate_stations,
rm_moving_stations=rm_moving_stations,
outdir=outdir,
loglevel=loglevel,
)
mpi_parallel(func, dtgs)
mpi_parallel(in2out_fdtg, dtgs)
else:
n_procs = int(
os.getenv("NETATMOQC_MAX_PYTHON_PROCS", psutil.cpu_count())
......@@ -277,17 +354,4 @@ def netatmo_csvs2obsouls(
# Using the "loky" backend helps avoiding oversubscription
# of cpus in child processes
with parallel_backend("loky"):
Parallel(n_jobs=n_procs)(
delayed(netatmo_csv2obsoul)(
dtg,
netatmo_data_rootdir=netatmo_data_rootdir,
selected_stations=selected_stations,
dropna=dropna,
fillna=fillna,
rm_duplicate_stations=rm_duplicate_stations,
rm_moving_stations=rm_moving_stations,
outdir=outdir,
loglevel=loglevel,
)
for dtg in dtgs
)
Parallel(n_jobs=n_procs)(delayed(in2out_fdtg)(dtg) for dtg in dtgs)
......@@ -7,7 +7,7 @@
[tool.poetry]
name = "netatmoqc"
version = "0.3.2"
version = "0.3.3.dev0"
description = "Use machine learning clustering methods to perform quality control over NetAtmo data"
authors = [
"Paulo V. C. Medeiros <paulo.medeiros@smhi.se>"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment