Source code for canopy.core.grid.grid_lonlat

"""Grid with longitude and latitude coordinates.

This grid type has longitude and latutide coordinates. The longitude and latitude intervals are
constant, although they can be different (i.e. dlon != dlat). Reduction operations on this grid
are weighted according to their position on the grid.
"""
import pandas as pd
import numpy as np
import copy
import warnings
from typing import SupportsFloat
from canopy.core.grid.spatial_axis import SpatialAxis, LonAxis, LatAxis
from canopy.core.grid.grid_empty import GridEmpty
from canopy.core.grid.grid_abc import Grid
from canopy.core.grid.registry import register_grid, register_gridop, check_gridop
from canopy.core.constants import *

grid_type = 'lonlat'

# ---------------
# GRID DEFINITION
# ---------------

COORD_BOUNDARIES = {'lon': (-180.,180.), 'lat': (-90.,90.)}

[docs] @register_grid class GridLonLat(Grid): """A Grid type representing a longitude-latitude grid with constant increments.""" _grid_type: str = grid_type _xaxis_key: str = 'lon' _yaxis_key: str = 'lat' _xaxis: SpatialAxis = LonAxis _yaxis: SpatialAxis = LatAxis def __init__(self, lon_min: SupportsFloat = np.nan, lon_max: SupportsFloat = np.nan, dlon: SupportsFloat = np.nan, lat_min: SupportsFloat = np.nan, lat_max: SupportsFloat = np.nan, dlat: SupportsFloat = np.nan, lon_gridop: str | None = None, lat_gridop: str | None = None) -> None: """ Parameters ---------- lon_min : float Smallest longitude of the represented domain (degrees). lon_max : float Largest longitude of the represented domain (degrees). dlon : float Increment in longitude (degrees). lat_min : float Smallest latitude of the represented domain (degrees). lat_max : float Largest latitude of the represented domain (degrees). dlat : float Increment in latitude (degrees). lon_gridop : str | None Grid reduction operation along lon axis. If not None, values of lon_min, lon_max, and dlon are ignored. lat_gridop : str | None Grid reduction operation along lat axis. If not None, values of lat_min, lat_max, and dlat are ignored. """ if lon_gridop is not None: check_gridop(grid_type, lon_gridop, 'lon') if lat_gridop is not None: check_gridop(grid_type, lat_gridop, 'lat') # TODO: this to ABC? if lon_gridop is not None and lat_gridop is not None and lon_gridop != lat_gridop: raise ValueError(f"Grid operation must be the same on both axes") super().__init__(xaxis_gridop=lon_gridop, yaxis_gridop=lat_gridop) self.min = {} self.max = {} self.inc = {} min_passed = {'lon': float(lon_min), 'lat': float(lat_min)} max_passed = {'lon': float(lon_max), 'lat': float(lat_max)} inc_passed = {'lon': float(dlon), 'lat': float(dlat)} gridop_passed = {'lon': lon_gridop, 'lat': lat_gridop} for axis_key in self.axes: if gridop_passed[axis_key] is None: # Parameter validation # -------------------- if inc_passed[axis_key] <= 0: raise ValueError(f"d{axis_key} must be positive.") if min_passed[axis_key] > max_passed[axis_key]: raise ValueError(f"{axis_key}_min must be smaller or equal than {axis_key}_max.") if min_passed[axis_key] < COORD_BOUNDARIES[axis_key][0]: raise ValueError(f"'{axis_key}_min' must be larger than or equal to {COORD_BOUNDARIES[axis_key][0]}.") if max_passed[axis_key] > COORD_BOUNDARIES[axis_key][1]: raise ValueError(f"'{axis_key}_max' must be smaller than or equal to {COORD_BOUNDARIES[axis_key][1]}.") self.min[axis_key] = min_passed[axis_key] self.max[axis_key] = min_passed[axis_key] + round((max_passed[axis_key]-min_passed[axis_key])/inc_passed[axis_key])*inc_passed[axis_key] if axis_key == 'lon' and self.max[axis_key] >= COORD_BOUNDARIES[axis_key][1]: self.max[axis_key] -= inc_passed[axis_key] if axis_key == 'lat' and self.max[axis_key] > COORD_BOUNDARIES[axis_key][1]: self.max[axis_key] -= inc_passed[axis_key] self.inc[axis_key] = inc_passed[axis_key] else: self.min[axis_key] = np.nan self.max[axis_key] = np.nan self.inc[axis_key] = np.nan
[docs] def reduce(self, gridop: str, axis: str) -> Grid: """Create a new grid, reduced according to the parameters Parameters ---------- gridop : str The reduction operation axis : str The axis to be reduced Returns ------- An instance of GridLonLat """ # Check that gridop is defined check_gridop(self.grid_type, gridop, axis) if self.is_reduced(axis): raise ValueError(f"Grid axis '{axis}' is already reduced.") min_reduced = {axis_key: self.min[axis_key] for axis_key in self.axes} max_reduced = {axis_key: self.max[axis_key] for axis_key in self.axes} inc_reduced = {axis_key: self.inc[axis_key] for axis_key in self.axes} gridops_reduced = {axis_key: self.gridops[axis_key] for axis_key in self.axes} for axis_key in self.axes: if axis == axis_key or axis == 'both': gridops_reduced[axis_key] = gridop min_reduced[axis_key] = np.nan max_reduced[axis_key] = np.nan inc_reduced[axis_key] = np.nan return GridLonLat(lon_min = min_reduced['lon'], lon_max = max_reduced['lon'], dlon = inc_reduced['lon'], lat_min = min_reduced['lat'], lat_max = max_reduced['lat'], dlat = inc_reduced['lat'], lon_gridop = gridops_reduced['lon'], lat_gridop = gridops_reduced['lat'])
[docs] def crop(self, df: pd.DataFrame) -> Grid: if df.empty: return GridEmpty() min_cropped = {axis_key: self.min[axis_key] for axis_key in self.axis_names} max_cropped = {axis_key: self.max[axis_key] for axis_key in self.axis_names} inc_cropped = {axis_key: self.inc[axis_key] for axis_key in self.axis_names} gridops_cropped = {axis_key: self.gridops[axis_key] for axis_key in self.axis_names} for axis_key in self.axis_names: if axis_key not in df.index.names or self.is_reduced(axis_key): continue frame_min = df.index.get_level_values(axis_key).min() frame_max = df.index.get_level_values(axis_key).max() delta = 0.01*self.inc[axis_key] # Prevent floating point precision pitfalls if frame_max + delta <= self.min[axis_key] or frame_min - delta >= self.max[axis_key]: return GridEmpty() coords = self.axis_values(axis_key) min_cropped[axis_key] = coords[coords >= frame_min - delta].min() max_cropped[axis_key] = coords[coords <= frame_max + delta].max() return GridLonLat(lon_min = min_cropped['lon'], lon_max = max_cropped['lon'], dlon = inc_cropped['lon'], lat_min = min_cropped['lat'], lat_max = max_cropped['lat'], dlat = inc_cropped['lat'], lon_gridop = gridops_cropped['lon'], lat_gridop = gridops_cropped['lat'])
[docs] @classmethod def from_frame(cls, df: pd.DataFrame, dlon: float = np.nan, dlat: float = np.nan, lon_gridop: None | str = None, lat_gridop: None | str = None, ) -> 'GridLonLat': """Create a GridLonLat instance from a DataFrame. Parameters ---------- df : pd.DataFrame A pandas DataFrame with a valid format (see Field documentation). dlon: float If supplied, use this increment for the longitude axis, instead of inferring it from the DataFrame's index dlat: float If supplied, use this increment for the latitude axis, instead of inferring it from the DataFrame's index gridop_lon: str | None If the DataFrame's index does not have a 'lon' axis, a gridop for this axis must be supplied. gridop_lat: str | None If the DataFrame's index does not have a 'lat' axis, a gridop for this axis must be supplied. Returns ------- An instance of the grid subclass. """ min_new = {axis_key: np.nan for axis_key in ['lon', 'lat']} max_new = {axis_key: np.nan for axis_key in ['lon', 'lat']} inc_new = {'lon': float(dlon), 'lat': float(dlat)} gridops_new = {'lon': lon_gridop, 'lat': lat_gridop} # TODO: Check that frame lon and lat are within boundaries for axis_key in ['lon', 'lat']: # DataFrame's index does not have this axis if axis_key not in df.index.names: if gridops_new[axis_key] is None: raise ValueError(f"DataFrame has no '{axis_key}' level, but no gridop was specified for this axis.") check_gridop(grid_type, str(gridops_new[axis_key]), axis_key) continue # DataFrame's index has this axis ax_values = df.index.get_level_values(axis_key).values ax_values_unique_sorted = np.sort(df.index.unique(level=axis_key).values) ax_min = ax_values.min() ax_max = ax_values.max() how_inc = "Passed" if np.isnan(inc_new[axis_key]): how_inc = "Inferred" if len(ax_values_unique_sorted) < 2: raise ValueError(f"If 'd{axis_key}' is not specified, the '{axis_key}' level of the dataframe must have at least two unique values.") inc_new[axis_key] = (ax_values_unique_sorted[1:] - ax_values_unique_sorted[:-1]).min() min_new[axis_key] = ax_min max_new[axis_key] = ax_max gridops_new[axis_key] = None fr, _ = np.modf((ax_values_unique_sorted[1:] - ax_values_unique_sorted[:-1]).astype(np.float64)/inc_new[axis_key]) # Sometimes modf will return a value very close to 1 as the fractional part, instead of counting it # as a whole unit in the integer part. This is due to floating point precision issues # (see https://stackoverflow.com/questions/3884232/modf-returns-1-as-the-fractional). # Therefore, we need to check that the "fractional" part is sufficiently close to either 0 or 1 mask = fr > 0.5 if not (np.allclose(fr[~mask], 0.) and np.allclose(fr[mask], 1.)): raise ValueError(f"{how_inc} d{axis_key}={inc_new[axis_key]} incompatible with DataFrame's '{axis_key}' index values.") return cls(lon_min=min_new['lon'], lon_max=max_new['lon'], dlon=inc_new['lon'], lat_min=min_new['lat'], lat_max=max_new['lat'], dlat=inc_new['lat'], lon_gridop=gridops_new['lon'], lat_gridop=gridops_new['lat'])
[docs] def validate_frame(self, df: pd.DataFrame) -> bool: grid_suits_frame = True for axis_key in self.axis_names: # Key found in dataframe's index if axis_key in df.index.names: if self.is_reduced(axis_key): warnings.warn(f"DataFrame's index has a '{axis_key}' level, but corresponding grid axis is reduced.") grid_suits_frame = False continue if isinstance(df.index, pd.MultiIndex): axis_values = np.sort(df.index.get_level_values(axis_key).unique()) else: axis_values = np.sort(df.index.values) if not np.isclose(max(axis_values), self.max[axis_key]): warnings.warn(f"DataFrame index and grid maximum '{axis_key}' values are different.") grid_suits_frame = False if not np.isclose(min(axis_values), self.min[axis_key]): warnings.warn(f"DataFrame index and grid minimum '{axis_key}' values are different.") grid_suits_frame = False if len(axis_values) > 1: fr, _ = np.modf((axis_values[1:] - axis_values[:-1]).astype(np.float64)/self.inc[axis_key]) # Sometimes modf will return a value very close to 1 as the fractional part, instead of counting it # as a whole unit in the integer part. This is due to floating point precision issues # (see https://stackoverflow.com/questions/3884232/modf-returns-1-as-the-fractional). # Therefore, we need to check that the "fractional" part is sufficiently close to either 0 or 1 mask = fr > 0.5 if not (np.allclose(fr[~mask], 0.) and np.allclose(fr[mask], 1.)): warnings.warn(f"DataFrame index '{axis_key}' values not compatible with grid's increment ({self.inc[axis_key]}).") grid_suits_frame = False # Key not found in dataframe's index else: if not self.is_reduced(axis_key): warnings.warn(f"DataFrame's index does not have a '{axis_key}' level, but corresponding grid axis is not reduced.") grid_suits_frame = False return grid_suits_frame
[docs] def is_compatible(self, other: Grid) -> bool: if isinstance(other, GridEmpty): return True if not isinstance(other, GridLonLat): return False for axis_key in ['lon', 'lat']: if self.gridops[axis_key] != other.gridops[axis_key]: return False if self.is_reduced(axis_key): continue if self.inc[axis_key] != other.inc[axis_key]: return False x0_self = self.min[axis_key] - int((self.min[axis_key] - COORD_BOUNDARIES[axis_key][0])/self.inc[axis_key])*self.inc[axis_key] x0_other = other.min[axis_key] - int((other.min[axis_key] - COORD_BOUNDARIES[axis_key][0])/other.inc[axis_key])*other.inc[axis_key] if not np.isclose(x0_self, x0_other, rtol=1.e-7): return False return True
def __add__(self, other: Grid) -> Grid: if isinstance(other, GridEmpty): return copy.deepcopy(self) if not isinstance(other, GridLonLat): raise ValueError("Grid must be of type 'GridEmpty' or 'GridLonLat'") if not self.is_compatible(other): raise ValueError("Grids are not compatible.") if self.is_reduced('lon'): lon_min = np.nan lon_max = np.nan dlon = np.nan lon_gridop = self.gridops['lon'] else: lon_min = min(self.min['lon'], other.min['lon']) lon_max = max(self.max['lon'], other.max['lon']) dlon = self.inc['lon'] lon_gridop = None if self.is_reduced('lat'): lat_min = np.nan lat_max = np.nan dlat = np.nan lat_gridop = self.gridops['lat'] else: lat_min = min(self.min['lat'], other.min['lat']) lat_max = max(self.max['lat'], other.max['lat']) dlat = self.inc['lat'] lat_gridop = None return GridLonLat(lon_min=lon_min, lon_max=lon_max, dlon=dlon, lon_gridop=lon_gridop, lat_min=lat_min, lat_max=lat_max, dlat=dlat, lat_gridop=lat_gridop) def __repr__(self) -> str: precision = 11 repr_str = [super().__repr__()] if not self.is_reduced('lon'): repr_str.append(f"Longitude: {round(self.lon_min, precision)} to {round(self.lon_max, precision)} (step: {round(self.dlon, precision)})") if not self.is_reduced('lat'): repr_str.append(f"Latitude: {round(self.lat_min, precision)} to {round(self.lat_max, precision)} (step: {round(self.dlat, precision)})") return '\n'.join(repr_str) def __str__(self) -> str: return self.__repr__()
[docs] def axis_values(self, axis_key, extend: bool = False): if extend: axis_min = self.min[axis_key] - int((self.min[axis_key] - COORD_BOUNDARIES[axis_key][0])/self.inc[axis_key])*self.inc[axis_key] axis_max = self.max[axis_key] + int((COORD_BOUNDARIES[axis_key][1] - self.max[axis_key])/self.inc[axis_key])*self.inc[axis_key] axis_values = np.arange(axis_min, axis_max + 0.5*self.inc[axis_key], self.inc[axis_key]) else: axis_values = np.arange(self.min[axis_key], self.max[axis_key] + 0.5*self.inc[axis_key], self.inc[axis_key]) return axis_values
@property def lon(self): return self.axes['lon'] @property def dlon(self): return self.inc['lon'] @property def lon_min(self): return self.min['lon'] @property def lon_max(self): return self.max['lon'] @property def lons(self): return self.axis_values('lon') @property def lat(self): return self.axes['lat'] @property def dlat(self): return self.inc['lat'] @property def lat_min(self): return self.min['lat'] @property def lat_max(self): return self.max['lat'] @property def lats(self): if self.gridops['lat'] is None: return self.axis_values('lat') return None
# --------------- # GRID OPERATIONS # ---------------
[docs] @register_gridop(grid_type, 'av', 'lon') def av_lon(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame: """Average data along the longitude axis. Parameters ---------- df : pd.DataFrame The pandas DataFrame whose data is to be averaged. grid : Grid A GridLonLat object. Returns ------- A reduced pandas DataFrame """ group_levels = ['lat', 'time'] df_red = df.groupby(group_levels).mean() return df_red
[docs] @register_gridop(grid_type, 'av', 'lat') def av_lat(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame: """Average data along the latitude axis. Parameters ---------- df : pd.DataFrame The pandas DataFrame whose data is to be averaged. grid : GridLonLat A GridLonLat object. Returns ------- A reduced pandas DataFrame """ group_levels = ['lon', 'time'] df_red = df.groupby(group_levels).mean() return df_red
[docs] @register_gridop(grid_type, 'av', 'both') def av_both(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame: """Average data across the whole domain. Each gridcell value is weighted by its corresponding area element: da = EARTH_RADIUS**2*dlon*dlat*cos(lat), where the angles are in radians. Parameters ---------- df : pd.DataFrame The pandas DataFrame whose data is to be averaged. grid : GridLonLat A GridLonLat object. Returns ------- A reduced pandas DataFrame """ group_levels = ['time'] weights = np.cos(df.index.get_level_values('lat').to_numpy()*DEG_TO_RAD) df_red = df.multiply(weights, axis='index') df_red['w'] = weights df_red = df_red.groupby(group_levels).sum() df_red = df_red.div(df_red['w'], axis='index') df_red.drop('w', axis=1, inplace=True) return df_red
[docs] @register_gridop(grid_type, 'sum', 'lon') def sum_lon(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame: """Aggregate data along the longitude axis. Parameters ---------- df : pd.DataFrame The pandas DataFrame whose data is to be averaged. grid : GridLonLat A GridLonLat object. Returns ------- A reduced pandas DataFrame """ if grid.is_reduced('lat'): raise ValueError("Cannot calculate 'sum' along 'lon' axis on a field with a reduced 'lat' axis.") group_levels = ['lat', 'time'] weights = EARTH_RADIUS*grid.dlon*DEG_TO_RAD*np.cos(df.index.get_level_values('lat').to_numpy()*DEG_TO_RAD) df_red = df.multiply(weights, axis='index').groupby(group_levels).sum() return df_red
[docs] @register_gridop(grid_type, 'sum', 'lat') def sum_lat(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame: """Aggregate data along the latitude axis. Parameters ---------- df : pandas DataFrame The pandas DataFrame whose data is to be averaged. grid : GridLonLat A GridLonLat object. Returns ------- A reduced pandas DataFrame """ group_levels = ['lon', 'time'] df_red = (EARTH_RADIUS*grid.dlat*DEG_TO_RAD*df).groupby(group_levels).sum() return df_red
[docs] @register_gridop(grid_type, 'sum', 'both') def sum_both(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame: """Aggregate data across the whole domain. Each gridcell value is weighted by its corresponding area element: da = EARTH_RADIUS**2*dlon*dlat*cos(lat), where the angles are in radians. Parameters ---------- df: pandas DataFrame The pandas DataFrame whose data is to be averaged. grid : GridLonLat A GridLonLat object. Returns ------- A reduced pandas DataFrame """ group_levels = ['time'] weights = EARTH_RADIUS**2*DEG_TO_RAD**2*grid.dlat*grid.dlon*np.cos(df.index.get_level_values('lat').to_numpy()*DEG_TO_RAD) df_red = df.multiply(weights, axis='index') df_red = df_red.groupby(group_levels).sum() return df_red