"""Grid with longitude and latitude coordinates.
This grid type has longitude and latutide coordinates. The longitude and latitude intervals are
constant, although they can be different (i.e. dlon != dlat). Reduction operations on this grid
are weighted according to their position on the grid.
"""
import pandas as pd
import numpy as np
import copy
import warnings
from typing import SupportsFloat
from canopy.core.grid.spatial_axis import SpatialAxis, LonAxis, LatAxis
from canopy.core.grid.grid_empty import GridEmpty
from canopy.core.grid.grid_abc import Grid
from canopy.core.grid.registry import register_grid, register_gridop, check_gridop
from canopy.core.constants import *
grid_type = 'lonlat'
# ---------------
# GRID DEFINITION
# ---------------
COORD_BOUNDARIES = {'lon': (-180.,180.), 'lat': (-90.,90.)}
[docs]
@register_grid
class GridLonLat(Grid):
"""A Grid type representing a longitude-latitude grid with constant increments."""
_grid_type: str = grid_type
_xaxis_key: str = 'lon'
_yaxis_key: str = 'lat'
_xaxis: SpatialAxis = LonAxis
_yaxis: SpatialAxis = LatAxis
def __init__(self,
lon_min: SupportsFloat = np.nan, lon_max: SupportsFloat = np.nan, dlon: SupportsFloat = np.nan,
lat_min: SupportsFloat = np.nan, lat_max: SupportsFloat = np.nan, dlat: SupportsFloat = np.nan,
lon_gridop: str | None = None, lat_gridop: str | None = None) -> None:
"""
Parameters
----------
lon_min : float
Smallest longitude of the represented domain (degrees).
lon_max : float
Largest longitude of the represented domain (degrees).
dlon : float
Increment in longitude (degrees).
lat_min : float
Smallest latitude of the represented domain (degrees).
lat_max : float
Largest latitude of the represented domain (degrees).
dlat : float
Increment in latitude (degrees).
lon_gridop : str | None
Grid reduction operation along lon axis. If not None, values of lon_min, lon_max, and dlon are ignored.
lat_gridop : str | None
Grid reduction operation along lat axis. If not None, values of lat_min, lat_max, and dlat are ignored.
"""
if lon_gridop is not None:
check_gridop(grid_type, lon_gridop, 'lon')
if lat_gridop is not None:
check_gridop(grid_type, lat_gridop, 'lat')
# TODO: this to ABC?
if lon_gridop is not None and lat_gridop is not None and lon_gridop != lat_gridop:
raise ValueError(f"Grid operation must be the same on both axes")
super().__init__(xaxis_gridop=lon_gridop, yaxis_gridop=lat_gridop)
self.min = {}
self.max = {}
self.inc = {}
min_passed = {'lon': float(lon_min), 'lat': float(lat_min)}
max_passed = {'lon': float(lon_max), 'lat': float(lat_max)}
inc_passed = {'lon': float(dlon), 'lat': float(dlat)}
gridop_passed = {'lon': lon_gridop, 'lat': lat_gridop}
for axis_key in self.axes:
if gridop_passed[axis_key] is None:
# Parameter validation
# --------------------
if inc_passed[axis_key] <= 0:
raise ValueError(f"d{axis_key} must be positive.")
if min_passed[axis_key] > max_passed[axis_key]:
raise ValueError(f"{axis_key}_min must be smaller or equal than {axis_key}_max.")
if min_passed[axis_key] < COORD_BOUNDARIES[axis_key][0]:
raise ValueError(f"'{axis_key}_min' must be larger than or equal to {COORD_BOUNDARIES[axis_key][0]}.")
if max_passed[axis_key] > COORD_BOUNDARIES[axis_key][1]:
raise ValueError(f"'{axis_key}_max' must be smaller than or equal to {COORD_BOUNDARIES[axis_key][1]}.")
self.min[axis_key] = min_passed[axis_key]
self.max[axis_key] = min_passed[axis_key] + round((max_passed[axis_key]-min_passed[axis_key])/inc_passed[axis_key])*inc_passed[axis_key]
if axis_key == 'lon' and self.max[axis_key] >= COORD_BOUNDARIES[axis_key][1]:
self.max[axis_key] -= inc_passed[axis_key]
if axis_key == 'lat' and self.max[axis_key] > COORD_BOUNDARIES[axis_key][1]:
self.max[axis_key] -= inc_passed[axis_key]
self.inc[axis_key] = inc_passed[axis_key]
else:
self.min[axis_key] = np.nan
self.max[axis_key] = np.nan
self.inc[axis_key] = np.nan
[docs]
def reduce(self, gridop: str, axis: str) -> Grid:
"""Create a new grid, reduced according to the parameters
Parameters
----------
gridop : str
The reduction operation
axis : str
The axis to be reduced
Returns
-------
An instance of GridLonLat
"""
# Check that gridop is defined
check_gridop(self.grid_type, gridop, axis)
if self.is_reduced(axis):
raise ValueError(f"Grid axis '{axis}' is already reduced.")
min_reduced = {axis_key: self.min[axis_key] for axis_key in self.axes}
max_reduced = {axis_key: self.max[axis_key] for axis_key in self.axes}
inc_reduced = {axis_key: self.inc[axis_key] for axis_key in self.axes}
gridops_reduced = {axis_key: self.gridops[axis_key] for axis_key in self.axes}
for axis_key in self.axes:
if axis == axis_key or axis == 'both':
gridops_reduced[axis_key] = gridop
min_reduced[axis_key] = np.nan
max_reduced[axis_key] = np.nan
inc_reduced[axis_key] = np.nan
return GridLonLat(lon_min = min_reduced['lon'], lon_max = max_reduced['lon'], dlon = inc_reduced['lon'],
lat_min = min_reduced['lat'], lat_max = max_reduced['lat'], dlat = inc_reduced['lat'],
lon_gridop = gridops_reduced['lon'], lat_gridop = gridops_reduced['lat'])
[docs]
def crop(self, df: pd.DataFrame) -> Grid:
if df.empty:
return GridEmpty()
min_cropped = {axis_key: self.min[axis_key] for axis_key in self.axis_names}
max_cropped = {axis_key: self.max[axis_key] for axis_key in self.axis_names}
inc_cropped = {axis_key: self.inc[axis_key] for axis_key in self.axis_names}
gridops_cropped = {axis_key: self.gridops[axis_key] for axis_key in self.axis_names}
for axis_key in self.axis_names:
if axis_key not in df.index.names or self.is_reduced(axis_key):
continue
frame_min = df.index.get_level_values(axis_key).min()
frame_max = df.index.get_level_values(axis_key).max()
delta = 0.01*self.inc[axis_key] # Prevent floating point precision pitfalls
if frame_max + delta <= self.min[axis_key] or frame_min - delta >= self.max[axis_key]:
return GridEmpty()
coords = self.axis_values(axis_key)
min_cropped[axis_key] = coords[coords >= frame_min - delta].min()
max_cropped[axis_key] = coords[coords <= frame_max + delta].max()
return GridLonLat(lon_min = min_cropped['lon'], lon_max = max_cropped['lon'], dlon = inc_cropped['lon'],
lat_min = min_cropped['lat'], lat_max = max_cropped['lat'], dlat = inc_cropped['lat'],
lon_gridop = gridops_cropped['lon'], lat_gridop = gridops_cropped['lat'])
[docs]
@classmethod
def from_frame(cls, df: pd.DataFrame,
dlon: float = np.nan,
dlat: float = np.nan,
lon_gridop: None | str = None,
lat_gridop: None | str = None,
) -> 'GridLonLat':
"""Create a GridLonLat instance from a DataFrame.
Parameters
----------
df : pd.DataFrame
A pandas DataFrame with a valid format (see Field documentation).
dlon: float
If supplied, use this increment for the longitude axis, instead of inferring it from the DataFrame's index
dlat: float
If supplied, use this increment for the latitude axis, instead of inferring it from the DataFrame's index
gridop_lon: str | None
If the DataFrame's index does not have a 'lon' axis, a gridop for this axis must be supplied.
gridop_lat: str | None
If the DataFrame's index does not have a 'lat' axis, a gridop for this axis must be supplied.
Returns
-------
An instance of the grid subclass.
"""
min_new = {axis_key: np.nan for axis_key in ['lon', 'lat']}
max_new = {axis_key: np.nan for axis_key in ['lon', 'lat']}
inc_new = {'lon': float(dlon), 'lat': float(dlat)}
gridops_new = {'lon': lon_gridop, 'lat': lat_gridop}
# TODO: Check that frame lon and lat are within boundaries
for axis_key in ['lon', 'lat']:
# DataFrame's index does not have this axis
if axis_key not in df.index.names:
if gridops_new[axis_key] is None:
raise ValueError(f"DataFrame has no '{axis_key}' level, but no gridop was specified for this axis.")
check_gridop(grid_type, str(gridops_new[axis_key]), axis_key)
continue
# DataFrame's index has this axis
ax_values = df.index.get_level_values(axis_key).values
ax_values_unique_sorted = np.sort(df.index.unique(level=axis_key).values)
ax_min = ax_values.min()
ax_max = ax_values.max()
how_inc = "Passed"
if np.isnan(inc_new[axis_key]):
how_inc = "Inferred"
if len(ax_values_unique_sorted) < 2:
raise ValueError(f"If 'd{axis_key}' is not specified, the '{axis_key}' level of the dataframe must have at least two unique values.")
inc_new[axis_key] = (ax_values_unique_sorted[1:] - ax_values_unique_sorted[:-1]).min()
min_new[axis_key] = ax_min
max_new[axis_key] = ax_max
gridops_new[axis_key] = None
fr, _ = np.modf((ax_values_unique_sorted[1:] - ax_values_unique_sorted[:-1]).astype(np.float64)/inc_new[axis_key])
# Sometimes modf will return a value very close to 1 as the fractional part, instead of counting it
# as a whole unit in the integer part. This is due to floating point precision issues
# (see https://stackoverflow.com/questions/3884232/modf-returns-1-as-the-fractional).
# Therefore, we need to check that the "fractional" part is sufficiently close to either 0 or 1
mask = fr > 0.5
if not (np.allclose(fr[~mask], 0.) and np.allclose(fr[mask], 1.)):
raise ValueError(f"{how_inc} d{axis_key}={inc_new[axis_key]} incompatible with DataFrame's '{axis_key}' index values.")
return cls(lon_min=min_new['lon'], lon_max=max_new['lon'], dlon=inc_new['lon'],
lat_min=min_new['lat'], lat_max=max_new['lat'], dlat=inc_new['lat'],
lon_gridop=gridops_new['lon'], lat_gridop=gridops_new['lat'])
[docs]
def validate_frame(self, df: pd.DataFrame) -> bool:
grid_suits_frame = True
for axis_key in self.axis_names:
# Key found in dataframe's index
if axis_key in df.index.names:
if self.is_reduced(axis_key):
warnings.warn(f"DataFrame's index has a '{axis_key}' level, but corresponding grid axis is reduced.")
grid_suits_frame = False
continue
if isinstance(df.index, pd.MultiIndex):
axis_values = np.sort(df.index.get_level_values(axis_key).unique())
else:
axis_values = np.sort(df.index.values)
if not np.isclose(max(axis_values), self.max[axis_key]):
warnings.warn(f"DataFrame index and grid maximum '{axis_key}' values are different.")
grid_suits_frame = False
if not np.isclose(min(axis_values), self.min[axis_key]):
warnings.warn(f"DataFrame index and grid minimum '{axis_key}' values are different.")
grid_suits_frame = False
if len(axis_values) > 1:
fr, _ = np.modf((axis_values[1:] - axis_values[:-1]).astype(np.float64)/self.inc[axis_key])
# Sometimes modf will return a value very close to 1 as the fractional part, instead of counting it
# as a whole unit in the integer part. This is due to floating point precision issues
# (see https://stackoverflow.com/questions/3884232/modf-returns-1-as-the-fractional).
# Therefore, we need to check that the "fractional" part is sufficiently close to either 0 or 1
mask = fr > 0.5
if not (np.allclose(fr[~mask], 0.) and np.allclose(fr[mask], 1.)):
warnings.warn(f"DataFrame index '{axis_key}' values not compatible with grid's increment ({self.inc[axis_key]}).")
grid_suits_frame = False
# Key not found in dataframe's index
else:
if not self.is_reduced(axis_key):
warnings.warn(f"DataFrame's index does not have a '{axis_key}' level, but corresponding grid axis is not reduced.")
grid_suits_frame = False
return grid_suits_frame
[docs]
def is_compatible(self, other: Grid) -> bool:
if isinstance(other, GridEmpty):
return True
if not isinstance(other, GridLonLat):
return False
for axis_key in ['lon', 'lat']:
if self.gridops[axis_key] != other.gridops[axis_key]:
return False
if self.is_reduced(axis_key):
continue
if self.inc[axis_key] != other.inc[axis_key]:
return False
x0_self = self.min[axis_key] - int((self.min[axis_key] - COORD_BOUNDARIES[axis_key][0])/self.inc[axis_key])*self.inc[axis_key]
x0_other = other.min[axis_key] - int((other.min[axis_key] - COORD_BOUNDARIES[axis_key][0])/other.inc[axis_key])*other.inc[axis_key]
if not np.isclose(x0_self, x0_other, rtol=1.e-7):
return False
return True
def __add__(self, other: Grid) -> Grid:
if isinstance(other, GridEmpty):
return copy.deepcopy(self)
if not isinstance(other, GridLonLat):
raise ValueError("Grid must be of type 'GridEmpty' or 'GridLonLat'")
if not self.is_compatible(other):
raise ValueError("Grids are not compatible.")
if self.is_reduced('lon'):
lon_min = np.nan
lon_max = np.nan
dlon = np.nan
lon_gridop = self.gridops['lon']
else:
lon_min = min(self.min['lon'], other.min['lon'])
lon_max = max(self.max['lon'], other.max['lon'])
dlon = self.inc['lon']
lon_gridop = None
if self.is_reduced('lat'):
lat_min = np.nan
lat_max = np.nan
dlat = np.nan
lat_gridop = self.gridops['lat']
else:
lat_min = min(self.min['lat'], other.min['lat'])
lat_max = max(self.max['lat'], other.max['lat'])
dlat = self.inc['lat']
lat_gridop = None
return GridLonLat(lon_min=lon_min, lon_max=lon_max, dlon=dlon, lon_gridop=lon_gridop,
lat_min=lat_min, lat_max=lat_max, dlat=dlat, lat_gridop=lat_gridop)
def __repr__(self) -> str:
precision = 11
repr_str = [super().__repr__()]
if not self.is_reduced('lon'):
repr_str.append(f"Longitude: {round(self.lon_min, precision)} to {round(self.lon_max, precision)} (step: {round(self.dlon, precision)})")
if not self.is_reduced('lat'):
repr_str.append(f"Latitude: {round(self.lat_min, precision)} to {round(self.lat_max, precision)} (step: {round(self.dlat, precision)})")
return '\n'.join(repr_str)
def __str__(self) -> str:
return self.__repr__()
[docs]
def axis_values(self, axis_key, extend: bool = False):
if extend:
axis_min = self.min[axis_key] - int((self.min[axis_key] - COORD_BOUNDARIES[axis_key][0])/self.inc[axis_key])*self.inc[axis_key]
axis_max = self.max[axis_key] + int((COORD_BOUNDARIES[axis_key][1] - self.max[axis_key])/self.inc[axis_key])*self.inc[axis_key]
axis_values = np.arange(axis_min, axis_max + 0.5*self.inc[axis_key], self.inc[axis_key])
else:
axis_values = np.arange(self.min[axis_key], self.max[axis_key] + 0.5*self.inc[axis_key], self.inc[axis_key])
return axis_values
@property
def lon(self):
return self.axes['lon']
@property
def dlon(self):
return self.inc['lon']
@property
def lon_min(self):
return self.min['lon']
@property
def lon_max(self):
return self.max['lon']
@property
def lons(self):
return self.axis_values('lon')
@property
def lat(self):
return self.axes['lat']
@property
def dlat(self):
return self.inc['lat']
@property
def lat_min(self):
return self.min['lat']
@property
def lat_max(self):
return self.max['lat']
@property
def lats(self):
if self.gridops['lat'] is None:
return self.axis_values('lat')
return None
# ---------------
# GRID OPERATIONS
# ---------------
[docs]
@register_gridop(grid_type, 'av', 'lon')
def av_lon(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame:
"""Average data along the longitude axis.
Parameters
----------
df : pd.DataFrame
The pandas DataFrame whose data is to be averaged.
grid : Grid
A GridLonLat object.
Returns
-------
A reduced pandas DataFrame
"""
group_levels = ['lat', 'time']
df_red = df.groupby(group_levels).mean()
return df_red
[docs]
@register_gridop(grid_type, 'av', 'lat')
def av_lat(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame:
"""Average data along the latitude axis.
Parameters
----------
df : pd.DataFrame
The pandas DataFrame whose data is to be averaged.
grid : GridLonLat
A GridLonLat object.
Returns
-------
A reduced pandas DataFrame
"""
group_levels = ['lon', 'time']
df_red = df.groupby(group_levels).mean()
return df_red
[docs]
@register_gridop(grid_type, 'av', 'both')
def av_both(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame:
"""Average data across the whole domain.
Each gridcell value is weighted by its corresponding area element:
da = EARTH_RADIUS**2*dlon*dlat*cos(lat),
where the angles are in radians.
Parameters
----------
df : pd.DataFrame
The pandas DataFrame whose data is to be averaged.
grid : GridLonLat
A GridLonLat object.
Returns
-------
A reduced pandas DataFrame
"""
group_levels = ['time']
weights = np.cos(df.index.get_level_values('lat').to_numpy()*DEG_TO_RAD)
df_red = df.multiply(weights, axis='index')
df_red['w'] = weights
df_red = df_red.groupby(group_levels).sum()
df_red = df_red.div(df_red['w'], axis='index')
df_red.drop('w', axis=1, inplace=True)
return df_red
[docs]
@register_gridop(grid_type, 'sum', 'lon')
def sum_lon(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame:
"""Aggregate data along the longitude axis.
Parameters
----------
df : pd.DataFrame
The pandas DataFrame whose data is to be averaged.
grid : GridLonLat
A GridLonLat object.
Returns
-------
A reduced pandas DataFrame
"""
if grid.is_reduced('lat'):
raise ValueError("Cannot calculate 'sum' along 'lon' axis on a field with a reduced 'lat' axis.")
group_levels = ['lat', 'time']
weights = EARTH_RADIUS*grid.dlon*DEG_TO_RAD*np.cos(df.index.get_level_values('lat').to_numpy()*DEG_TO_RAD)
df_red = df.multiply(weights, axis='index').groupby(group_levels).sum()
return df_red
[docs]
@register_gridop(grid_type, 'sum', 'lat')
def sum_lat(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame:
"""Aggregate data along the latitude axis.
Parameters
----------
df : pandas DataFrame
The pandas DataFrame whose data is to be averaged.
grid : GridLonLat
A GridLonLat object.
Returns
-------
A reduced pandas DataFrame
"""
group_levels = ['lon', 'time']
df_red = (EARTH_RADIUS*grid.dlat*DEG_TO_RAD*df).groupby(group_levels).sum()
return df_red
[docs]
@register_gridop(grid_type, 'sum', 'both')
def sum_both(df: pd.DataFrame, grid: GridLonLat) -> pd.DataFrame:
"""Aggregate data across the whole domain.
Each gridcell value is weighted by its corresponding area element:
da = EARTH_RADIUS**2*dlon*dlat*cos(lat),
where the angles are in radians.
Parameters
----------
df: pandas DataFrame
The pandas DataFrame whose data is to be averaged.
grid : GridLonLat
A GridLonLat object.
Returns
-------
A reduced pandas DataFrame
"""
group_levels = ['time']
weights = EARTH_RADIUS**2*DEG_TO_RAD**2*grid.dlat*grid.dlon*np.cos(df.index.get_level_values('lat').to_numpy()*DEG_TO_RAD)
df_red = df.multiply(weights, axis='index')
df_red = df_red.groupby(group_levels).sum()
return df_red