Source code for canopy.util.compare_ts

"""
Statistically compare fields along the time axis
"""
import numpy as np
import pandas as pd
from typing import Any, Optional, cast
from canopy.core.field import Field
from canopy.util.checks import check_field_contains_layers, check_indices_match


[docs] def compare_ts(field1: Field, field2: Field, layers1: Optional[list[str] | str] = None, layers2: Optional[list[str] | str] = None) -> pd.DataFrame: """Statistically compare two fields along the time axes NaN values in either of the series are ignored. The field indices must be equal. The comparison calculates the following metrics: - ME: Mean Error - MAE: Mean Absolute Error - MSE: Mean Squared Error - RMSE: Root Mean Squared Error - r: Correlation coefficient - R2: Coefficient of determination Parameters ---------- field1 : Field One of the two fields to compare. In the case of observation vs. simulation, this field represents the OBSERVED DATA. field2 : Field One of the two fields to compare. In the case of observation vs. simulation, this field represents the MODELED DATA. layers1 : str | list[str] A string or list of strings to select the layers from field1 to be compared. If None, all of the field's layers are used. layers2 : str | list[str] A string or list of strings to select the layers from field2 to be compared. If None, all of the field's layers are used. Returns ------- A pandas DataFrame with the metrics for each gridcell, and each pairs of layers. Notes ----- The layers are compared in the order that they are passed to the function, and their name has no effect on the comparison. For example, if layers1 = ['C3G', 'C4G'] and layers2 = ['C4G', 'C3G'], field1's 'C3G' layer will be compared to field2's 'C4G' layer, and viceversa. """ if isinstance(layers1, str): layers1 = [layers1] if isinstance(layers2, str): layers2 = [layers2] # Check that indices match check_indices_match(field1, field2) if layers1 is None: layers1 = field1.data.columns.tolist() df1 = field1.data.copy(deep=False) else: check_field_contains_layers(field1, layers1, 'field1') df1 = field1.data.loc[:,layers1].copy(deep=False) if layers2 is None: layers2 = field2.data.columns.tolist() df2 = field2.data.copy(deep=False) else: check_field_contains_layers(field2, layers2, 'field2') df2 = field2.data.loc[:,layers2].copy(deep=False) # Check layer lists have the same length if len(layers1) != len(layers2): raise ValueError("'layers1' and 'layers2' must have equal lenght.") # If dataframes have only 'time' level, add redundant level so that groupby operations work if len(df1.index.names) == 1: index = pd.MultiIndex.from_product([['-'], df1.index]) index.names = ['TS', 'time'] df1.index = index df2.index = index # All levels except 'time' spatial_levels = [l for l in df1.index.names if l != 'time'] dict1 = {} dict2 = {} for l1, l2 in zip(layers1, layers2): key = f'{l1} v {l2}' dict1[l1] = key dict2[l2] = key df1 = df1.rename(columns=dict1) df2 = df2.rename(columns=dict2) # Residuals e = df1 - df2 # Residual squares e2 = e**2 # Count count = e.groupby(spatial_levels).count() count.columns = pd.MultiIndex.from_arrays([count.columns, ['n']*len(count.columns)]) # Mean error me = e.groupby(spatial_levels).mean() me.columns = pd.MultiIndex.from_arrays([me.columns, ['ME']*len(me.columns)]) # Mean absolute error mae = e.abs().groupby(spatial_levels).mean() mae.columns = pd.MultiIndex.from_arrays([mae.columns, ['MAE']*len(mae.columns)]) # Mean squared error mse = e2.groupby(spatial_levels).mean() mse.columns = pd.MultiIndex.from_arrays([mse.columns, ['MSE']*len(mse.columns)]) # Root mean squared error # np.sqrt(DataFrame) return a DataFrame, but numpy typing hints don't appear to cover for this rmse = cast(pd.DataFrame, np.sqrt(mse)) rmse = rmse.rename(columns={'MSE':'RMSE'}) # Correlation coefficient # list of levels is ok to unstack but pandas stubs is stricter and requires Hashable | int -> ignore x = df1.unstack(spatial_levels) # type: ignore y = df2.unstack(spatial_levels) # type: ignore x_mean = x.mean() y_mean = y.mean() corr = ( (x-x_mean) * (y-y_mean) ).sum()/np.sqrt( ((x-x_mean)**2).sum() * ((y-y_mean)**2).sum() ) corr = corr.unstack(0) corr.columns = pd.MultiIndex.from_arrays([corr.columns, ['r']*len(corr.columns)]) # Coefficient of determination # Let y_1, y_2, y_3, ... be a series of observations, and f_1, f_2, f_3, ... the corresponding # modeled quantities. The coefficient of determination, R^2, is defined as # R^2 = 1 - ss_{res}/ss_{tot}, # where # ss_res = \sum_i {(y_i - f_i)^2} = \sum e^2 # is the sum of squared residuals, and # ss_tot = \sum_i {(y_i - \bar{y})^2} # is the total sum of squares (\bar{y} is the mean of the y_i). # See https://en.wikipedia.org/wiki/Coefficient_of_determination ss_tot = ((df1 - df1.groupby(spatial_levels).mean())**2).groupby(spatial_levels).sum() ss_res = e2.groupby(spatial_levels).sum() r2 = 1. - ss_res/ss_tot r2 = r2.replace([np.inf, -np.inf], np.nan) r2.columns = pd.MultiIndex.from_arrays([r2.columns, ['R2']*len(r2.columns)]) stats = pd.concat([count, me, mae, mse, rmse, corr, r2], axis=1) stats.columns.names = ['layer', 'stats'] stats = stats.sort_values(axis=1, by='layer', kind='mergesort') return stats