Commit 0cadf21d authored by Paulo Medeiros
Browse files

Change data normalisation scheme.

And some refactoring in metrics code.
parent c3a3730a
......@@ -50,7 +50,10 @@
[metrics]
# method:
# > default: "correlation_aware_euclidean"
# > choices: "correlation_aware_euclidean", "haversine_plus_manhattan"
# > choices:
# "correlation_aware_euclidean"
# "haversine_plus_manhattan"
# "haversine_plus_euclidean"
method = "correlation_aware_euclidean"
# optimize_mode:
# > default: "memory"
......
......@@ -37,8 +37,8 @@ def weights_dict_to_np_array(df, config, default=1):
"""Convert pairwise_diff_weights into a numpy array.
Takes a pandas dataframe and a {column_name:weight} dictionary and returns
an array of weights to be passed to the routine that calculates the
distance matrix.
an array of weights as needed in the calc_distance_matrix_haversine_plus
routine.
Columns "lat" and "lon" in df are treated specially, in that they are
not assigned a weight individually, but rather a single weight gets
......@@ -72,27 +72,31 @@ def weights_dict_to_np_array(df, config, default=1):
weights.append(weights_dict[col2weight[col]])
except (KeyError, TypeError):
weights.append(default)
return np.array(weights, dtype=np.float64)
weights = np.array(weights, dtype=np.float64)
weights = np.where(weights < 0, 0.0, weights)
@njit(cache=True)
def get_obs_norm_factors(obs_values):
    """Return the multiplicative factor used to normalise obs_values.

    The factor is the reciprocal of the standard deviation of
    ``obs_values``, except when the values are effectively constant, in
    which case 0.0 is returned.

    Args:
        obs_values: Array of observation values.

    Returns:
        float: ``1.0 / std(obs_values)``, or 0.0 for effectively constant
        input.
    """
    spread = np.std(obs_values)
    centre = np.mean(obs_values)

    if abs(centre) > 1e-5:
        # Mean is clearly non-zero: judge the spread relative to the mean.
        if abs(spread / centre) < 1e-5:
            return 0.0
    elif abs(centre) <= 1e-5 and spread < 1e-7:
        # Mean is close to zero: a relative test would be unreliable, so an
        # absolute threshold on the spread is used instead.
        return 0.0

    return 1.0 / spread
return weights
def normalize_df(df):
    """Normalise the numeric columns of df, skipping location-related ones.

    Columns named "lat", "lon", "alt", "x" or "y" and non-numeric columns
    are left untouched. Every other numeric column is rescaled as follows:

    * standard deviation not close to zero: ``(values - mean) / stdev``;
    * otherwise, mean not close to zero: ``values / mean``;
    * otherwise the column is left as it is.

    Args:
        df (pandas.DataFrame): Data to be normalised. It is modified in
            place.

    Returns:
        pandas.DataFrame: The same ``df`` object that was passed in.
    """
    location_columns = ("lat", "lon", "alt", "x", "y")
    numeric_columns = df.select_dtypes(include=np.number).columns

    for name in numeric_columns:
        if name in location_columns:
            continue

        values = df[name]
        centre = np.mean(values)
        spread = np.std(values)

        if np.isclose(spread, 0.0):
            # (Near-)constant column: dividing by the spread is not possible,
            # so scale by the mean when that is usable.
            if not np.isclose(centre, 0.0):
                df[name] = values / centre
            continue

        df[name] = (values - centre) / spread

    return df
@njit("f4[:](f8[:, :], f8[:], types.unicode_type)", parallel=True, cache=True)
def numba_calc_distance_matrix_haversine_plus(df, weights, method):
def numba_calc_distance_matrix_haversine_plus(df, weights_array, method):
"""Calculate distance matrix using python+numba.
Spatial distances are calculated using the haversine method.
......@@ -103,7 +107,7 @@ def numba_calc_distance_matrix_haversine_plus(df, weights, method):
df (numpy.ndarray): Multidimensional numpy array containing the data
entries, obtained from a pandas dataframe (numba doesn't work with
pandas dataframes).
weights (numpy.array): Weights chosen for each observation parameter.
weights_array (numpy.array): Weights chosen for each observation parameter.
The weights determine the relative importance of the observation
parameters w.r.t. each other.
method (basestring): The method to be used for the non-spatial part of
......@@ -123,22 +127,8 @@ def numba_calc_distance_matrix_haversine_plus(df, weights, method):
nrows, ncols = df.shape
# Get normalisation factors so that observations in different
# scales have comparable diff values
# Remember: The first entry in weights refers to geo_dist and
# the first two columns of df are (lat, lon)
obs_norm_factors = np.ones(ncols - 1, dtype=np.float32)
for i in range(1, len(obs_norm_factors)):
obs_norm_factors[i] = get_obs_norm_factors(df[:, i + 1])
# weights can be used by the users to give extra or
# lower weight to a given observation type. The weights
# are applied to the normalised values of the obd diffs
weights_internal = weights
# Set any negative weight value to zero
weights_internal = np.where(weights_internal < 0, 0.0, weights_internal)
weights_internal *= obs_norm_factors
weights_array = np.where(weights_array < 0, 0.0, weights_array)
n_dists = (nrows * (nrows - 1)) // 2
rtn = np.zeros(n_dists, dtype=np.float32)
......@@ -153,22 +143,20 @@ def numba_calc_distance_matrix_haversine_plus(df, weights, method):
i, j = np.zeros(2, dtype=np.int64)
for idist in prange(n_dists): # pylint: disable=not-an-iterable
i, j = _data_index_to_matrix_index(nrows, idist, check_bounds=False)
rtn[idist] = weights_internal[0] * haversine_distance(df[i], df[j])
rtn[idist] = weights_array[0] * haversine_distance(df[i], df[j])
if method == "manhattan":
rtn[idist] += np.sum(
np.abs(weights_internal[1:] * (df[j, 2:] - df[i, 2:]))
np.abs(weights_array[1:] * (df[j, 2:] - df[i, 2:]))
)
elif method == "euclidean":
rtn[idist] += np.sqrt(
np.sum((weights_internal[1:] * (df[j, 2:] - df[i, 2:])) ** 2)
np.sum((weights_array[1:] * (df[j, 2:] - df[i, 2:])) ** 2)
)
return rtn
def calc_distance_matrix_haversine_plus(
df, weights, method, optimize_mode, num_threads=-1
):
def calc_distance_matrix_haversine_plus(df, config):
"""Calculate distance matrix between obs in dataframe df.
Spatial distances are calculated using the haversine method.
......@@ -177,54 +165,33 @@ def calc_distance_matrix_haversine_plus(
Args:
df (pandas.Dataframe): Input data.
weights (numpy.array): Weights chosen for each observation parameter.
The weigts determine the relative importance of the observation
parameters w.r.t. each other.
method (basestring): The method to be used for the non-spatial part of
the distance.
optimize_mode: How the distance matrix is to be calculated and stored.
This is passed onto the constructor for the class
netatmoqc.hollow_symmetric_matrix.HollowSymmetricMatrix.
num_threads: Max number of threads used for the computation.
(Default value = -1)
config (ParsedConfig): Parsed configs.
Returns:
netatmoqc.hollow_symmetric_matrix.HollowSymmetricMatrix: The distance
matrix.
Raises:
NotImplementedError: If method not in ["manhattan", "euclidean"].
NotImplementedError: If metrics calculation method not recognised.
"""
logger.debug(" > Calculating distance matrix...")
tstart = time.time()
method = method.lower()
method = config.metrics.method.lower().replace("haversine_plus_", "")
allowed_methods = ["manhattan", "euclidean"]
if method not in allowed_methods:
raise NotImplementedError(
"Argument 'method' must be one of: %s"
% (", ".join(allowed_methods))
"Argument 'method' must be one of: %s. Received: %s"
% (", ".join(allowed_methods), method)
)
if num_threads > 0:
original_nthreads = numba.get_num_threads()
numba.set_num_threads(num_threads)
atexit.register(numba.set_num_threads, original_nthreads)
rtn = HollowSymmetricMatrix(
return HollowSymmetricMatrix(
data=numba_calc_distance_matrix_haversine_plus(
df.to_numpy(), weights=weights, method=method
df.to_numpy(),
weights_array=weights_dict_to_np_array(df, config=config),
method=method,
),
optimize_mode=optimize_mode,
optimize_mode=config.metrics.optimize_mode,
)
logger.debug(
" * Done calculating distance matrix. Elapsed: %.1fs",
time.time() - tstart,
)
return rtn
@njit(parallel=True, cache=True)
def calc_distance_matrix_data_considering_correlation_numba(
......@@ -238,7 +205,7 @@ def calc_distance_matrix_data_considering_correlation_numba(
Args:
df (numpy.array): Input data.
weights_array (numpy.array): Weights chosen for each observation param.
The weigts determine the relative importance of the observation
The weights determine the relative importance of the observation
parameters w.r.t. each other.
covariance_matrix (numpy.array): The covariance matrix for the data
in df.
......@@ -247,6 +214,9 @@ def calc_distance_matrix_data_considering_correlation_numba(
numpy.ndarray: Data to be used in the construction of the dist matrix.
"""
# Set any negative weight value to zero
weights_array = np.where(weights_array < 0, 0.0, weights_array)
nrows, ncols = df.shape
n_dists = (nrows * (nrows - 1)) // 2
distance_matrix_data = np.zeros(n_dists, dtype=np.float32)
......@@ -262,10 +232,11 @@ def calc_distance_matrix_data_considering_correlation_numba(
for idist in prange(n_dists): # pylint: disable=not-an-iterable
i, j = _data_index_to_matrix_index(nrows, idist, check_bounds=False)
dij_squared_part1 = 0.0 # Part with each obs type alone
dij_squared_part2 = (
0.0 # Part with obs types correlated with each other
)
# Part with each obs type alone
dij_squared_part1 = 0.0
# Part with obs types correlated with each other
dij_squared_part2 = 0.0
for m in range(ncols):
dij_squared_part1 += (
weights_array[m] * (df[i, m] - df[j, m])
......@@ -287,9 +258,7 @@ def calc_distance_matrix_data_considering_correlation_numba(
return distance_matrix_data
def calc_distance_matrix_considering_correlation(
df, weights_dict, optimize_mode, domain, num_threads=-1
):
def calc_distance_matrix_considering_correlation(df, config, domain):
"""Calculate distance matrix between obs in dataframe df.
Spatial distances are calculated by projecting (lon, lat) into (x, y)
......@@ -299,15 +268,8 @@ def calc_distance_matrix_considering_correlation(
Args:
df (pandas.Dataframe): Input data.
weights_dict (dict): Weights chosen for each observation parameter.
The weigts determine the relative importance of the observation
parameters w.r.t. each other.
optimize_mode: How the distance matrix is to be calculated and stored.
This is passed onto the constructor for the class
netatmoqc.hollow_symmetric_matrix.HollowSymmetricMatrix.
config (ParsedConfig): Parsed configs.
domain (netatmoqc.domains.Domain): The adopted spatial domain.
num_threads: Max number of threads used for the computation.
(Default value = -1)
Returns:
netatmoqc.hollow_symmetric_matrix.HollowSymmetricMatrix: The distance
......@@ -318,13 +280,19 @@ def calc_distance_matrix_considering_correlation(
df = df.copy()
# Replace (lon, lat) with projected (x, y) in Km
if domain is None:
logger.warning(
"Domain not passed to '%s'. Constructing from configs.",
"calc_distance_matrix_considering_correlation",
)
domain = Domain.construct_from_dict(config.domain)
xvals, yvals = domain.proj.lonlat2xy(df["lon"], df["lat"])
df.insert(0, "y", yvals / 1000.0)
df.insert(0, "x", xvals / 1000.0)
df = df.drop(["lon", "lat"], axis=1)
selected_data_columns = [c for c in df.columns if c not in ["lon", "lat"]]
df = df[selected_data_columns]
# Make sure weights and df columns are consistent
weights_dict = config.get_clustering_opt("obs_weights")
weights_array = np.array([weights_dict.get(c, 1.0) for c in df.columns])
weights_array = np.where(weights_array < 0, 0.0, weights_array)
......@@ -336,9 +304,10 @@ def calc_distance_matrix_considering_correlation(
df.to_numpy(), weights_array, covariance_matrix.to_numpy()
)
)
rtn = HollowSymmetricMatrix(data=distance_matrix_data)
return rtn
return HollowSymmetricMatrix(
data=distance_matrix_data, optimize_mode=config.metrics.optimize_mode
)
def calc_distance_matrix(df, config, domain=None, num_threads=-1):
......@@ -365,6 +334,8 @@ def calc_distance_matrix(df, config, domain=None, num_threads=-1):
matrix.
"""
tstart = time.time()
accepted_methods = [
"correlation_aware_euclidean",
"haversine_plus_manhattan",
......@@ -376,31 +347,26 @@ def calc_distance_matrix(df, config, domain=None, num_threads=-1):
"Distance matrix calc method '%s' not available. " % (method)
+ "Please choose method from: %s" % (", ".join(accepted_methods))
)
logger.debug("Computing distance matrix using the '%s' method", method)
weights_dict = config.get_clustering_opt("obs_weights")
if num_threads > 0:
original_nthreads = numba.get_num_threads()
numba.set_num_threads(num_threads)
atexit.register(numba.set_num_threads, original_nthreads)
# Drop columns that won't be used in the clustering
df = df.copy().drop(config.general.unclusterable_data_columns, axis=1)
df = normalize_df(df)
if method == "correlation_aware_euclidean":
if domain is None:
logger.warning(
"Domain not passed to '%s'. Constructing from configs.",
"calc_distance_matrix",
)
domain = Domain.construct_from_dict(config.domain)
return calc_distance_matrix_considering_correlation(
df=df,
weights_dict=weights_dict,
optimize_mode=config.metrics.optimize_mode,
num_threads=num_threads,
domain=domain,
rtn = calc_distance_matrix_considering_correlation(
df=df, config=config, domain=domain
)
else:
return calc_distance_matrix_haversine_plus(
# Drop columns that won't be used in the clustering
df=df,
weights=weights_dict_to_np_array(df, config=config),
method=method.replace("haversine_plus_", ""),
optimize_mode=config.metrics.optimize_mode,
num_threads=num_threads,
)
rtn = calc_distance_matrix_haversine_plus(df=df, config=config)
logger.debug(
" * Done calculating distance matrix. Elapsed: %.1fs",
time.time() - tstart,
)
return rtn
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment