Commit d700b6c8 authored by Paulo Medeiros

Activate flake8-darglint plugin.

And fix an issue with retrieval of DTG metadata in save_df_as_obsoul.
parents 2ed56319 8ada75a2
Pipeline #9233 passed with stages
in 1 minute and 22 seconds
......@@ -4,6 +4,9 @@
format = "grouped"
# Show line of source code in output, with syntax highlighting
show_source = true
# flake8-darglint params
docstring_style = "google"
strictness = "short"
# list of plugins and rules for them
[tool.flakehell.plugins]
......@@ -18,6 +21,7 @@
# Remove C408 from flake8-comprehensions because I think sometimes the "dict" syntax
# looks cleaner than literal "{}". Dict creation performance is not an issue here.
flake8-comprehensions = ["+*", "-C408"]
flake8-darglint = ["+*"]
flake8-docstrings = ["+*", "-D105"] # Remove "D105: Missing docstring in magic method"
flake8-eradicate = ["+*"]
flake8-logging-format = ["+*"]
......
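For reference, a minimal sketch (function and docstring invented for illustration) of what darglint accepts under docstring_style = "google" with strictness = "short" — the sections, when present, must match the signature:

    def scale(values, factor=1.0):
        """Scale values by a factor.

        Args:
            values (list): Numbers to scale.
            factor (float): Multiplier applied to each value.
                Default value = 1.0.

        Returns:
            list: The scaled values.
        """
        return [v * factor for v in values]

With "short" strictness, darglint also accepts bare one-line docstrings; full Args/Returns checking kicks in once sections are written out, as in the docstrings added throughout this commit.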
......@@ -102,6 +102,7 @@ def generate_obs_weights_panel():
"""Generate the observation weights panel."""
def obs_weight_cell(var_name, default=1.0, minval=0.0, maxval=np.inf):
"""Return a div for an obs weight cell."""
# Get defaults from config file if defined. Use the ones
# defined in the calls to this function otherwise.
try:
......
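A loose sketch (config access path and names assumed, not the project's exact code) of the fallback logic the comment above describes: prefer a per-variable value from the config file, otherwise keep the default passed to obs_weight_cell.

    try:
        # Hypothetical config key; prefer the value from the config file...
        default = config.get("obs_weights")[var_name]
    except (KeyError, TypeError):
        # ...otherwise keep the caller-supplied default unchanged
        pass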
......@@ -16,11 +16,10 @@ from .commands_functions import (
class StoreDictKeyPair(argparse.Action):
"""Enable args="key1=val1, ..., keyN=valN" in command line args.
"""Enable args="key1=val1, ..., keyN=valN" in command line args."""
Source: <https://stackoverflow.com/questions/29986185/
python-argparse-dict-arg/42355279>
"""
# Source: <https://stackoverflow.com/questions/29986185/
# python-argparse-dict-arg/42355279>
def __init__(self, option_strings, dest, nargs=None, **kwargs):
"""Initialise nargs."""
......@@ -37,7 +36,15 @@ class StoreDictKeyPair(argparse.Action):
def get_parsed_args(program_name):
"""Get parsed command line arguments."""
"""Get parsed command line arguments.
Args:
program_name (str): The name of the program.
Returns:
argparse.Namespace: Parsed command line arguments.
"""
# Define main parser and general options
parser = argparse.ArgumentParser(
prog=program_name,
......
......@@ -22,8 +22,15 @@ def sort_df_by_cluster_size(df):
"""Return a version of df with data sorted by cluster_size.
Sort df so that clusters with more members are put at the top of the
dataframe. The exception if the "-1" label, whichm if present, will
dataframe. The exception is the "-1" label, which, if present, will
always remain at the top. Handy if results are to be plotted.
Args:
df (pandas.DataFrame): Dataframe with clustering info.
Returns:
pandas.DataFrame: Copy of input df sorted by cluster size.
"""
# The labels may have been reassigned if the df was passed through
# an outlier removal routine. Let's keep track of the original ones.
......@@ -85,23 +92,34 @@ def sort_df_by_cluster_size(df):
def weights_dict_to_np_array(
df, pairwise_diff_weights=None, skip=("id", "time_utc"), default=1
):
"""
Convert pairwise_diff_weights into a numpy array.
Takes a pandas dataframe and a {column_name:weight} dictionary
and returns a numpy array of weights to be passed to the routine
that calculates the distance matrix.
The "skip" arg lists columns that will not enter the clustering
and should therefore be skipped.
If the weight for a non-skipped column of the input dataframe
are not defined in pairwise_diff_weights, then it will be set to
default.
"""Convert pairwise_diff_weights into a numpy array.
Takes a pandas dataframe and a {column_name:weight} dictionary and returns
an array of weights to be passed to the routine that calculates the
distance matrix.
Columns "lat" and "lon" in df are treated specially, in that they are
not assigned a weight individually, but rather a single weight gets
assigned to the "geo_dist" property.
Args:
df (pandas.DataFrame): Dataframe with observations.
pairwise_diff_weights (dict): {df_column_name:weight} dictionary.
Default value = None.
skip (tuple): df columns that will not enter the clustering and should
therefore be skipped. Default value = ("id", "time_utc").
default: Default weight to be assigned to a non-skipped df column if
the column name is present in df but not in pairwise_diff_weights.
Default value = 1.
Returns:
numpy.ndarray: weights to be passed to the routine that calculates the
distance matrix.
Raises:
ValueError: If the dataframe 'lat' column is not followed by the 'lon'
column.
Columns "lat" and "lon" are treated specially, in that they are
not assigned a weight individually, but rather a single weight
should be assigned to the "geo_dist" property.
"""
if df.columns.get_loc("lon") - df.columns.get_loc("lat") != 1:
raise ValueError("'lat' column is not followed by 'lon' column")
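A hypothetical usage sketch of weights_dict_to_np_array, consistent with the docstring above (column names and values invented):

    import pandas as pd

    df = pd.DataFrame(
        {
            "id": [1, 2],
            "time_utc": ["2020-01-01 00:00", "2020-01-01 01:00"],
            "lat": [60.0, 61.0],
            "lon": [10.0, 11.0],  # must immediately follow "lat"
            "temperature": [280.0, 281.0],
        }
    )
    # "geo_dist" weighs the lat/lon pair jointly; "temperature" is not
    # listed, so it falls back to the default weight of 1.
    weights = weights_dict_to_np_array(df, pairwise_diff_weights={"geo_dist": 2.0})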
......@@ -117,7 +135,16 @@ def weights_dict_to_np_array(
def get_silhouette_samples(df, distance_matrix):
"""Return df with a 'silhouette_samples' column added."""
"""Calculate silhouette scores for every obs in df.
Args:
df (pandas.DataFrame): Dataframe with observations.
distance_matrix (HollowSymmetricMatrix): Obs distance matrix.
Returns:
pandas.DataFrame: Copy of df with added silhouette samples info.
"""
# We will only consider true clusters for the calculation of
# the silhouette coeff, i.e., points with label<0 will be excluded from
# the calculation. This means that we need to use a slice of the distance
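A condensed sketch (assumed; the package's actual code uses its HollowSymmetricMatrix type, assumed here to be convertible to a dense array) of the noise-exclusion idea described in this comment, using scikit-learn:

    import numpy as np
    from sklearn.metrics import silhouette_samples

    labels = df["cluster_label"].to_numpy()
    mask = labels >= 0  # points with label < 0 are noise/outliers
    scores = np.full(len(labels), np.nan)
    if len(np.unique(labels[mask])) > 1:
        dense = np.asarray(distance_matrix)  # dense view of the matrix
        scores[mask] = silhouette_samples(
            dense[np.ix_(mask, mask)], labels[mask], metric="precomputed"
        )
    df = df.assign(silhouette_samples=scores)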
......@@ -164,9 +191,44 @@ def run_clustering_on_df(
max_n_stdev_around_mean=2.0,
trunc_perc=0.25,
calc_silhouette_samples=True,
**kwargs
):
"""Low-level clustering routine."""
"""Low-level clustering routine.
Args:
df (pandas.DataFrame): Dataframe with observations.
method (str): {"hdbscan", "dbscan", "rsl", "optics"}
Clustering method.
distance_matrix (HollowSymmetricMatrix): Obs distance matrix.
Default value = None.
distance_matrix_optimize_mode (str): The distance matrix optimization
mode. Default value = "memory".
skip (tuple): Columns to skip in df.
Default value = ("id", "time_utc").
weights_dict (dict): obs_name --> obs_weight map. Default value = None.
eps (float): DBSCAN's eps parameter. Default value = 15.0.
min_cluster_size (int): HDBSCAN's min_cluster_size. Default value = 3.
min_samples (int): (H)DBSCAN's min_samples. Default value = 3.
n_jobs (int): Max number of local-host parallel jobs.
Default value = -1.
outlier_rm_method (str): Outlier removal method. Default value = None.
max_num_refine_iter (int): Max number of iterations for the
"iterative" ourlier removal method. Default value = 50.
max_n_stdev_around_mean (float): Max number of stdev from cluster obs
mean for an observation to be considered OK in the "iteractive"
outlier removal method. Default value = 2.0.
trunc_perc (float): Percentage used in array truncation when
calculating stdevs and means in the "iterative" outlier removal
method. Default value = 0.25.
calc_silhouette_samples (bool): Whether to calculate silhouette_samples.
Default value = True.
Returns:
pandas.DataFrame: Copy of df with added clustering info.
Raises:
NotImplementedError: If the `method` choice is not valid.
"""
method = method.lower()
# Compute clustering using DBSCAN or HDBSCAN
if method not in ["dbscan", "hdbscan", "rsl", "optics"]:
......@@ -190,13 +252,12 @@ def run_clustering_on_df(
# written in fortran and interfaced via f2py3, or then written in
# python but jit-compiled with numba, then the relative speed up
# can reach up to 120x.
num_threads = kwargs.get("num_threads", -1)
distance_matrix = calc_distance_matrix(
# Drop columns that won't be used in the clustering
df.drop(list(skip), axis=1),
weights,
optimize_mode=distance_matrix_optimize_mode,
num_threads=num_threads,
num_threads=n_jobs,
)
# Running clustering with the computed distance matrix
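A hypothetical call site reflecting the parameter change in this hunk (the separate num_threads kwarg is gone; the routine's existing n_jobs argument is forwarded instead):

    df_out = run_clustering_on_df(
        df,
        method="hdbscan",
        min_cluster_size=3,
        min_samples=3,
        n_jobs=-1,  # use all available cores
    )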
......@@ -292,12 +353,18 @@ def run_clustering_on_df(
def suspendlogging(func):
"""Suspend logging in the function it decorates.
Adapted from: <https://stackoverflow.com/questions/7341064/
disable-logging-per-method-function>
Args:
func (function): The function to be decorated.
Returns:
function: The decorated function.
"""
# Adapted from: <https://stackoverflow.com/questions/7341064/
# disable-logging-per-method-function>
@wraps(func)
def inner(*args, **kwargs):
def _inner(*args, **kwargs):
previousloglevel = logger.getEffectiveLevel()
try:
logger.setLevel(logging.WARN)
......@@ -305,12 +372,25 @@ def suspendlogging(func):
finally:
logger.setLevel(previousloglevel)
return inner
return _inner
@suspendlogging
def self_consistent_reclustering(df, distance_matrix, **kwargs):
"""Recluster obs in df until further clustering on df returns df itself."""
"""Recluster obs in df until further clustering on df returns df itself.
Args:
df (pandas.DataFrame): Pandas dataframe with observations and initial
clustering info.
distance_matrix (HollowSymmetricMatrix): Matrix of distances between
the obs in df.
**kwargs: Keyword args passed to run_clustering_on_df.
Returns:
pandas.DataFrame: Copy of df where obs have been clustered multiple
times until run_clustering_on_df(df, ...) = df.
"""
n_iter = 0
n_removed = 0
n_noise = np.count_nonzero(df["cluster_label"] < 0)
......@@ -342,7 +422,14 @@ def _cluster_netatmo_obs_one_domain(df, config, **kwargs):
Helper for the main routine cluster_netatmo_obs
kwargs are passed to run_clustering_on_df
Args:
df (pandas.DataFrame): Pandas dataframe with observations.
config (ParsedConfig): Parsed configs.
**kwargs: Keyword args passed to run_clustering_on_df.
Returns:
pandas.DataFrame: Copy of df with added clustering info.
"""
time_start_clustering = time.time()
logger.debug("Performing clustering...")
......@@ -376,7 +463,14 @@ def _cluster_netatmo_obs_one_domain(df, config, **kwargs):
def cluster_netatmo_obs(df, config, **kwargs):
"""Cluster NetAtmo observations.
kwargs are passed to _cluster_netatmo_obs_one_domain.
Args:
df (pandas.DataFrame): Pandas dataframe with observations.
config (ParsedConfig): Parsed configs.
**kwargs: Keyword args passed to _cluster_netatmo_obs_one_domain.
Returns:
pandas.DataFrame: Copy of df with added clustering info.
"""
# Load domain configs from config
domain = Domain.construct_from_dict(config.domain)
......@@ -485,7 +579,12 @@ def cluster_netatmo_obs(df, config, **kwargs):
def report_clustering_results(df):
"""Print some relevant info about the clustering."""
"""Print some relevant info about the clustering.
Args:
df (pandas.DataFrame): Dataframe with clustering info.
"""
n_obs = len(df.index)
noise_data_df = df[df["cluster_label"] < 0]
n_noise_clusters = len(noise_data_df["cluster_label"].unique())
......
......@@ -42,7 +42,12 @@ logger = logging.getLogger(__name__)
def cluster_obs_single_dtg(args):
"""Implement the "cluster" command."""
"""Implement the "cluster" command.
Args:
args (argparse.Namespace): Parsed command line arguments.
"""
config = read_config(args.config_file)
try:
......@@ -100,7 +105,17 @@ def cluster_obs_single_dtg(args):
def _select_stations_single_dtg(dtg, config, args):
"""Implement "select_stations" for a single DTG."""
"""Implement "select_stations" for a single DTG.
Args:
dtg (Dtg): The dtg for which to select stations.
config (ParsedConfig): Parsed dict of configs from config file.
args (argparse.Namespace): Parsed command line arguments.
Returns:
(DataFrame, DataFrame, DataFrame): df_accepted, df_rejected, df_moving
"""
tstart = time.time()
logger = get_logger(__name__, args.loglevel)
logger.info("%sDTG=%s%s: Started", logcolor.cyan, dtg, logcolor.reset)
......@@ -124,10 +139,7 @@ def _select_stations_single_dtg(dtg, config, args):
df, df_moving = remove_irregular_stations(df)
df = cluster_netatmo_obs(
df=df,
config=config,
num_threads=cpu_share,
calc_silhouette_samples=False,
df=df, config=config, n_jobs=cpu_share, calc_silhouette_samples=False,
)
selected_cols = ["id", "lat", "lon", "alt"]
......@@ -146,6 +158,10 @@ def select_stations(args):
This function calls "_select_stations_single_dtg" for each DTG
(in parallel if requested/possible), and then processes, gathers
and saves the results.
Args:
args (argparse.Namespace): Parsed command line arguments.
"""
tstart_selection = time.time()
config = read_config(args.config_file)
......@@ -413,7 +429,12 @@ def select_stations(args):
# Code related to the csv2obsoul command
def csv2obsoul(args):
"""Implement the "csv2obsoul" command."""
"""Implement the "csv2obsoul" command.
Args:
args (argparse.Namespace): Parsed command line arguments.
"""
config = read_config(args.config_file)
# Create outdir at the beginning so users don't
......@@ -455,7 +476,12 @@ def _open_file_with_default_app(fpath):
def show(args):
"""Implement the 'show' command."""
"""Implement the 'show' command.
Args:
args (argparse.Namespace): Parsed command line arguments.
"""
logger = get_logger(__name__, args.loglevel)
config = read_config(args.config_file)
......
......@@ -35,6 +35,7 @@ class UndefinedValueType:
We will use instances of this class to mark things such as missing
config file values, unspecified defaults in functions, etc. We don't
want to use None in these cases, as None may be a valid arg value.
"""
def __init__(
......@@ -92,7 +93,7 @@ NoDefaultProvided = UndefinedValueType(name="NoDefaultProvided")
class UndefinedConfigValueError(Exception):
"""Exception to be raised when retrieving undefined config value."""
"""Exception to be raised if UndefinedConfigValue is encountered."""
class ConfigDict(DotMap):
......@@ -112,6 +113,7 @@ class ConfigDict(DotMap):
(c) If self._dynamic is True, then, instead of raising KeyError:
* Retrievals on missing keys return UndefinedConfigValue
* Sublevels are automatically created if needed upon item setting
"""
def __init__(self, *args, **kwargs):
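An illustration of the _dynamic behaviour, inferred from the docstring above rather than taken from the module's tests:

    cfg = ConfigDict(_dynamic=True)
    # Retrieval on a missing key returns the sentinel instead of raising
    assert cfg["not_set"] is UndefinedConfigValue
    # Sublevels are created automatically upon item setting
    cfg["outer"]["inner"] = 42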
......@@ -225,6 +227,8 @@ class _MetadataDict(ConfigDict):
class _ConfigMetadataRegistry(ConfigDict):
"""Class to help register metadata about known config options."""
def register(
self,
key,
......@@ -235,6 +239,28 @@ class _ConfigMetadataRegistry(ConfigDict):
astype=NoDefaultProvided,
case_sensitive=NoDefaultProvided,
):
"""Register a metadata registry entry.
Args:
key (str): The key (name) of the param metadata to be registered.
default (obj): The default value for the registered metadata.
(Default value = NoDefaultProvided)
minval (obj): An optional min value.
(Default value = NoDefaultProvided).
maxval (obj): An optional max value.
(Default value = NoDefaultProvided).
choices (list): Optional list of allowed choices.
(Default value = NoDefaultProvided).
astype (type): Optional type info for the metadata. Used, e.g.,
for typecasting. (Default value = NoDefaultProvided).
case_sensitive (bool): Whether future lookups should consider
character case in the metadata key.
(Default value = NoDefaultProvided).
Raises:
TypeError: If key is not a string.
"""
if not isinstance(key, str):
raise TypeError("key must be a string")
try:
......@@ -260,7 +286,15 @@ class _ConfigMetadataRegistry(ConfigDict):
self._dynamic = False
def copy_template(self, key):
# "section" will be defined using a context manager
"""Copy contents from self["_template"] to section.key.
"section" needs to be defined in the upper-level scope,
using a context manager.
Args:
key (str): Destination key; the template contents are copied
into section.key.
"""
dest_key = "{}.{}".format(section, key)
if "." in section:
template_key, _, _dropped = section.rpartition(".")
......@@ -479,6 +513,28 @@ def _parse_dtg_entires(dtgs_config):
def _raw2parsed(raw, recog_configs=config_metadata, parent_keys=()):
"""Return parsed confif from raw input, validating with metadata.
Descend recursively into input dict raw and validate raw configs against
the registered metadata. Configs not that don't have corresponding metadata
info will be passed unchanged.
Args:
raw (dict): Raw configs, as read from config file.
recog_configs (_ConfigMetadataRegistry): Configs for which we have
registered metadata. Default value = config_metadata.
parent_keys (tuple): Tuple of all config dict keys, in order, traversed
to get to the current raw dict. Default value = (). Used for dicts
nested within the top-level raw.
Returns:
ParsedConfig: Parsed configs from raw.
Raises:
ValueError: If any config in raw is not a valid choice, or has an
invalid value.
"""
# Do not use mutable data structures for argument defaults (parent_keys)
parent_keys = list(parent_keys)
parsed = ConfigDict(_dynamic=True)
......@@ -559,6 +615,27 @@ def _raw2parsed(raw, recog_configs=config_metadata, parent_keys=()):
def _fill_defaults(raw, recog_configs=config_metadata, parent_keys=()):
"""Fill defaults into raw config according to registered metadata.
Descend recursively into input dict raw and fill in default values
according to metadata registered in this module.
Args:
raw (dict): Raw configs, as read from config file.
recog_configs (_ConfigMetadataRegistry): Configs for which we have
registered metadata. Default value = config_metadata.
parent_keys (tuple): Tuple of all config dict keys, in order, traversed
to get to the current raw dict. Default value = (). Used for dicts
nested within the top-level raw.
Returns:
dict: Copy of raw with defaults filled in.
Raises:
TypeError: If any metadata dict found while parsing raw is not an
instance of either _MetadataDict or ConfigDict classes.
"""
# Do not use mutable data structures for argument defaults (parent_keys)
parent_keys = list(parent_keys)
parsed = ConfigDict(raw, _dynamic=False)
......@@ -607,7 +684,8 @@ def _fill_defaults(raw, recog_configs=config_metadata, parent_keys=()):
class ParsedConfig:
"""Parsed version of config file or raw dict.
Each section in config will be accessible as self.section
Each section in config will be accessible as self.section.
"""
def __init__(
......@@ -656,7 +734,7 @@ class ParsedConfig:
return self.get(keys_for_get, default)
def get(self, item, default=UndefinedConfigValue):
"""Return parsed config values for the active clustering method."""
"""Implement get method with default UndefinedConfigValue."""
rtn = self._parsed.get(item, default=UndefinedConfigValue)
if rtn is UndefinedConfigValue:
if default is NoDefaultProvided:
......@@ -695,7 +773,15 @@ class ParsedConfig:
def read_config(config_path):
"""Read config file at location "config_path"."""
"""Read config file at location "config_path".
Args:
config_path (pathlib.Path): The path to the config file.
Returns:
ParsedConfig: Parsed configs read from config_path.
"""
logging.info("Reading config file %s", Path(config_path).resolve())
config = ParsedConfig(config_path, global_default=UndefinedConfigValue)
return config
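A short usage sketch tying read_config to the Domain constructor documented later in this commit (the file name is illustrative):

    config = read_config("config.toml")  # hypothetical path
    domain = Domain.construct_from_dict(config.domain)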
......@@ -81,7 +81,7 @@ class Grid2D:
@property
def ny(self):
"""Return the number of grid points along the x-direction."""
"""Return the number of grid points along the y-direction."""
return self._yaxis.npts
@ny.setter
......@@ -300,6 +300,7 @@ class Domain:
"""Model domain geometry and grid.
See <https://hirlam.org/trac/wiki/HarmonieSystemDocumentation/ModelDomain>.
"""
def __init__(
......@@ -327,6 +328,7 @@ class Domain:
self.lmrt = lmrt
def init_proj(ngrid_lonlat, proj_lon0_lat0, grid_spacing):
"""Help routine to initialise domain projection."""
if self.lmrt and abs(proj_lon0_lat0[1]) > 0:
logger.warning(
"lat0 should be 0 if lmrt=True. Resetting lat0 to 0."
......@@ -348,6 +350,7 @@ class Domain:
# Initialise grid #
###################
def init_grid(ngrid_lonlat, grid_spacing, ezone_ngrid):
"""Help routine to initialise domain grid."""
# (a) Get projected coords of grid center
center_xy = proj.lonlat2xy(
lon=center_lonlat[0], lat=center_lonlat[1]
......@@ -397,7 +400,16 @@ class Domain:
@classmethod
def construct_from_dict(cls, config):
"""Construct domain from info in config dict."""
"""Construct domain from info in the config dict.
Args:
config (netatmoqc.config.ConfigDict): Configs dictionary.
Returns:
netatmoqc.domains.Domain: New class instance constructed from
info in the config dict.
"""
return cls(
name=config.name,
ngrid_lonlat=(config.nlon, config.nlat),
......@@ -412,11 +424,10 @@ class Domain:
)
def _auto_choose_projname(self, lat0, y_range):
"""Define domain projection.
"""Define domain projection."""
# Do this in a way close to what is explained at
# <https://hirlam.org/trac/wiki/HarmonieSystemDocumentation/ModelDomain>
In a way close to what is explained at
<https://hirlam.org/trac/wiki/HarmonieSystemDocumentation/ModelDomain>
"""
latrange = 180 * y_range / _EQUATOR_PERIM
if self.lmrt or latrange > 35 or np.isclose(latrange, 0):
# <https://proj.org/operations/projections/merc.html>
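For scale, a worked instance of the latrange heuristic above (numbers assumed for illustration):

    y_range = 2.0e6  # metres; an example domain roughly 2000 km tall
    _EQUATOR_PERIM = 4.0075e7  # metres (approximate value assumed here)
    latrange = 180 * y_range / _EQUATOR_PERIM  # ~9.0 degrees
    # 9 < 35, so such a domain would not take the Mercator branch
    # (assuming lmrt is False)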
......@@ -465,7 +476,24 @@ class Domain:
return np.prod(self._nsplit_lonlat)
def split(self, nsplit_lon=None, nsplit_lat=None):
"""Split a domain "nsplit_lon(lat)" times along the lon(lat) axis."""
"""Split a domain "nsplit_lon(lat)" times along the lon(lat) axis.
Args:
nsplit_lon (int): Number of divisions along the longitude axis.
(Default value = None).
nsplit_lat (int): Number of divisions along the latitude axis.
(Default value = None).
Returns:
list: List of subdomains (Domain objects) obtained as a result of
the split.
Raises:
TypeError: If nsplit_lon, nsplit_lon are not integers.
ValueError: If nsplit_lon and nsplit_lat do not divide
nlon and nlat, respectively.
"""
if nsplit_lon is None:
nsplit_lon = self._nsplit_lonlat[0
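A hypothetical call to split, consistent with the docstring above and with the total number of subdomains being the product of the two split counts:

    # assumes domain.nlon and domain.nlat are both divisible by 2
    subdomains = domain.split(nsplit_lon=2, nsplit_lat=2)
    assert len(subdomains) == 4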