import numpy as np
import pandas as pd
import janitor # type: ignore
import copy
from typing import Any, Optional, Union, Type, Literal, SupportsFloat, cast
from canopy.core.redspec import RedSpec
from canopy.core.grid.grid_abc import Grid
from canopy.core.grid import get_gridop, create_grid
from canopy.core.constants import *
# Time reduction operations accepted by parse_timeop
# ('av_m' and 'std_m' are currently disabled).
TIME_REDOPS = [
    'av',
    'std',
    'sum',
]

# Frequency objects used to recognise yearly and monthly dataframes
YEARLY_FREQ = pd.Period('2000', freq='Y').freq
MONTHLY_FREQ = pd.Period('2000', freq='M').freq

# Allowed frequency units for time reductions
TIME_RED_FREQS = [
    'M',
    'Y',
]

# Numbers of months accepted when reducing monthly data to a multi-month period
N_MONTHS_RED = [
    2,
    3,
    4,
    6,
    12,
]
[docs]
def check_indices_match(df1: pd.DataFrame, df2: pd.DataFrame) -> None:
    """Verify that two dataframes are indexed identically.

    The checks run from cheapest to most expensive: number of entries,
    index level names, and finally an element-wise comparison of the
    index values.

    Parameters
    ----------
    df1, df2 : pd.DataFrame
        The dataframes whose indices are compared.

    Raises
    ------
    ValueError
        If the indices differ in length, in level names, or in any entry.
    """
    index1 = df1.index
    index2 = df2.index
    if len(index1) != len(index2):
        raise ValueError("Indices have different lengths.")
    if index1.names != index2.names:
        raise ValueError("Indices have different levels.")
    # Lengths are known to be equal here, so the comparison is element-wise.
    if not all(index1 == index2):
        raise ValueError("Indices do not match.")
[docs]
def is_yearly_freq(df) -> bool:
    """Tell whether the 'time' index level of ``df`` has yearly frequency.

    The frequency of the 'time' level values is compared against the
    module-level ``YEARLY_FREQ``.
    """
    time_values = df.index.get_level_values('time')
    return time_values.freq == YEARLY_FREQ
[docs]
def is_monthly_freq(df) -> bool:
    """Tell whether the 'time' index level of ``df`` has monthly frequency.

    The frequency of the 'time' level values is compared against the
    module-level ``MONTHLY_FREQ``.
    """
    time_values = df.index.get_level_values('time')
    return time_values.freq == MONTHLY_FREQ
[docs]
def parse_timeop(timeop: str) -> str:
    """Normalise a time reduction operation name.

    Parameters
    ----------
    timeop : str
        Name of the operation; matching is case-insensitive.

    Returns
    -------
    str
        The lower-cased operation name.

    Raises
    ------
    ValueError
        If the lower-cased name is not listed in ``TIME_REDOPS``.
    """
    normalized = timeop.lower()
    if normalized in TIME_REDOPS:
        return normalized
    raise ValueError(f"Time timeop must be one of {TIME_REDOPS} (got '{timeop}').")
[docs]
def get_time_index(df: pd.DataFrame) -> pd.PeriodIndex:
    """Return the last level of the dataframe's MultiIndex.

    The index of ``df`` is treated as a ``pd.MultiIndex`` and its last level
    is returned as-is. The casts only inform the type checker; nothing is
    converted at runtime.

    NOTE(review): this assumes the time axis is the last index level and
    holds periods -- confirm against callers.
    """
    levels = cast(pd.MultiIndex, df.index).levels
    return cast(pd.PeriodIndex, levels[-1])
[docs]
def get_base_freq(idx) -> str:
    """Return the base frequency unit ('M' or 'Y') of a time index.

    The unit is detected from the string representation of the index's base
    frequency offset, i.e. the frequency with its multiplier removed.

    Parameters
    ----------
    idx
        Index exposing a ``freq`` attribute (e.g. a ``pd.PeriodIndex``).

    Returns
    -------
    str
        'M' for month-based frequencies, 'Y' for year-based ones.

    Raises
    ------
    ValueError
        If the frequency is neither month- nor year-based. Previously such
        frequencies were silently reported as 'Y'.
    """
    base_repr = str(idx.freq.base)
    for freq_str, base_freq in zip(['Month', 'Year'], ['M', 'Y']):
        if freq_str in base_repr:
            return base_freq
    raise ValueError(
        f"Unsupported index frequency '{idx.freq}': expected a monthly or yearly frequency."
    )
[docs]
def parse_freq(freq: str) -> tuple[int, str]:
    """Split a frequency string such as '5Y' into a count and a unit.

    The last character is the frequency unit; everything before it is the
    number of periods, which defaults to 1 when omitted (e.g. 'M').

    Parameters
    ----------
    freq : str
        Frequency specification: an optional integer followed by one unit
        character.

    Returns
    -------
    tuple[int, str]
        Number of periods and frequency unit.

    Raises
    ------
    ValueError
        If the string is empty, the count is not a positive integer, or the
        unit is not listed in ``TIME_RED_FREQS``.
    """
    # An empty string has no unit character; report it as a value error
    # instead of failing with an IndexError further down.
    if not freq:
        raise ValueError("Frequency string must not be empty.")

    count_str, freq_unit = freq[:-1], freq[-1]
    if count_str == '':
        n_periods = 1
    else:
        try:
            n_periods = int(count_str)
        except ValueError as err:
            raise ValueError(f"Specified number of periods '{count_str}' is not a number.") from err
        if n_periods < 1:
            raise ValueError(f"Number of periods must be positive (got {n_periods}).")

    if freq_unit not in TIME_RED_FREQS:
        raise ValueError(f"Frequency unit must be one of {TIME_RED_FREQS} (got '{freq_unit}').")
    return n_periods, freq_unit
[docs]
def reduce_time(df, timeop: str, freq: str | None) -> tuple[pd.DataFrame, list[str]]:
    """Aggregate a dataframe along its 'time' index level.

    Rows are grouped by 'lon', 'lat' and a time bin of the requested width,
    then reduced with the requested operation.

    Parameters
    ----------
    df
        Dataframe indexed by a MultiIndex with levels 'lon', 'lat' and 'time'.
        The time level is expected at position 2 and must have yearly or
        monthly frequency.
    timeop : str
        Reduction operation; validated by ``parse_timeop``.
    freq : str | None
        Width of the time bins (parsed by ``parse_freq``). ``None`` reduces
        over the whole time series.

    Returns
    -------
    tuple[pd.DataFrame, list[str]]
        The reduced dataframe (time level converted back to periods) and a
        one-element list with a log message.

    Raises
    ------
    ValueError
        If the data is neither yearly nor monthly, or if the requested
        frequency is invalid for the data.

    Notes
    -----
    The index of ``df`` is temporarily replaced during the computation; it is
    restored before returning, also when the aggregation raises.
    """
    timeop = parse_timeop(timeop)
    # TODO: why? Now I can only read monthly and yearly files, but should work for daily,
    #   hourly... => generalize time series resampling
    if not is_yearly_freq(df) and not is_monthly_freq(df):
        raise ValueError("Data must have yearly or monthly frequency.")

    if freq is None:
        # Whole time series: one bin spanning every period in the time level
        idx = get_time_index(df)
        base_freq = cast(pd.offsets.YearEnd, idx.freq)
        n_periods = base_freq.n * len(idx)
        freq_unit = get_base_freq(idx)
    else:
        n_periods, freq_unit = parse_freq(freq)
        if is_yearly_freq(df) and n_periods == 1 and freq_unit == 'Y':
            raise ValueError("Frequency is already 1 year.")
        if is_monthly_freq(df) and n_periods == 1 and freq_unit == 'M':
            raise ValueError("Frequency is already 1 month.")
        if is_monthly_freq(df) and freq_unit == 'M' and n_periods not in N_MONTHS_RED:
            raise ValueError(f"The number of months for reduction:{timeop} must be one of {N_MONTHS_RED}.")

    freq_grouper = f'{n_periods}{freq_unit}S'
    freq_new_period = f'{n_periods}{freq_unit}'

    # Name of the groupby method implementing each reduction operation
    aggregations = {'av': 'mean', 'sum': 'sum', 'std': 'std'}

    # - Conversion to timestamp is necessary because resampling with PeriodIndex is not supported as of Pandas 2.2.3
    #   TODO: Check this in future Pandas versions
    # - Conversion to datetime64[s] is necessary because Pandas timestamps have a resolution of
    #   nanoseconds. This can cause the grouper to overflow. See https://stackoverflow.com/questions/78454291
    index_orig = df.index
    df.index = index_orig.set_levels(index_orig.levels[2].to_timestamp().astype('datetime64[s]'), level=2)
    try:
        grouped = df.groupby(['lon', 'lat', pd.Grouper(freq=freq_grouper, level='time')])
        df_ya = getattr(grouped, aggregations[timeop])()
    finally:
        # Always hand the caller's dataframe back with its original index
        df.index = index_orig

    df_ya.index = df_ya.index.set_levels(df_ya.index.levels[2].to_period(freq=freq_new_period), level=2)

    freq_str = "whole time series" if freq is None else freq
    log_msg = [f"Time reduction: {timeop}, {freq_str}"]

    return df_ya, log_msg
[docs]
def reduce_grid(df, grid: Grid, gridop: str, axis: str) -> tuple[pd.DataFrame, Grid, list[str]]:
    """Apply a spatial reduction to the data and to its grid.

    Parameters
    ----------
    df
        Data to reduce.
    grid : Grid
        Grid associated with ``df``.
    gridop : str
        Name of the grid reduction operation.
    axis : str
        Name of the axis to reduce.

    Returns
    -------
    tuple[pd.DataFrame, Grid, list[str]]
        Reduced data, reduced grid, and a one-element list with a log message.

    Raises
    ------
    ValueError
        If ``axis`` has already been reduced on ``grid``.
    """
    # A given axis can only be reduced once
    if grid.is_reduced(axis):
        raise ValueError(f"Axis '{axis}' is already reduced (gridop = {grid.gridops[axis]}).")

    # Look up the operation, then apply it to the data
    reduce_op = get_gridop(grid, gridop, axis)
    reduced_df = reduce_op(df, grid)

    # Grid describing the reduced data
    reduced_grid = grid.reduce(gridop, axis)

    return reduced_df, reduced_grid, [f"Spatial reduction: '{gridop}', '{axis}'"]
[docs]
def select_slice(df: pd.DataFrame,
                 grid: Grid,
                 slices: dict[str,tuple]) -> tuple[pd.DataFrame, Grid, list[str]]:
    """Select a label-based slice of the data along grid axes and/or time.

    Parameters
    ----------
    df : pd.DataFrame
        Data to slice; its index level names identify the axes.
    grid : Grid
        Grid associated with ``df``.
    slices : dict[str, tuple]
        Maps an axis name (one of the grid's axis names or 'time') to a
        ``(start, end)`` tuple. For 'time', bounds convertible with ``int``
        are interpreted as years and turned into 'YYYY-01-01' strings; any
        other bound is passed through unchanged.

    Returns
    -------
    tuple[pd.DataFrame, Grid, list[str]]
        Sliced data, the grid cropped to the sliced data, and log messages.

    Raises
    ------
    ValueError
        If an axis name is not recognized, a reduced axis is sliced, or a
        slice tuple does not have exactly two elements.
    """
    valid_keys = set(grid.axis_names + ['time'])
    unknown_keys = set(slices) - valid_keys
    if unknown_keys:
        raise ValueError(f"Axes {list(unknown_keys)} not recognized")

    for axis_key in grid.axis_names:
        if axis_key in slices and grid.is_reduced(axis_key):
            raise ValueError(f"Cannot slice reduced axis '{axis_key}'")

    for k, v in slices.items():
        if len(v) != 2:
            raise ValueError(f"Slice tuple for dimension '{k}' must have two elements: (start, end).")

    # Build one slice per index level, in index order. Every level must be
    # visited (including 'label') so that the selectors stay positionally
    # aligned with the index levels.
    sliced_axes = []
    selection: list[slice] = []
    for axis_key in df.index.names:
        if axis_key == 'label' or axis_key not in slices:
            selection.append(slice(None))
            continue
        if axis_key == 'time':
            # TODO: This type hint should be more specific (int, str or any kind of datetime/timestamp object)
            axis_slice: tuple[Any, ...] = ()
            for dt in slices['time']:
                try:
                    # Integers (or int-compatible values) are interpreted as years
                    year = int(dt)
                except (TypeError, ValueError, OverflowError):
                    # Else just assume it's a valid datetime or datetime string
                    axis_slice += (dt,)
                else:
                    axis_slice += (f'{year}-01-01',)
        else:
            axis_slice = tuple(slices[axis_key])
        selection.append(slice(*axis_slice))
        sliced_axes.append(axis_key)
    selection_tuple = tuple(selection)

    if len(df.index.names) == 1:  # Single index level
        df_sliced = df.loc[selection_tuple[0],:]
    else:
        df_sliced = df.loc[selection_tuple,:]

    # Only a MultiIndex carries unused levels to drop
    if isinstance(df_sliced.index, pd.MultiIndex):
        df_sliced.index = df_sliced.index.remove_unused_levels()

    grid_sliced = grid.crop(df_sliced)

    log_msg = [f"Sliced field along {axis_key} axis: {slices[axis_key]}." for axis_key in sliced_axes]
    if df_sliced.empty:
        log_msg.append("Field was sliced to empty!")

    return df_sliced, grid_sliced, log_msg
[docs]
def apply_reduction(df: pd.DataFrame, grid: Grid, redspec: RedSpec) -> tuple[pd.DataFrame, Grid, list[str]]:
    """Run the steps described by a reduction specification.

    The steps are applied in a fixed order, each only if requested by
    ``redspec``: layer selection, slicing, time reduction, spatial reduction.

    Parameters
    ----------
    df : pd.DataFrame
        Data to reduce.
    grid : Grid
        Grid associated with ``df``.
    redspec : RedSpec
        Specification providing ``layers``, ``slices``, ``timeop``, ``freq``,
        ``gridop`` and ``axis``.

    Returns
    -------
    tuple[pd.DataFrame, Grid, list[str]]
        Reduced data, the corresponding grid, and the log messages collected
        from the individual steps.
    """
    log_msg = []
    df_red = df
    grid_red = grid

    # 1. Layer selection
    if redspec.layers is not None:
        df_red = df_red[redspec.layers]

    # 2. Slicing
    has_slices = redspec.slices is not None and len(redspec.slices) > 0
    if has_slices:
        df_red, grid_red, step_log = select_slice(df_red, grid_red, redspec.slices)
        log_msg += step_log

    # 3. Time reduction
    if redspec.timeop is not None:
        df_red, step_log = reduce_time(df_red, redspec.timeop, freq=redspec.freq)
        log_msg += step_log

    # 4. Spatial reduction
    if redspec.gridop is not None:
        df_red, grid_red, step_log = reduce_grid(df_red, grid_red, redspec.gridop, axis=redspec.axis)
        log_msg += step_log

    return df_red, grid_red, log_msg
[docs]
def check_columns(df: pd.DataFrame, layers: list[str]):
    """Ensure every requested layer is a column of ``df``.

    Raises
    ------
    ValueError
        If at least one entry of ``layers`` is not among ``df.columns``.
    """
    missing = set(layers) - set(df.columns)
    if missing:
        raise ValueError("Field does not have specified layers.")
