Commit c3a3730a authored by Paulo Medeiros

New metrics calc methods. Other minor changes.

Summary of main changes below:

- New metrics calc methods (a rough illustrative sketch follows this list):
    - correlation_aware_euclidean (the new default)
    - haversine_plus_euclidean
    - haversine_plus_manhattan (the only one implemented previously)
- Add "unclusterable_data_columns" general config option
- Allow choice of HDBSCAN's cluster_selection_method
- Metrics now has its own section in the config file
- Support weights for distances along projection x and y
- Make used and visualised projection params more compatible
- Remove unused "tstep" from domain configs
- Fix a few crashes in outlier removal methods (addresses issue #5)
- Fix a few warnings
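
To make the new metric options a bit more concrete, here is a rough, self-contained sketch of what a "haversine_plus_euclidean"-style metric could look like: the haversine geo distance combined with weighted differences of the remaining observation properties. The function names, the km unit and the exact way the weights enter are illustrative assumptions; the actual implementation lives in the package's metrics module and may differ.

import numpy as np

def haversine_distance_km(lat1, lon1, lat2, lon2, earth_radius_km=6371.0):
    """Great-circle distance between two (lat, lon) points, in km."""
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    return 2.0 * earth_radius_km * np.arcsin(np.sqrt(a))

def haversine_plus_euclidean_sketch(obs_a, obs_b, weights):
    """Sketch: weighted geo distance combined (euclidean-style) with other obs diffs."""
    geo = weights.get("geo_dist", 1.0) * haversine_distance_km(
        obs_a["lat"], obs_a["lon"], obs_b["lat"], obs_b["lon"]
    )
    other = [
        weights.get(key, 1.0) * (obs_a[key] - obs_b[key])
        for key in obs_a
        if key not in ("lat", "lon")
    ]
    return np.sqrt(geo ** 2 + np.sum(np.square(other)))

obs_a = {"lat": 60.0, "lon": 10.0, "alt": 100.0, "temperature": 285.0}
obs_b = {"lat": 60.1, "lon": 10.2, "alt": 150.0, "temperature": 284.0}
print(haversine_plus_euclidean_sketch(obs_a, obs_b, {"geo_dist": 1.0, "temperature": 5.0}))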
parents c615173b 35621ffd
......@@ -46,14 +46,20 @@
# Note that the interval is closed at the left and open at the right.
dtgs.cycle_length = '3H' # Default = '3H'
#
# custom_metrics_optimize_mode:
[metrics]
# method:
# > default: "correlation_aware_euclidean"
#   > choices: "correlation_aware_euclidean", "haversine_plus_manhattan", "haversine_plus_euclidean"
method = "correlation_aware_euclidean"
# optimize_mode:
# > default: "memory"
# > choices: "memory", "speed_mem_compromise", "speed"
# In terms of memory usage:
# "memory" < "speed_mem_compromise" < "speed"
# In terms of execution time:
# "memory" > "speed_mem_compromise" > "speed"
custom_metrics_optimize_mode = "memory"
optimize_mode = "memory"
##################################
# Options controlling the domain #
......@@ -64,7 +70,6 @@
# <https://hirlam.org/trac/wiki/HarmonieSystemDocumentation/ModelDomain>
# <https://hirlam.org/trac/browser/Harmonie/scr/Harmonie_domains.pm>
name = ""
tstep = 75
nlon = 900
nlat = 960
lonc = 16.763011639
......
#!/usr/bin/env python3
"""Common definitions."""
import numba
try:
# From python3.8
from importlib.metadata import version
......@@ -11,3 +13,8 @@ try:
__version__ = version(__name__)
except ModuleNotFoundError:
__version__ = "?"
# Set the threading layer before any parallel target compilation.
# Picking "omp" specifically to avoid warnings about old TBB versions.
# See <http://numba.pydata.org/numba-doc/latest/user/threading-layer.html>
numba.config.THREADING_LAYER = "omp"
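
A quick sanity check (assuming a numba build with OpenMP support installed) is to trigger a parallel compilation and ask numba which layer it actually selected; the toy function below is only for illustration.

import numba
import numpy as np
from numba import njit, prange

numba.config.THREADING_LAYER = "omp"  # must be set before the first parallel compile

@njit(parallel=True)
def parallel_sum(arr):
    total = 0.0
    for i in prange(arr.size):
        total += arr[i]
    return total

parallel_sum(np.ones(1000))     # triggers compilation with the parallel target
print(numba.threading_layer())  # expected to report "omp"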
......@@ -263,6 +263,34 @@ def generate_control_card():
),
html.Br(),
#
html.Div(
id="metrics_method_div",
children=[
html.Br(),
html.P("Metrics Calculation Method"),
dcc.Dropdown(
id="metrics_method",
options=[
{
"label": "Correlation-Aware Euclidean",
"value": "correlation_aware_euclidean",
},
{
"label": "Haversine + Manhattan",
"value": "haversine_plus_manhattan",
},
{
"label": "Haversine + Euclidean",
"value": "haversine_plus_euclidean",
},
],
value="correlation_aware_euclidean",
),
],
style={"display": "block", "text-align": "center"},
),
html.Br(),
#
html.Div(
id="optionals_div",
children=[
......@@ -604,6 +632,7 @@ def show_hide_max_num_refining_iter(outlier_rm_method):
State("eps", "value"),
State("date-picker-select", "date"),
State("cycle-select", "value"),
State("metrics_method", "value"),
State("outlier_rm_method", "value"),
State("max_num_refine_iter", "value"),
State("max_n_std_around_mean", "value"),
......@@ -624,6 +653,7 @@ def run_clustering_and_make_plot(
eps,
date,
cycle,
metrics_method,
outlier_rm_method,
max_num_refining_iter,
refine_max_std,
......@@ -660,12 +690,15 @@ def run_clustering_and_make_plot(
{
"general": dict(
clustering_method=method,
custom_metrics_optimize_mode=config.general.custom_metrics_optimize_mode,
dtgs=dict(
list=config.general.dtgs,
cycle_length=config.general.dtgs.cycle_length.freqstr,
),
),
"metrics": dict(
method=metrics_method,
optimize_mode=config.metrics.optimize_mode,
),
"clustering_method.%s"
% (method): dict(
eps=eps,
......
......@@ -89,51 +89,6 @@ def sort_df_by_cluster_size(df):
return df.drop("parent_cluster_size", axis=1).reset_index(drop=True)
def weights_dict_to_np_array(
df, pairwise_diff_weights=None, skip=("id", "time_utc"), default=1
):
"""Convert pairwise_diff_weights into a numpy array.
Takes a pandas dataframe and a {column_name:weight} dictionary and returns
an array of weights to be passed to the routine that calculates the
distance matrix.
Columns "lat" and "lon" in df are treated specially, in that they are
not assigned a weight individually, but rather a single weight gets
assigned to the "geo_dist" property.
Args:
df (pandas.Dataframe): Dataframe with observations.
pairwise_diff_weights (dict): {df_column_name:weight} dictionary.
Default value = None.
skip: df columns that will not enter the clustering and should
therefore be skipped. Default value = ("id", "time_utc")
default: Default weight to be assigned for a non-skipped df column if
the column name is present in df but not in pairwise_diff_weights.
Default value = 1.
Returns:
numpy.ndarray: weights to be passed to the routine that calculates the
distance matrix.
Raises:
ValueError: If the dataframe 'lat' column is not followed by the 'lon'
column.
"""
if df.columns.get_loc("lon") - df.columns.get_loc("lat") != 1:
raise ValueError("'lat' column is not followed by 'lon' column")
weights = []
col2weight = {c: ("geo_dist" if c == "lon" else c) for c in df.columns}
for col in df.columns[~df.columns.isin(list(skip) + ["lat"])]:
try:
weights.append(pairwise_diff_weights[col2weight[col]])
except (KeyError, TypeError):
weights.append(default)
return np.array(weights, dtype=np.float64)
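
For reference while reading this hunk, a tiny worked example of what the helper above (removed from this module by this commit) returned; the column names and weights are just an illustration.

import pandas as pd

# An empty frame is enough here: only the column layout matters to the helper.
df = pd.DataFrame(columns=["id", "time_utc", "lat", "lon", "alt", "temperature"])
weights = weights_dict_to_np_array(df, {"geo_dist": 2.0, "temperature": 5.0})
# "id"/"time_utc" are skipped, "lat"+"lon" collapse into the single "geo_dist"
# weight, and "alt" falls back to the default weight of 1:
print(weights)  # [2. 1. 5.]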
def get_silhouette_samples(df, distance_matrix):
"""Calculate silhouette scores for every obs in df.
......@@ -177,50 +132,27 @@ def get_silhouette_samples(df, distance_matrix):
def run_clustering_on_df(
df,
method="hdbscan",
config,
domain,
distance_matrix=None,
distance_matrix_optimize_mode="memory",
skip=("id", "time_utc"),
weights_dict=None,
eps=15, # eps applies only to dbscan
min_cluster_size=3, # min_cluster_size applies only to hdbscan
min_samples=3,
n_jobs=-1,
outlier_rm_method=None,
max_num_refine_iter=50,
max_n_stdev_around_mean=2.0,
trunc_perc=0.25,
remove_outliers=True,
calc_silhouette_samples=True,
n_jobs=-1,
):
"""Low-level clustering routine.
Args:
df (pandas.Dataframe): Dataframe with observations.
method (str): {"hdbscan", "dbscan", "rsl", "optics"}
Clustering method.
config (netatmoqc.config.ParsedConfig): Program's general configs.
domain (netatmoqc.domains.Domain): The adopted spatial domain.
distance_matrix (HollowSymmetricMatrix): Obs distance matrix.
Default value = None.
distance_matrix_optimize_mode (str): The distance matrix optimization
mode. Default value = "memory".
skip (tuple): Columns to skip in df.
Default value = ("id", "time_utc").
weights_dict (dict): obs_name --> obs_weight map. Default value = None.
eps (float): DBSCAN's eps parameter. Default value = 15.0.
min_cluster_size (int): HDBSCAN's min_cluster_size. Default value = 3.
min_samples (int): (H)DBSCAN's min_samples. Default value = 3.
n_jobs (int): Max number of local-host parallel jobs.
Default value = -1.
outlier_rm_method (str): Outlier removal method. Default value = None.
max_num_refine_iter (int): Max number of iterations for the
"iterative" ourlier removal method. Default value = 50.
max_n_stdev_around_mean (float): Max number of stdev from cluster obs
mean for an observation to be considered OK in the "iterative"
outlier removal method. Default value = 2.0.
trunc_perc (float): Percentage used in array truncation when
calculating stdevs and means in the "iterative" outlier removal
method. Default value = 0.25.
remove_outliers (bool): Use a post-clustering outlier removal method?
Default value = True.
calc_silhouette_samples (bool): Calculate or not silhouette_samples?
Default value = True.
n_jobs (int): Max number of local-host parallel jobs.
Default value = -1.
Returns:
pandas.DataFrame: Copy of df with clustering added info.
......@@ -229,35 +161,25 @@ def run_clustering_on_df(
NotImplementedError: If the `method` choice is not valid.
"""
method = method.lower()
method = config.general.clustering_method.lower()
# Compute clustering using DBSCAN or HDBSCAN
if method not in ["dbscan", "hdbscan", "rsl", "optics"]:
raise NotImplementedError('Method "{}" not available.'.format(method))
if len(df.index) == 0:
logger.warning("Dataframe has no rows")
df["cluster_label"] = None
return df
# Set weights to be used in the metrics for the various
# generalised distances. The distances used in the metrics
# will be used_dist(i) = weight(i)*real_dist(i)
weights = weights_dict_to_np_array(df, weights_dict)
# We will not do any df = StandardScaler().fit_transform(df),
# as we'll use a metric based on earth distances
if distance_matrix is None:
# My tests indicate that passing a pre-computed distance matrix to
# dbscan can be up to 2.5x faster than passing a metrics function (if
# they both are written in pure python) to fit df. If they are both
# written in fortran and interfaced via f2py3, or then written in
# python but jit-compiled with numba, then the relative speed up
# can reach up to 120x.
# dbscan can be up to 2.5x faster than passing a metrics function to
# fit df (if using pure python). If pre-computing the matrix using
# jit-compilation via numba, then the relative speed up can reach
# up to 120x.
distance_matrix = calc_distance_matrix(
# Drop columns that won't be used in the clustering
df.drop(list(skip), axis=1),
weights,
optimize_mode=distance_matrix_optimize_mode,
num_threads=n_jobs,
df=df, config=config, domain=domain, num_threads=n_jobs
)
# Running clustering with the computed distance matrix
......@@ -265,8 +187,8 @@ def run_clustering_on_df(
tstart = time.time()
if method == "dbscan":
db = DBSCAN(
eps=eps,
min_samples=min_samples,
eps=config.get_clustering_opt("eps"),
min_samples=config.get_clustering_opt("min_samples"),
metric="precomputed",
n_jobs=n_jobs,
).fit(distance_matrix)
......@@ -277,19 +199,19 @@ def run_clustering_on_df(
# For more info on the parameters, see
# <https://hdbscan.readthedocs.io/en/latest/parameter_selection.html>
db = HDBSCAN(
min_samples=min_samples,
min_cluster_size=min_cluster_size,
min_samples=config.get_clustering_opt("min_samples"),
min_cluster_size=config.get_clustering_opt("min_cluster_size"),
metric="precomputed",
core_dist_n_jobs=n_jobs,
allow_single_cluster=True,
# Default cluster_selection_method: 'eom'. Sometimes it leads to
# clusters that are too big. Using 'leaf' seems better.
cluster_selection_method="leaf",
cluster_selection_method=config.get_clustering_opt(
"cluster_selection_method"
),
).fit(distance_matrix)
elif method == "optics":
db = OPTICS(
min_samples=min_samples,
min_cluster_size=min_cluster_size,
min_samples=config.get_clustering_opt("min_samples"),
min_cluster_size=config.get_clustering_opt("min_cluster_size"),
n_jobs=n_jobs,
metric="precomputed",
).fit(distance_matrix)
......@@ -297,13 +219,16 @@ def run_clustering_on_df(
db = RobustSingleLinkage(
# cut: The reachability distance value to cut the cluster
# hierarchy at to derive a flat cluster labelling.
cut=eps, # default=0.4
# default cut for method: 0.4
cut=config.get_clustering_opt("eps"),
# Reachability distances will be computed with regard to the
# k nearest neighbors.
k=min_samples, # default=5
# default k for method: 5
k=config.get_clustering_opt("min_samples"),
# Ignore any clusters in the flat clustering with size less
# than gamma, and declare points in such clusters as noise points.
gamma=min_cluster_size, # default=5
# default gamma for method: 5
gamma=config.get_clustering_opt("min_cluster_size"),
metric="precomputed",
).fit(distance_matrix)
logger.debug(
......@@ -316,27 +241,15 @@ def run_clustering_on_df(
# expects 'cluster_label' to be the last column in the dataframe
df["cluster_label"] = db.labels_
# Refine clustering if requested
# It is important to have 'cluster_label' as the last column
# when running the iterative refine routine
if outlier_rm_method:
if remove_outliers:
# Refine clustering if requested
# It is important to have 'cluster_label' as the last column
# when running the iterative refine routine
df = filter_outliers(
df,
config=config,
db=db,
outlier_rm_method=outlier_rm_method,
# Args that apply only to LOF
distance_matrix=distance_matrix,
# Args that only apply for the iterative method
skip=skip,
max_num_refine_iter=max_num_refine_iter,
max_n_stdev_around_mean=max_n_stdev_around_mean,
trunc_perc=trunc_perc,
weights=weights,
method=method,
weights_dict=weights_dict,
eps=eps,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
n_jobs=n_jobs,
reclustering_function=self_consistent_reclustering,
)
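
For orientation, a minimal sketch of the new calling convention after this change: clustering method, metric and outlier-removal options now travel via the parsed config instead of individual keyword arguments. The wrapper and argument names below (other than run_clustering_on_df itself) are placeholders, not names from the package.

def cluster_with_parsed_config(obs_df, parsed_config, domain):
    # obs_df: pandas.DataFrame of observations
    # parsed_config: netatmoqc.config.ParsedConfig instance
    # domain: netatmoqc.domains.Domain instance
    return run_clustering_on_df(
        df=obs_df,
        config=parsed_config,
        domain=domain,
        remove_outliers=True,  # replaces the old outlier_rm_method keyword
        n_jobs=-1,
    )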
......@@ -378,12 +291,13 @@ def suspendlogging(func):
@suspendlogging
def self_consistent_reclustering(df, distance_matrix, **kwargs):
def self_consistent_reclustering(df, config, distance_matrix, **kwargs):
"""Recluster obs in df until further clustering on df returns df itself.
Args:
df (pandas.Dataframe): Pandas dataframe with observations and initial
clustering info.
config (ParsedConfig): Parsed configs.
distance_matrix (HollowSymmetricMatrix): Matrix of distances between
the obs in df.
**kwargs: Keyword args passed to run_clustering_on_df.
......@@ -403,9 +317,11 @@ def self_consistent_reclustering(df, distance_matrix, **kwargs):
df = df.drop(df[noise_mask].index)
df = df.drop(["cluster_label"], axis=1)
df_rec = run_clustering_on_df(
df,
outlier_rm_method=None,
config=config,
remove_outliers=False,
distance_matrix=distance_matrix.subspace(i_valid_obs),
calc_silhouette_samples="silhouette_score" in df.columns,
**kwargs,
......@@ -419,7 +335,7 @@ def self_consistent_reclustering(df, distance_matrix, **kwargs):
return df
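
A schematic of the self-consistent reclustering idea (not the package's actual code): cluster, drop whatever comes out as noise, and repeat until no further observations are rejected. The cluster_once callable below is a placeholder for a single clustering pass that adds a "cluster_label" column.

def self_consistent_reclustering_sketch(df, cluster_once):
    while True:
        df = cluster_once(df)
        noise_mask = df["cluster_label"] < 0  # label -1 marks noise in (H)DBSCAN
        if not noise_mask.any():
            return df  # clustering reproduces the input: converged
        # Drop the rejected obs and the labels, then recluster the survivors
        df = df.loc[~noise_mask].drop(columns="cluster_label")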
def _cluster_netatmo_obs_one_domain(df, config, **kwargs):
def _cluster_netatmo_obs_one_domain(df, config, domain, **kwargs):
"""Cluster netatmo obs inside a given domain.
Helper for the main routine cluster_netatmo_obs
......@@ -427,6 +343,7 @@ def _cluster_netatmo_obs_one_domain(df, config, **kwargs):
Args:
df (pandas.Dataframe): Pandas dataframe with observations.
config (ParsedConfig): Parsed configs.
domain (netatmoqc.domains.Domain): The adopted spatial domain.
**kwargs: Keyword args passed to run_clustering_on_df.
Returns:
......@@ -435,24 +352,7 @@ def _cluster_netatmo_obs_one_domain(df, config, **kwargs):
"""
time_start_clustering = time.time()
logger.debug("Performing clustering...")
outlier_rm_method = config.get_clustering_opt("outlier_removal.method")
df = run_clustering_on_df(
df=df,
method=config.general.clustering_method,
distance_matrix_optimize_mode=config.general.custom_metrics_optimize_mode,
weights_dict=config.get_clustering_opt("obs_weights"),
eps=config.get_clustering_opt("eps"),
min_cluster_size=config.get_clustering_opt("min_cluster_size"),
min_samples=config.get_clustering_opt("min_samples"),
outlier_rm_method=outlier_rm_method,
max_num_refine_iter=config.get_clustering_opt(
"outlier_removal.{}.max_n_iter".format(outlier_rm_method)
),
max_n_stdev_around_mean=config.get_clustering_opt(
"outlier_removal.{}.max_n_stdev".format(outlier_rm_method)
),
**kwargs,
)
df = run_clustering_on_df(df=df, config=config, domain=domain, **kwargs)
time_end_clustering = time.time()
logger.debug(
"Done with clustering. Elapsed: %.2fs",
......@@ -523,7 +423,10 @@ def cluster_netatmo_obs(df, config, **kwargs):
domain.n_subdomains,
)
df_sub = _cluster_netatmo_obs_one_domain(
df=df_sub, config=config, **pre_clustering_kwargs
df=df_sub,
config=config,
domain=subdomain,
**pre_clustering_kwargs,
)
domain_split_dfs.append(df_sub)
......@@ -566,7 +469,9 @@ def cluster_netatmo_obs(df, config, **kwargs):
"DTG=%s: Main clustering over whole domain...",
df.metadata_dict["dtg"],
)
df = _cluster_netatmo_obs_one_domain(df=df, config=config, **kwargs)
df = _cluster_netatmo_obs_one_domain(
df=df, config=config, domain=domain, **kwargs
)
if df_rejected is not None:
# Put back eventual obs rejected at the pre-clustering step
......
......@@ -327,6 +327,8 @@ with config_section("_template.clustering_method") as section:
config_metadata.register("min_cluster_size", default=5, minval=1)
# obs_weights not explicitly set will be internally set to 1
config_metadata.register("obs_weights.geo_dist", default=1.0, minval=0.0)
config_metadata.register("obs_weights.x", default=1.0, minval=0.0)
config_metadata.register("obs_weights.y", default=1.0, minval=0.0)
config_metadata.register("obs_weights.alt", default=1.0, minval=0.0)
config_metadata.register(
"obs_weights.temperature", default=5.0, minval=0.0
......@@ -377,11 +379,9 @@ with config_section("general") as section:
config_metadata.register("dtgs.start")
config_metadata.register("dtgs.end")
config_metadata.register("dtgs.cycle_length", default="3H")
# Metrics optimisation scheme
# Data cols to ignore when running clustering
config_metadata.register(
"custom_metrics_optimize_mode",
default="memory",
choices=["speed", "memory", "speed_mem_compromise"],
"unclusterable_data_columns", default=["id", "time_utc"]
)
# Data cols to export when saving obsoul output
config_metadata.register(
......@@ -389,7 +389,24 @@ with config_section("general") as section:
default=["pressure", "temperature", "humidity", "sum_rain_1"],
)
# "commans section"
# "metrics" section
with config_section("metrics") as section:
config_metadata.register(
"method",
default="correlation_aware_euclidean",
choices=[
"correlation_aware_euclidean",
"haversine_plus_manhattan",
"haversine_plus_euclidean",
],
)
config_metadata.register(
"optimize_mode",
default="memory",
choices=["speed", "memory", "speed_mem_compromise"],
)
# "commands" section
with config_section("commands.select") as section:
config_metadata.register("station_rejection_tol", default=0.15, minval=0.0)
......@@ -404,6 +421,12 @@ with config_section("clustering_method.hdbscan") as section:
choices=[None, "glosh", "lof", "iterative", "reclustering"],
)
config_metadata.copy_template("outlier_removal.iterative")
# 'eom' may sometimes lead to clusters that are too big and to lower
# silhouette scores, but the final station selection may still be better
# than with "leaf"
config_metadata.register(
"cluster_selection_method", default="leaf", choices=["eom", "leaf"]
)
# clustering_method.dbscan
with config_section("clustering_method.dbscan") as section:
......@@ -441,7 +464,6 @@ with config_section("domain") as section:
# These defaults are for the METCOOP25C domain. See
# <https://hirlam.org/trac/wiki/HarmonieSystemDocumentation/ModelDomain>
# <https://hirlam.org/trac/browser/Harmonie/scr/Harmonie_domains.pm>
config_metadata.register("tstep", default=75, minval=0, astype=int)
config_metadata.register("nlon", default=900, minval=1, astype=int)
config_metadata.register("nlat", default=960, minval=1, astype=int)
config_metadata.register(
......
......@@ -7,7 +7,6 @@ import attr
import numpy as np
import pyproj
from .metrics import haversine_distance
from .plots import DEF_FIGSHOW_CONFIG, get_domain_fig
logger = logging.getLogger(__name__)
......@@ -39,13 +38,10 @@ class GridAxisConfig:
class Grid2D:
"""A general grid in two dimensions, named x and y by convention."""
def __init__(self, xaxis, yaxis, tstep=0.0):
def __init__(self, xaxis, yaxis):
"""Initialise params. Axis limits are open-ended intervals."""
self._xaxis = self._validated_axis_info(xaxis)
self._yaxis = self._validated_axis_info(yaxis)
if tstep < 0:
raise ValueError("tstep must be >= 0")
self.tstep = tstep
self.cart2grid = np.vectorize(self._cart2grid_non_vec)
......@@ -314,15 +310,15 @@ class DomainGrid(Grid2D):
df["j"] = jcol
if method == "nearest":
# Get (lon, lat) for grid points
g2lon, g2lat = self.ij2lonlat_map()
# Get (x, y) for grid points
grid2x, grid2y = self.ij2xy_map()
# Sort data by distance to nearest grid point
def _dist(lon, lat, i, j):
i, j = int(i), int(j)
p0 = np.array([lon, lat])
p1 = np.array([g2lon[i, j], g2lat[i, j]])
return haversine_distance(p0, p1)
p0 = self.proj.lonlat2xy(lon, lat)
p1 = np.array([grid2x[i, j], grid2y[i, j]])
return np.linalg.norm(p1 - p0)
df = df.loc[
df.apply(lambda x: _dist(x.lon, x.lat, x.i, x.j), axis=1)
......@@ -357,7 +353,6 @@ class Domain:
ezone_ngrid=0,
subdomain_split=(1, 1),
coarse_grid_factor=0,
tstep=0.0,
):
"""Initialise name, geometry, projection & grid/thinning grid attrs."""
self.name = name
......@@ -419,12 +414,7 @@ class Domain:
npts_ezone=ezone_ngrid,
)
# (d) Set _grid attr
return DomainGrid(
xaxis=grid_xaxis,
yaxis=grid_yaxis,
proj=proj,
tstep=tstep,
)
return DomainGrid(xaxis=grid_xaxis, yaxis=grid_yaxis, proj=proj)
self._grid = init_grid(ngrid_lonlat, grid_spacing, ezone_ngrid)
......@@ -466,7 +456,6 @@ class Domain:
lmrt=config.lmrt,
subdomain_split=(config.split.lon, config.split.lat),
coarse_grid_factor=config.thinning_grid_coarse_factor,
tstep=config.tstep,
)
def _auto_choose_projname(self, lat0, y_range):
......@@ -585,7 +574,6 @@ class Domain:
# We don't want the splits to have extension zones
ezone_ngrid=0,
lmrt=self.lmrt,
tstep=self.grid.tstep,
)
splits.append(split)
return splits
......
......@@ -164,7 +164,7 @@ def _to_dense(data, n, dtype):
"""
dense = np.zeros(shape=(n, n), dtype=dtype)
for idata in prange(len(data)):
for idata in prange(len(data)): # pylint: disable=not-an-iterable
i, j = _data_index_to_matrix_index(n, idata, check_bounds=False)
data_value = data[idata]
dense[i, j] = data_value
......@@ -195,7 +195,7 @@ def _to_compact(dense, dtype):
n = dense.shape[0]
n_indep = (n ** 2 - n) // 2
compact = np.zeros(n_indep, dtype=dtype)
for idata in prange(len(compact)):
for idata in prange(len(compact)): # pylint: disable=not-an-iterable
i, j = _data_index_to_matrix_index(n, idata, check_bounds=False)
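
The compact storage handled here keeps only the (n**2 - n) // 2 independent off-diagonal entries of the hollow symmetric matrix. Below is a small sketch of one possible row-major, upper-triangle index convention; the convention actually implemented by _data_index_to_matrix_index is not shown in this hunk and may differ.

def compact_index_to_ij(n, idata):
    """Map a compact-storage index to (i, j) with i < j (assumed row-major order)."""
    i = 0
    # Walk the rows of the strictly upper triangle until idata falls inside row i.
    while idata >= n - 1 - i:
        idata -= n - 1 - i
        i += 1
    return i, i + 1 + idata

n = 4
print([compact_index_to_ij(n, k) for k in range((n * n - n) // 2)])
# [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]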