Source code for canopy.core.frameops

import numpy as np
import pandas as pd
import janitor # type: ignore
import copy
from typing import Any, Optional, Union, Type, Literal, SupportsFloat, cast
from canopy.core.redspec import RedSpec
from canopy.core.grid.grid_abc import Grid
from canopy.core.grid import get_gridop, create_grid
from canopy.core.constants import *

# Implemented time reduction operations. reduce_time maps 'av' to the group
# mean, 'std' to the group standard deviation and 'sum' to the group sum.
TIME_REDOPS = ['av', 'std', 'sum', ] #'av_m', 'std_m', ]
# Dataframe frequencies: the freq objects of a yearly and a monthly period,
# compared against the freq of the 'time' index level.
YEARLY_FREQ = pd.Period('2000', freq='Y').freq
MONTHLY_FREQ = pd.Period('2000', freq='M').freq
# Allowed frequency units for time reductions
TIME_RED_FREQS = ['M', 'Y', ]
# Valid number of months for time reduction
N_MONTHS_RED = [2, 3, 4, 6, 12]


def check_indices_match(df1: pd.DataFrame, df2: pd.DataFrame):
    """Verify that two dataframes are indexed identically.

    The checks run from cheapest to most expensive: length, level names and
    finally the element-wise values.

    Parameters
    ----------
    df1, df2 : pd.DataFrame
        Dataframes whose indices are compared.

    Raises
    ------
    ValueError
        If the indices differ in length, in level names or in any value.
    """
    index1 = df1.index
    index2 = df2.index
    if len(index1) != len(index2):
        raise ValueError("Indices have different lengths.")
    if index1.names != index2.names:
        raise ValueError("Indices have different levels.")
    # The comparison yields one boolean per entry; reduce it in numpy
    # instead of iterating over the array with the builtin all().
    if not np.all(index1 == index2):
        raise ValueError("Indices do not match.")
def is_yearly_freq(df) -> bool:
    """Tell whether the 'time' index level of ``df`` has yearly frequency.

    The frequency of the 'time' level values is compared against
    ``YEARLY_FREQ``.
    """
    time_values = df.index.get_level_values('time')
    return time_values.freq == YEARLY_FREQ
def is_monthly_freq(df) -> bool:
    """Tell whether the 'time' index level of ``df`` has monthly frequency.

    The frequency of the 'time' level values is compared against
    ``MONTHLY_FREQ``.
    """
    time_values = df.index.get_level_values('time')
    return time_values.freq == MONTHLY_FREQ
def parse_timeop(timeop: str) -> str:
    """Validate a time reduction operation name and return it lower-cased.

    Parameters
    ----------
    timeop : str
        Operation name; matched case-insensitively against ``TIME_REDOPS``.

    Returns
    -------
    str
        The lower-cased operation name.

    Raises
    ------
    ValueError
        If the lower-cased name is not listed in ``TIME_REDOPS``.
    """
    normalized = timeop.lower()
    if normalized in TIME_REDOPS:
        return normalized
    raise ValueError(f"Time timeop must be one of {TIME_REDOPS} (got '{timeop}').")
def get_time_index(df: pd.DataFrame) -> pd.PeriodIndex:
    """Return the last level of the dataframe's MultiIndex.

    The casts only inform the type checker; nothing is converted at runtime.
    NOTE(review): this presumes 'time' is the last index level and that it is
    a PeriodIndex - confirm against the callers.
    """
    levels = cast(pd.MultiIndex, df.index).levels
    return cast(pd.PeriodIndex, levels[-1])
def get_base_freq(idx) -> str:
    """Return the base frequency unit ('M' or 'Y') of a time index.

    The unit is detected from the string representation of ``idx.freq.base``,
    so multiples such as a 2-month frequency resolve to their base unit.

    Parameters
    ----------
    idx
        Index exposing a ``freq`` attribute with a ``base`` offset.

    Returns
    -------
    str
        'M' if the base offset mentions 'Month', 'Y' if it mentions 'Year'.

    Raises
    ------
    ValueError
        If the base offset is neither monthly nor yearly. The previous
        implementation silently fell through its loop and returned 'Y' here.
    """
    base_repr = str(idx.freq.base)
    for freq_str, base_freq in (('Month', 'M'), ('Year', 'Y')):
        if freq_str in base_repr:
            return base_freq
    raise ValueError(
        f"Unsupported index frequency '{idx.freq}': expected a monthly or yearly frequency."
    )
def parse_freq(freq: str) -> tuple[int, str]:
    """Split a frequency string such as '5Y' or 'M' into count and unit.

    Parameters
    ----------
    freq : str
        Optional integer count followed by a one-character unit. A missing
        count means one period.

    Returns
    -------
    tuple[int, str]
        ``(n_periods, freq_unit)``.

    Raises
    ------
    ValueError
        If ``freq`` is empty, the count is not an integer, the count is not
        positive, or the unit is not listed in ``TIME_RED_FREQS``.
    """
    # An empty string has no unit character; fail with a clear message
    # instead of an IndexError on freq[-1].
    if not freq:
        raise ValueError(f"Frequency unit must be one of {TIME_RED_FREQS} (got '').")

    count_str, freq_unit = freq[:-1], freq[-1]
    if count_str == '':
        n_periods = 1
    else:
        try:
            n_periods = int(count_str)
        except ValueError as err:
            raise ValueError(f"Specified number of periods '{count_str}' is not a number.") from err
    # int() accepts '0' and negative numbers, which cannot describe a
    # reduction window.
    if n_periods < 1:
        raise ValueError(f"Specified number of periods must be positive (got {n_periods}).")

    if freq_unit not in TIME_RED_FREQS:
        raise ValueError(f"Frequency unit must be one of {TIME_RED_FREQS} (got '{freq_unit}').")

    return n_periods, freq_unit
def reduce_time(df, timeop: str, freq: str | None) -> tuple[pd.DataFrame, list[str]]:
    """Reduce a dataframe along its 'time' index level.

    The data are grouped by 'lon', 'lat' and bins of ``freq`` along 'time',
    and each group is reduced with ``timeop``. The code addresses the time
    level by position 2, i.e. it expects the index levels to be ordered
    (lon, lat, time).

    Parameters
    ----------
    df
        Dataframe with yearly or monthly frequency on the 'time' level.
    timeop : str
        One of ``TIME_REDOPS`` (case-insensitive): 'av' (mean), 'sum' or 'std'.
    freq : str or None
        Target frequency such as '5Y' or '3M'. ``None`` reduces over the
        whole time series.

    Returns
    -------
    tuple[pd.DataFrame, list[str]]
        The reduced dataframe and a one-element list with a log message.

    Raises
    ------
    ValueError
        If the operation or frequency is invalid, the data are neither
        yearly nor monthly, the requested frequency equals the current one,
        or a monthly reduction uses a number of months not in
        ``N_MONTHS_RED``.
    """
    timeop = parse_timeop(timeop)

    # TODO: why? Now I can only read monthly and yearly files, but should work for daily,
    # hourly... => generalize time series resampling
    yearly = is_yearly_freq(df)
    monthly = is_monthly_freq(df)
    if not yearly and not monthly:
        raise ValueError("Data must have yearly or monthly frequency.")

    if freq is None:
        # Whole time series: one bin spanning every period in the time level
        idx = get_time_index(df)
        base_freq = cast(pd.offsets.YearEnd, idx.freq)
        n_periods = base_freq.n * len(idx)
        freq_unit = get_base_freq(idx)
    else:
        n_periods, freq_unit = parse_freq(freq)

    if yearly and n_periods == 1 and freq_unit == 'Y':
        raise ValueError("Frequency is already 1 year.")
    if monthly and n_periods == 1 and freq_unit == 'M':
        raise ValueError("Frequency is already 1 month.")
    # NOTE(review): with freq=None on monthly data, n_periods is the length of
    # the series, so this check rejects it unless that length happens to be in
    # N_MONTHS_RED - confirm whether this is intended.
    if monthly and freq_unit == 'M' and n_periods not in N_MONTHS_RED:
        raise ValueError(f"The number of months for reduction:{timeop} must be one of {N_MONTHS_RED}.")

    freq_grouper = f'{n_periods}{freq_unit}S'
    freq_new_period = f'{n_periods}{freq_unit}'

    # Name of the groupby method implementing each reduction operation
    reducers = {'av': 'mean', 'sum': 'sum', 'std': 'std'}

    # - Conversion to timestamp is necessary because resampling with PeriodIndex is not
    #   supported as of Pandas 2.2.3
    #   TODO: Check this in future Pandas versions
    # - Conversion to datetime64[s] is necessary because Pandas timestamps have a resolution
    #   of nanoseconds. This can cause the grouper to overflow.
    #   See https://stackoverflow.com/questions/78454291
    index_orig = df.index
    try:
        df.index = df.index.set_levels(df.index.levels[2].to_timestamp().astype('datetime64[s]'), level=2)
        grouped = df.groupby(['lon', 'lat', pd.Grouper(freq=freq_grouper, level='time')])
        df_ya = getattr(grouped, reducers[timeop])()
    finally:
        # The caller's dataframe is modified in place above; always give it
        # its original index back, even if the reduction fails.
        df.index = index_orig

    df_ya.index = df_ya.index.set_levels(df_ya.index.levels[2].to_period(freq=freq_new_period), level=2)

    freq_str = "whole time series" if freq is None else freq
    log_msg = [f"Time reduction: {timeop}, {freq_str}"]

    return df_ya, log_msg
def reduce_grid(df, grid: Grid, gridop: str, axis: str) -> tuple[pd.DataFrame, Grid, list[str]]:
    """Apply a spatial reduction to a dataframe and its grid.

    Parameters
    ----------
    df
        Dataframe holding the data to reduce.
    grid : Grid
        Grid associated with ``df``.
    gridop : str
        Name of the grid reduction operation, resolved through ``get_gridop``.
    axis : str
        Axis to reduce.

    Returns
    -------
    tuple[pd.DataFrame, Grid, list[str]]
        The reduced dataframe, the reduced grid and a one-element list with
        a log message.

    Raises
    ------
    ValueError
        If ``axis`` has already been reduced on ``grid``.
    """
    # An axis can only be reduced once
    if grid.is_reduced(axis):
        raise ValueError(f"Axis '{axis}' is already reduced (gridop = {grid.gridops[axis]}).")

    # Reduce the data first, then derive the matching grid
    reducer = get_gridop(grid, gridop, axis)
    reduced_df = reducer(df, grid)
    reduced_grid = grid.reduce(gridop, axis)

    return reduced_df, reduced_grid, [f"Spatial reduction: '{gridop}', '{axis}'"]
def _time_slice_bound(dt: Any) -> Any:
    """Turn an integer-like slice bound into a 'YYYY-01-01' date string.

    Anything ``int()`` cannot convert is returned unchanged and is assumed to
    be a valid datetime or datetime string.
    """
    try:
        year = int(dt)
    except (TypeError, ValueError, OverflowError):
        return dt
    return f'{year}-01-01'


def select_slice(df: pd.DataFrame, grid: Grid, slices: dict[str,tuple]) -> tuple[pd.DataFrame, Grid, list[str]]:
    """Slice a dataframe along grid axes and/or time and crop its grid.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe to slice.
    grid : Grid
        Grid associated with ``df``.
    slices : dict[str, tuple]
        Maps an axis name (one of ``grid.axis_names`` or 'time') to a
        ``(start, end)`` tuple. Integer-like 'time' bounds are interpreted as
        years.

    Returns
    -------
    tuple[pd.DataFrame, Grid, list[str]]
        The sliced dataframe, the grid cropped to it and the log messages
        (one per sliced axis, plus a warning if the result is empty).

    Raises
    ------
    ValueError
        If a key of ``slices`` is not a recognized axis, refers to an axis
        that is already reduced, or its tuple does not have two elements.
    """
    valid_keys = set(grid.axis_names + ['time'])
    passed_keys = set(slices)
    if passed_keys - valid_keys:
        raise ValueError(f"Axes {list(passed_keys - valid_keys)} not recognized")

    for axis_key in grid.axis_names:
        if axis_key in slices and grid.is_reduced(axis_key):
            raise ValueError(f"Cannot slice reduced axis '{axis_key}'")

    for k, v in slices.items():
        if len(v) != 2:
            raise ValueError(f"Slice tuple for dimension '{k}' must have two elements: (start, end).")

    # The 'label' level is never sliced and gets no entry in the selection.
    # NOTE(review): the selection is positional, so this presumes 'label', if
    # present, is the last index level - confirm.
    space_time_levels = [level for level in df.index.names if level != 'label']

    sliced_axes = []
    selection: list[slice] = []
    for axis_key in space_time_levels:
        if axis_key not in slices:
            selection.append(slice(None))
            continue
        if axis_key == 'time':
            # Integers are interpreted as years
            # TODO: This type hint should be more specific (int, str or any kind of
            # datetime/timestamp object)
            axis_slice: tuple[Any, ...] = tuple(_time_slice_bound(dt) for dt in slices['time'])
        else:
            axis_slice = tuple(slices[axis_key])
        selection.append(slice(*axis_slice))
        sliced_axes.append(axis_key)
    selection_tuple = tuple(selection)

    if len(df.index.names) == 1:  # Only 'time' level
        df_sliced = df.loc[selection_tuple[0], :]
    else:
        df_sliced = df.loc[selection_tuple, :]

    # remove_unused_levels only exists on MultiIndex; a flat index has no
    # unused levels to drop.
    if isinstance(df_sliced.index, pd.MultiIndex):
        df_sliced.index = df_sliced.index.remove_unused_levels()

    grid_sliced = grid.crop(df_sliced)

    log_msg = [f"Sliced field along {axis_key} axis: {slices[axis_key]}." for axis_key in sliced_axes]
    if df_sliced.empty:
        log_msg.append("Field was sliced to empty!")

    return df_sliced, grid_sliced, log_msg
def apply_reduction(df: pd.DataFrame, grid: Grid, redspec: RedSpec) -> tuple[pd.DataFrame, Grid, list[str]]:
    """Apply the steps described by a reduction specification, in order.

    The steps are layer selection, slicing, time reduction and spatial
    reduction. Each one runs only if the corresponding attribute of
    ``redspec`` is set.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe to reduce.
    grid : Grid
        Grid associated with ``df``.
    redspec : RedSpec
        Specification providing ``layers``, ``slices``, ``timeop``, ``freq``,
        ``gridop`` and ``axis``.

    Returns
    -------
    tuple[pd.DataFrame, Grid, list[str]]
        The resulting dataframe, the resulting grid and the accumulated log
        messages of the steps that ran.
    """
    df_out, grid_out = df, grid
    messages: list[str] = []

    # Layer selection
    if redspec.layers is not None:
        df_out = df_out[redspec.layers]

    # Slicing
    if redspec.slices is not None and len(redspec.slices) > 0:
        df_out, grid_out, step_log = select_slice(df_out, grid_out, redspec.slices)
        messages.extend(step_log)

    # Time reduction
    if redspec.timeop is not None:
        df_out, step_log = reduce_time(df_out, redspec.timeop, freq=redspec.freq)
        messages.extend(step_log)

    # Spatial reduction
    if redspec.gridop is not None:
        df_out, grid_out, step_log = reduce_grid(df_out, grid_out, redspec.gridop, axis=redspec.axis)
        messages.extend(step_log)

    return df_out, grid_out, messages
def check_columns(df: pd.DataFrame, layers: list[str]):
    """Ensure that every requested layer is a column of ``df``.

    Raises
    ------
    ValueError
        If at least one entry of ``layers`` is not among ``df.columns``.
    """
    available = set(df.columns)
    requested = set(layers)
    if requested - available:
        raise ValueError("Field does not have specified layers.")
def apply_sum(df: pd.DataFrame, operand: SupportsFloat | list[str], layers: list[str]) -> None:
    """Add a scalar or other layers to the given layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    operand : SupportsFloat or list[str]
        Either a value convertible to float or a list of column names of
        ``df`` whose values are added.
    layers : list[str]
        Columns of ``df`` that receive the result.

    Raises
    ------
    ValueError
        If ``layers`` or a column-name ``operand`` is not found in ``df``
        (raised by ``check_columns``).
    """
    check_columns(df, layers)

    is_scalar = isinstance(operand, SupportsFloat)
    if not is_scalar:
        check_columns(df, operand)
    addend = np.array(float(operand)) if is_scalar else df[operand].values

    current = df[layers].values
    df[layers] = addend + current
def apply_mul(df: pd.DataFrame, operand: SupportsFloat | list[str], layers: list[str]) -> None:
    """Multiply the given layers by a scalar or by other layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    operand : SupportsFloat or list[str]
        Either a value convertible to float or a list of column names of
        ``df`` whose values are used as factors.
    layers : list[str]
        Columns of ``df`` that receive the result.

    Raises
    ------
    ValueError
        If ``layers`` or a column-name ``operand`` is not found in ``df``
        (raised by ``check_columns``).
    """
    check_columns(df, layers)

    is_scalar = isinstance(operand, SupportsFloat)
    if not is_scalar:
        check_columns(df, operand)
    factor = np.array(float(operand)) if is_scalar else df[operand].values

    current = df[layers].values
    df[layers] = factor * current
def apply_sub(df: pd.DataFrame, operand: SupportsFloat | list[str], layers: list[str], how: Literal['left', 'right']) -> None:
    """Subtract a scalar or other layers from the given layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    operand : SupportsFloat or list[str]
        Either a value convertible to float or a list of column names of
        ``df``.
    layers : list[str]
        Columns of ``df`` that receive the result.
    how : {'left', 'right'}
        'left' computes ``layers - operand``; 'right' computes
        ``operand - layers``.

    Raises
    ------
    ValueError
        If ``how`` is invalid, or if ``layers`` or a column-name ``operand``
        is not found in ``df`` (raised by ``check_columns``).
    """
    if how not in ('left', 'right'):
        raise ValueError("'how' must be either 'left' or 'right'")
    check_columns(df, layers)

    is_scalar = isinstance(operand, SupportsFloat)
    if not is_scalar:
        check_columns(df, operand)
    other = np.array(float(operand)) if is_scalar else df[operand].values

    current = df[layers].values
    df[layers] = other - current if how == 'right' else current - other
def apply_div(df: pd.DataFrame, operand: SupportsFloat | list[str], layers: list[str], how: Literal['left', 'right']) -> None:
    """Divide the given layers by a scalar or other layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    operand : SupportsFloat or list[str]
        Either a value convertible to float or a list of column names of
        ``df``.
    layers : list[str]
        Columns of ``df`` that receive the result.
    how : {'left', 'right'}
        'left' computes ``layers / operand``; 'right' computes
        ``operand / layers``.

    Raises
    ------
    ValueError
        If ``how`` is invalid, or if ``layers`` or a column-name ``operand``
        is not found in ``df`` (raised by ``check_columns``).
    """
    if how not in ('left', 'right'):
        raise ValueError("'how' must be either 'left' or 'right'")
    check_columns(df, layers)

    is_scalar = isinstance(operand, SupportsFloat)
    if not is_scalar:
        check_columns(df, operand)
    other = np.array(float(operand)) if is_scalar else df[operand].values

    current = df[layers].values
    df[layers] = other / current if how == 'right' else current / other
def apply_function(df: pd.DataFrame, layers: list[str], function) -> None:
    """Apply a scalar function element-wise to the given layers, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe modified in place.
    layers : list[str]
        Columns of ``df`` the function is applied to.
    function
        Callable taking one value and returning one value; it is wrapped
        with ``np.vectorize``.

    Raises
    ------
    ValueError
        If ``layers`` is not found in ``df`` (raised by ``check_columns``).
    """
    check_columns(df, layers)
    values = df[layers].values
    # np.vectorize cannot infer an output type from zero elements and raises
    # on size-0 input, so an empty selection is left untouched.
    if values.size == 0:
        return
    # NOTE(review): np.vectorize infers the output dtype from the result of
    # the first element; a function returning mixed int/float values may be
    # truncated - confirm whether an explicit otypes is wanted.
    df[layers] = np.vectorize(function)(values)