[docs]
def apply_sum(df: pd.DataFrame, operand: SupportsFloat | list[str], layers: list[str]) -> None:
    """Add a scalar or other layers to the given layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    operand : SupportsFloat | list[str]
        A float-convertible scalar, or the names of the columns to add.
    layers : list[str]
        Columns of ``df`` that are overwritten with the result.

    Raises
    ------
    ValueError
        If ``layers`` (or the operand columns) are missing from ``df``.
    """
    check_columns(df, layers)
    if not isinstance(operand, SupportsFloat):
        # Operand is a list of column names
        check_columns(df, operand)
        other = df[operand].values
    else:
        other = np.array(float(operand))
    current = df[layers].values
    df[layers] = other + current
[docs]
def apply_mul(df: pd.DataFrame, operand: SupportsFloat | list[str], layers: list[str]) -> None:
    """Multiply the given layers by a scalar or by other layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    operand : SupportsFloat | list[str]
        A float-convertible scalar, or the names of the columns to multiply by.
    layers : list[str]
        Columns of ``df`` that are overwritten with the result.

    Raises
    ------
    ValueError
        If ``layers`` (or the operand columns) are missing from ``df``.
    """
    check_columns(df, layers)
    if not isinstance(operand, SupportsFloat):
        # Operand is a list of column names
        check_columns(df, operand)
        other = df[operand].values
    else:
        other = np.array(float(operand))
    current = df[layers].values
    df[layers] = other * current
[docs]
def apply_sub(df: pd.DataFrame, operand: SupportsFloat | list[str], layers: list[str], how: Literal['left', 'right']) -> None:
    """Subtract a scalar or other layers from/by the given layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    operand : SupportsFloat | list[str]
        A float-convertible scalar, or the names of the operand columns.
    layers : list[str]
        Columns of ``df`` that are overwritten with the result.
    how : {'left', 'right'}
        Side of the minus sign on which the layers stand:
        'left' computes ``layers - operand``, 'right' computes
        ``operand - layers``.

    Raises
    ------
    ValueError
        If ``how`` is invalid, or if ``layers`` (or the operand columns) are
        missing from ``df``.
    """
    if how not in ('left', 'right'):
        raise ValueError("'how' must be either 'left' or 'right'")
    check_columns(df, layers)
    if not isinstance(operand, SupportsFloat):
        # Operand is a list of column names
        check_columns(df, operand)
        other = df[operand].values
    else:
        other = np.array(float(operand))
    current = df[layers].values
    df[layers] = other - current if how == 'right' else current - other
[docs]
def apply_div(df: pd.DataFrame, operand: SupportsFloat | list[str], layers: list[str], how: Literal['left', 'right']) -> None:
    """Divide the given layers by, or into, a scalar or other layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    operand : SupportsFloat | list[str]
        A float-convertible scalar, or the names of the operand columns.
    layers : list[str]
        Columns of ``df`` that are overwritten with the result.
    how : {'left', 'right'}
        Side of the division sign on which the layers stand:
        'left' computes ``layers / operand``, 'right' computes
        ``operand / layers``.

    Raises
    ------
    ValueError
        If ``how`` is invalid, or if ``layers`` (or the operand columns) are
        missing from ``df``.
    """
    if how not in ('left', 'right'):
        raise ValueError("'how' must be either 'left' or 'right'")
    check_columns(df, layers)
    if not isinstance(operand, SupportsFloat):
        # Operand is a list of column names
        check_columns(df, operand)
        other = df[operand].values
    else:
        other = np.array(float(operand))
    current = df[layers].values
    df[layers] = other / current if how == 'right' else current / other
[docs]
def apply_function(df: pd.DataFrame, layers: list[str], function) -> None:
    """Apply a scalar function element-wise to the given layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    layers : list[str]
        Columns of ``df`` that are overwritten with the result.
    function
        Callable taking and returning a single value; it is wrapped with
        ``np.vectorize`` before being applied.

    Raises
    ------
    ValueError
        If ``layers`` are missing from ``df``.
    """
    check_columns(df, layers)
    values = df[layers].values
    # np.vectorize cannot be called on size-0 input (it needs a first element
    # to infer the output type), so there is nothing to do for empty data.
    if values.size == 0:
        return
    df[layers] = np.vectorize(function)(values)